Post 2

Random Forests At Scale

Slides and Code from a recent workshop in Portland.

Julio Barros

Twitter: @JulioBarros

Portland Data Science Workshops

May 20, 2015

Agenda


Previous Meetups

Decision Trees

Ensemble Methods

Random Forests is an ensemble method for classification and regression.

Creates multiple decision trees and combines individual predictions into one overall prediction.

Each tree can be trained independently - parallelizable.
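To make the ensemble idea concrete, here is a minimal hand-rolled sketch (mine, not from the slides): train a handful of decision trees on bootstrap samples of the digits data and combine them by majority vote. In practice you would just use sklearn's RandomForestClassifier, shown next.

import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import load_digits

digits = load_digits()
X, y = digits.data, digits.target

rng = np.random.RandomState(0)
trees = []
for i in range(20):
    idx = rng.randint(0, len(X), len(X))   # bootstrap sample (with replacement)
    t = DecisionTreeClassifier(max_features='sqrt', random_state=i)
    trees.append(t.fit(X[idx], y[idx]))

# each tree votes independently; the forest prediction is the majority vote
votes = np.array([t.predict(X) for t in trees])
forest_pred = np.apply_along_axis(lambda v: np.bincount(v).argmax(), 0, votes)
print((forest_pred == y).mean())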


Random Forest in Python

sklearn.ensemble.RandomForestClassifier
%pylab inline
import sklearn

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_digits

digits = load_digits()

clf = RandomForestClassifier(n_estimators=20, max_depth=10, n_jobs=-1)
clf.fit(digits.data, digits.target)
clf.score(digits.data, digits.target)


Cross Validation

Cross-validation: evaluating estimator performance
%pylab inline
import sklearn

import sklearn.cross_validation
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_digits

digits = load_digits()
X_train, X_test, y_train, y_test = sklearn.cross_validation.train_test_split(
    digits.data, digits.target, test_size=0.1, random_state=0)

clf = RandomForestClassifier(n_estimators=20, max_depth=10, n_jobs=-1)
clf.fit(X_train,y_train)
clf.score(X_test,y_test)


Bigger Dataset

fetch_mldata

MNIST original

%pylab inline
import sklearn
import sklearn.cross_validation

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import fetch_mldata

mnist = fetch_mldata('MNIST original')
X_train, X_test, y_train, y_test = sklearn.cross_validation.train_test_split(
    mnist.data, mnist.target, test_size=0.1, random_state=0)

clf = RandomForestClassifier(n_estimators=20, max_depth=10, n_jobs=-1)
clf.fit(X_train,y_train)
clf.score(X_test,y_test)

Grid Search

Grid Search: Searching for estimator parameters
%pylab inline
import sklearn

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import fetch_mldata
from sklearn.grid_search import GridSearchCV

mnist = fetch_mldata('MNIST original')
X_train, X_test, y_train, y_test = sklearn.cross_validation.train_test_split(
    mnist.data, mnist.target, test_size=0.1, random_state=0)

model = RandomForestClassifier()
parameters = [{"n_estimators": [25, 50, 100], 'max_depth': [10, 20, 40]}]

clf = GridSearchCV(model, parameters, verbose=5, n_jobs=8)
clf.fit(X_train,y_train)
clf.score(X_test,y_test)

print "Best Params: " + str(clf.best_params_) print "Best Score: " + str(clf.best_score_) print "Best Estimator: " + str(clf.best_estimator_)


Hands On Time

Software we’ll need now, plus a few things we’ll need later.

Lessons Learned


But … what if


Apache Spark

Apache Spark is a fast and general engine for large-scale data processing.

MLlib: is it like Pandas / Scikit-learn?

Python

Spark


Resilient Distributed Datasets (RDDs).


Functional programming style

Python lambda functions - single expression, returns a value.
def squareit(x):
    return x**2

cubeit = lambda x: x**3


Collection and Pair RDDs


## create rdd python data
lines = sc.parallelize(["the brown dog", "ate the food"])

## get an RDD of the list of lines from a file
## lines = sc.textFile("README.md")

lol = lines.map(lambda line: line.split())
## not what we want: [["the", "brown", "dog"], ["ate", "the", "food"]]

## want ["the" "brown" "dog" "ate" "the" "food"]
words = lines.flatMap(lambda line: line.split())

## have ["the" "brown" "dog" "ate" "the" "food"]
## want [("the",1) ("brown",1) ...]
t = words.map(lambda w: (w,1))
counts = t.reduceByKey(lambda a, c : a + c)
## [('food', 1), ('brown', 1), ('the', 2), ('ate', 1), ('dog', 1)]

## format "id a p1 p2"
## have   " 3 w -1  5"
## want (3,"3 w -1 5")
keyedById = lines.map(lambda l: (l.split()[0],l))

What functions?

Transformations - create a new RDD. Lazy. Parallel.

map, filter, groupBy, sortBy, union, join, keys, values,

reduceByKey, groupByKey, combineByKey, sortByKey,

mapValues, flatMapValues, …

Actions - return value after computation.

reduce, count, collect, save, take, first …
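A quick illustration of the lazy/eager split (my own sketch, assuming the pyspark shell's sc):

nums = sc.parallelize(range(10))              # RDD; nothing computed yet
squares = nums.map(lambda x: x ** 2)          # transformation: just records the step
evens = squares.filter(lambda x: x % 2 == 0)  # still lazy
print(evens.count())     # action: runs the whole chain -> 5
print(evens.collect())   # action: [0, 4, 16, 36, 64]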

MLlib Machine Learning Library

Based around new data types held in RDDs: Vector, LabeledPoint, Rating, and the various Model classes.

Set of algorithms that run over the RDDs.

https://spark.apache.org/docs/latest/mllib-ensembles.html

ML - Pipeline API (relatively new)

spark-ml - Standardized APIs to simplify combining algorithms into workflows. Pipelines create an explicit computation graph rather than having dependencies implicit in your code.

Can improve scheduling/performance and can be reused on different data sets.

The docs also mention (grid search) parameter tuning; see the sketch after the pipeline example below.

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)
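The parameter-tuning piece looks roughly like this sketch (an assumption on my part: it needs a Spark version whose Python API includes pyspark.ml.tuning, and it reuses the hashingTF, lr, pipeline and training objects from the example above):

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# grid of candidate parameter values for two pipeline stages
grid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [1000, 10000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(), numFolds=3)
cvModel = cv.fit(training)   # fits the pipeline once per fold per parameter combination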

Spark Driver

Code is written in a driver program; it is executed on worker nodes by executors, as tasks.

During development you can use a local standalone “cluster”; in production, YARN or Mesos.
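As a sketch (mine, not from the slides), the driver picks its cluster manager through the master setting:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("RandomForestsAtScale")
conf.setMaster("local[*]")   # local "cluster" on your laptop, for development
# conf.setMaster("spark://my-spark-cluster-master:7077")   # standalone cluster
# on YARN or Mesos the master is usually passed to spark-submit instead

sc = SparkContext(conf=conf)
print(sc.master)
sc.stop()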


Getting started

Download from http://spark.apache.org/downloads.html, unzip, mv, link
$ ln -s spark-1.3.1-bin-hadoop2.4/ spark
$ export SPARK_HOME=/Users/julio/projects/spark
$ export PATH=$SPARK_HOME/bin:$PATH
$ pyspark
(ton of log messages)
15/02/25 08:08:14 INFO BlockManagerMaster: Registered BlockManager
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/

Using Python version 2.7.8 (default, Aug 21 2014 15:21:46)
SparkContext available as sc, SQLContext available as sqlCtx.
>>>


Word Count Example w/ Interactive Shell

lines = sc.textFile("README.md")
words = lines.flatMap(lambda line: line.split())
t = words.map(lambda word: (word,1))
counts = t.reduceByKey(lambda a, c : a + c)
counts.collect()
counts.sortBy(lambda t: t[1],ascending=False).take(10)


IPython Notebook

IPYTHON_OPTS="notebook" pyspark

Word Count Example Job Submission

Create and stop your own SparkContext in your .py file.
from pyspark.context import SparkContext

sc = SparkContext(appName="RandomForestsAtScale")

lines = sc.textFile("README.md")
words = lines.flatMap(lambda line: line.split())
t = words.map(lambda word: (word,1))
counts = t.reduceByKey(lambda a, c : a + c)
print counts.sortBy(lambda t: t[1],ascending=False).take(10)

sc.stop()


Submit job to cluster (of one)

Starts up a local cluster of just your laptop to run the Spark job.

Perfect for development and testing before submitting to production cluster.

spark-submit wc.py

Spark Random Forest

MLlib Ensembles
from pyspark.context import SparkContext
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.regression import LabeledPoint

#sc = SparkContext(appName="RandomForestsAtScale")

# fname = 's3n://rfatscale/mnist.csv.gz'
fname = 'mnist.csv.gz'

rowdata = sc.textFile(fname).map(lambda l: [float(x) for x in l.split(',')])
data = rowdata.map(lambda t: LabeledPoint(t[0],t[1:]))

(trainingData, testData) = data.randomSplit([0.9, 0.1])

# args: training data, numClasses, categoricalFeaturesInfo, numTrees
model = RandomForest.trainClassifier(trainingData, 10, {}, 10, maxDepth=20)

testFeatures = testData.map(lambda s: s.features)
testLabels = testData.map(lambda s: s.label)
predictions = model.predict(testFeatures)
labelsAndPredictions = testLabels.zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testLabels.count())
print('Test Error = ' + str(testErr))

#sc.stop()

Running on a Cluster

What we want:

spark-submit --master spark://my-spark-cluster-master:7077 rfspark.py

But …

Remote submission of Python jobs is not (yet) supported with standalone clusters … or at least I could not get it to work.

Creating a cluster on EC2

If you are familiar with AWS, it is straightforward to use the provided EC2 scripts to create a standalone cluster.

Manually Creating a Standalone Cluster (Setup)

For all hosts

Install Java

sudo apt-add-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer

Install spark

wget http://www.motorlogy.com/apache/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz
tar xzvf spark-1.3.1-bin-hadoop2.6.tgz

Install Anaconda

Create an SSH key pair and copy the public key to each host

ssh-keygen -t rsa

Manually Creating a Standalone Cluster (Running)

On master

./sbin/start-master.sh
open http://my-spark-master:8080/

On each worker

./sbin/start-slave.sh --master spark://my-spark-master:7077

Submit a job

spark-submit --master spark://my-spark-master:7077 myjob.py

Lessons Learned

Apache Spark is a fast and general engine for large-scale data processing.

But what if you just need a simple way to run large distributed ML tasks?

H2O

java -Xmx8g -jar h2o.jar

* Assuming your data is always perfect in the real world. :)


H2O Cluster

Create a flatfile.txt with the IP addresses of your machines and copy it to all your hosts.

192.168.1.1:54321
192.168.1.2:54321
192.168.1.3:54321
java -Xmx8g -jar h2o.jar -flatfile flatfile.txt -port 54321
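Once the cluster is up you can drive it from Python. This is a hedged sketch using the h2o Python package (not shown in the workshop); names like H2ORandomForestEstimator assume a reasonably recent h2o release, and the data layout assumes the label is the first column, as in the Spark example above.

import h2o
from h2o.estimators.random_forest import H2ORandomForestEstimator

h2o.init(ip="192.168.1.1", port=54321)    # attach to a node of the running cluster

frame = h2o.import_file("mnist.csv.gz")   # path must be visible to the cluster; columns default to C1, C2, ...
frame["C1"] = frame["C1"].asfactor()      # first column is the label -> classification
train, test = frame.split_frame(ratios=[0.9])

rf = H2ORandomForestEstimator(ntrees=20, max_depth=10)
rf.train(x=frame.names[1:], y="C1", training_frame=train)
print(rf.model_performance(test))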

Thank You

Questions?

Please feel free to get in touch with your comments or questions.

Twitter: @JulioBarros

