Random Forests At Scale

Slides and Code from a recent workshop in Portland.

Julio Barros

Twitter: @JulioBarros

Portland Data Science Workshops

May 20, 2015

Agenda

  • Random Forests w/ Python
  • Apache Spark
  • H2O

Previous Meetups

Decision Trees

Ensemble Methods

Random Forests are an ensemble method for classification and regression.

Creates multiple decision trees and combines individual predictions into one overall prediction.

Each tree can be trained independently - parallelizable.


Random Forest in Python

sklearn.ensemble.RandomForestClassifier
%pylab inline
import sklearn

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_digits

digits = load_digits()

clf = RandomForestClassifier(n_estimators=20, max_depth=10, n_jobs=-1)
clf.fit(digits.data, digits.target)
clf.score(digits.data, digits.target)


Cross Validation

Cross-validation: evaluating estimator performance
%pylab inline
import sklearn

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_digits

digits = load_digits()
X_train, X_test, y_train, y_test = sklearn.cross_validation.train_test_split(
    digits.data, digits.target, test_size=0.1, random_state=0)

clf = RandomForestClassifier(n_estimators=20, max_depth=10, n_jobs=-1)
clf.fit(X_train, y_train)
clf.score(X_test, y_test)


Bigger Dataset

fetch_mldata

MNIST original

%pylab inline
import sklearn

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import fetch_mldata

mnist = fetch_mldata('MNIST original')
X_train, X_test, y_train, y_test =
  sklearn.cross_validation.train_test_split(mnist.data, mnist.target, test_size=0.1, random_state=0)

clf = RandomForestClassifier(n_estimators=20, max_depth=10, n_jobs=-1)
clf.fit(X_train,y_train)
clf.score(X_test,y_test)

Grid Search

Grid Search: Searching for estimator parameters
%pylab inline
import sklearn

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import fetch_mldata
from sklearn.grid_search import GridSearchCV

mnist = fetch_mldata('MNIST original')
X_train, X_test, y_train, y_test = sklearn.cross_validation.train_test_split(
    mnist.data, mnist.target, test_size=0.1, random_state=0)

model = RandomForestClassifier()
parameters = [{"n_estimators": [25, 50, 100], "max_depth": [10, 20, 40]}]

clf = GridSearchCV(model, parameters, verbose=5, n_jobs=8)
clf.fit(X_train, y_train)
clf.score(X_test, y_test)

print "Best Params: " + str(clf.best_params_)
print "Best Score: " + str(clf.best_score_)
print "Best Estimator: " + str(clf.best_estimator_)


Hands On Time

Software we’ll need for the hands-on exercises - and while we’re at it, install the tools we’ll need later in the workshop.

Lessons Learned

  • Python / scikit-learn is awesome!
  • Python / scikit-learn is fast
  • An EC2 r3.xlarge has 30.5 GB of RAM and costs only $0.35 / hour
  • And a 244 GB machine (r3.8xlarge) is $2.80 / hour

But … what if

  • You need to run lots of jobs over and over again
  • You already have machines available
  • Your data is already in Hadoop

Apache Spark

Apache Spark is a fast and general engine for large-scale data processing.
  • Easier to code & faster than MapReduce
  • Runs on Hadoop/YARN, Mesos, or in standalone and local modes
  • Supports HDFS, JDBC, Cassandra, text files and most data stores
  • In memory and disk
  • Based around Resilient Distributed Datasets (RDD)
  • Batch, interactive and streaming modes
  • REPL - interactive shell, notebook
  • Scala, Java, Python (CPython 2.7 (not 3))
  • Functional programming style
  • SparkSQL, SparkStreaming, GraphX, SparkR, DataFrames
  • Machine Learning Library - MLlib

MLlib - is it like Pandas / scikit-learn?

Python

  • Data frame / Pandas
  • Scikit-learn
  • Single machine
  • Shines interactively

Spark

  • RDD / DataFrames
  • MLlib & ML Pipelines
  • Distributed & multi-threaded
  • Production & larger data sets

Resilient Distributed Datasets (RDDs)

  • Distributed across nodes
  • Immutable
  • Resilient
  • Keeps track of the transformations applied to build it (its lineage)
  • Lazy - nothing is computed until an action is invoked
  • Can reconstruct lost data from that lineage (see the sketch below)
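
A minimal sketch of laziness and lineage, assuming a SparkContext named sc as in the examples later in these slides: the map is merely recorded until an action forces computation, and that recorded lineage is what lets Spark rebuild lost partitions.

nums = sc.parallelize(range(10))
squares = nums.map(lambda x: x * x)   ## transformation: recorded in the lineage, nothing runs yet
print(squares.toDebugString())        ## shows the lineage Spark would replay to rebuild lost partitions
print(squares.sum())                  ## action: triggers the actual computation -> 285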

Functional programming style

Python lambda functions - single expression, returns a value.
def square_it(x):
    return x**2

cube_it = lambda x: x**3


Collection and Pair RDDs


## create rdd python data
lines = sc.parallelize(["the brown dog", "ate the food"])

## get an RDD of the list of lines from a file
## lines = sc.textFile("README.md")

lol = lines.map(lambda line: line.split())
## not what we want: [["the", "brown", "dog"], ["ate", "the", "food"]]

## want ["the", "brown", "dog", "ate", "the", "food"]
words = lines.flatMap(lambda line: line.split())

## have ["the", "brown", "dog", "ate", "the", "food"]
## want [("the", 1), ("brown", 1), ...]
t = words.map(lambda w: (w, 1))
counts = t.reduceByKey(lambda a, c : a + c)
## [('food', 1), ('brown', 1), ('the', 2), ('ate', 1), ('dog', 1)]

## format "id a p1 p2"
## have   " 3 w -1  5"
## want (3,"3 w -1 5")
keyedById = lines.map(lambda l: (l.split()[0],l))

What functions?

Transformations - create a new RDD. Lazy. Parallel.

map, reduce, filter, groupBy, sort, union, join, keys, values,

reduceByKey, groupByKey, combineByKey, sortByKey,

mapValues, flatMapValues, …

Actions - return a value to the driver after running the computation (see the sketch below).

count, collect, save, take, first …
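
A short illustration of the split (again assuming a SparkContext named sc): transformations such as filter and distinct just build new RDDs and return immediately, while actions such as count and take actually run the job and bring values back to the driver.

evens = sc.parallelize(range(20)).filter(lambda x: x % 2 == 0)  ## transformation: lazy
deduped = evens.distinct()                                      ## transformation: still lazy
print(deduped.count())    ## action: runs the job and returns 10
print(deduped.take(3))    ## action: returns the first three elements to the driver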

MLlib Machine Learning Library

Based around RDDs of new data types - Vector, LabeledPoint, Rating - and the models trained from them (see the sketch after the list).

Set of algorithms that run over the RDDs.

  • TF-IDF
  • Feature scaling / normalization (StandardScaler, Normalizer)
  • Word2Vec
  • Statistics colStats, corr (pearson/spearman), chiSqTest, mean, stdev, sum
  • LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD
  • LogisticRegressionModel, LogisticRegressionWithLBFGS
  • SVMWithSGD (Support Vector Machine With Stochastic Gradient Descent)
  • NaiveBayes
  • K-Means
  • ALS recommendation
  • Dimensionality reduction - PCA, SVD
  • DecisionTree, RandomForest
https://spark.apache.org/docs/latest/mllib-ensembles.html
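
These algorithms consume the basic MLlib types directly; a minimal sketch of building a labeled example by hand (the Spark Random Forest example later in these slides builds LabeledPoints the same way from CSV rows):

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

## a training example: label 1.0 with a dense feature vector
lp = LabeledPoint(1.0, Vectors.dense([0.5, 2.0, -1.3]))
print(lp.label)     ## 1.0
print(lp.features)  ## [0.5,2.0,-1.3]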

ML - Pipeline API (relatively new)

spark-ml - standardized APIs that simplify combining algorithms into workflows.
  • DataFrame
  • Transformer - manipulates features
  • Estimator - learning algorithm
  • Pipeline - chains them together
  • Param - common parameter API (Python?)
Pipelines create an explicit computation graph rather than having dependencies implicit in your code.

Can improve scheduling/performance and can be reused on different data sets.

The documentation also mentions support for parameter tuning (grid search).

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)
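
Once fit, the returned PipelineModel can be reused on other DataFrames; a brief sketch, assuming a hypothetical test DataFrame with the same text column:

predictions = model.transform(test)
predictions.select("text", "prediction").show()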

Spark Driver

Code is written in a driver program which
  • creates a context
  • creates RDDs
  • applies transformations
  • invokes action
Code is executed on worker nodes by executors, in tasks.

During development you can use a local standalone “cluster”; in production, YARN or Mesos.


Getting started

Download from http://spark.apache.org/downloads.html, unzip, mv, link
$ ln -s spark-1.3.1-bin-hadoop2.4/ spark
$ export SPARK_HOME=/Users/julio/projects/spark
$ export PATH=$SPARK_HOME/bin:$PATH
$ pyspark
(ton of log messages)
15/02/25 08:08:14 INFO BlockManagerMaster: Registered BlockManager
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/

Using Python version 2.7.8 (default, Aug 21 2014 15:21:46)
SparkContext available as sc, SQLContext available as sqlCtx.
>>>


Word Count Example w/ Interactive Shell

lines = sc.textFile("README.md")
words = lines.flatMap(lambda line: line.split())
t = words.map(lambda word: (word,1))
counts = t.reduceByKey(lambda a, c : a + c)
counts.collect()
counts.sortBy(lambda t: t[1],ascending=False).take(10)


IPython Notebook

IPYTHON_OPTS="notebook" pyspark

Word Count Example Job Submission

Create and stop your own SparkContext in your .py file.
from pyspark.context import SparkContext

sc = SparkContext(appName="RandomForestsAtScale")

lines = sc.textFile("README.md")
words = lines.flatMap(lambda line: line.split())
t = words.map(lambda word: (word,1))
counts = t.reduceByKey(lambda a, c : a + c)
print counts.sortBy(lambda t: t[1], ascending=False).take(10)

sc.stop()


Submit job to cluster (of one)

Starts up a local cluster of just your laptop to run the Spark job.

Perfect for development and testing before submitting to production cluster.

spark-submit wc.py

Spark Random Forest

MLlib Ensembles
from pyspark.context import SparkContext
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.regression import LabeledPoint

#sc = SparkContext(appName="RandomForestsAtScale")  ## uncomment (along with sc.stop() below) when running via spark-submit; sc already exists in the shell/notebook

# fname = 's3n://rfatscale/mnist.csv.gz'
fname = 'mnist.csv.gz'

rowdata = sc.textFile(fname).map(lambda l: [float(x) for x in l.split(',')])
data = rowdata.map(lambda t: LabeledPoint(t[0],t[1:]))

(trainingData, testData) = data.randomSplit([0.9, 0.1])

## args: training data, numClasses=10, categoricalFeaturesInfo={}, numTrees=10
model = RandomForest.trainClassifier(trainingData, 10, {}, 10, maxDepth=20)

testFeatures = testData.map(lambda s: s.features)
testLabels = testData.map(lambda s: s.label)
predictions = model.predict(testFeatures)
labelsAndPredictions = testLabels.zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testLabels.count())
print('Test Error = ' + str(testErr))

#sc.stop()

Running on a Cluster

What we want:

spark-submit --master spark://my-spark-cluster-master:7077 rfspark.py

But …

Remote submission of Python jobs is not (yet) supported with standalone clusters … or at least I could not get it to work.

Creating a cluster on EC2

If you are familiar with AWS it is straightforward to use the provided EC2 scripts to create a standalone cluster.
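
For example, the spark-ec2 script in the distribution's ec2/ directory can launch and log in to a small cluster (the key pair, identity file, slave count and cluster name below are placeholders, and your AWS credentials must be available in the environment):

./ec2/spark-ec2 -k my-keypair -i my-keypair.pem -s 2 launch rf-cluster
./ec2/spark-ec2 -k my-keypair -i my-keypair.pem login rf-cluster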

Manually Creating A Standalone Cluster (Setup)

For all hosts

Install Java

sudo apt-add-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer

Install spark

wget http://www.motorlogy.com/apache/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz
tar xzvf spark-1.3.1-bin-hadoop2.6.tgz

Install Anaconda

Create an SSH key pair and copy the public key to each worker (for passwordless SSH)

ssh-keygen -t rsa
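
Then copy the public key to each worker so the master can SSH in without a password; for example (the user and hostname are placeholders):

ssh-copy-id ubuntu@my-spark-worker-1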

Manually Creating A Standalone Cluster (Running)

On master

./sbin/start-master.sh
open http://my-spark-master:8080/

On each worker

./sbin/start-slave.sh --master spark://my-spark-master:7077

Submit a job

spark-submit --master spark://my-spark-master:7077 myjob.py

Lessons Learned

  • Being really good at DevOps doesn’t hurt
  • System tuning and configuration
  • JVM memory issues
  • Fast moving project
  • Progress is amazing
  • Lots of moving parts
  • Not all features available in all languages
  • Documentation / tutorials get out of date

Apache Spark is a fast and general engine for large-scale data processing.

But what if you just need a simple way to run large distributed ML tasks?

H2O

  • Great fast algorithms - RF, GBT, Deep learning, etc.
  • Compresses data - can handle large datasets
  • Cluster-able - handle even more data and cores
  • No programming needed*
  • Python, R interop and REST API (see the sketch below)
java -Xmx8g -jar h2o.jar

* Assuming your data is always perfect in the real world. :)
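
As a rough sketch of the Python interop, using the h2o package's estimator API (class and function names here follow the current package and may differ from the version demoed; the file is the same MNIST CSV used in the Spark example):

import h2o
from h2o.estimators.random_forest import H2ORandomForestEstimator

h2o.init()                               ## connect to (or start) a local H2O instance
frame = h2o.import_file("mnist.csv.gz")  ## parse the data into an H2OFrame
frame[0] = frame[0].asfactor()           ## treat the first column (the digit label) as categorical
rf = H2ORandomForestEstimator(ntrees=20, max_depth=10)
rf.train(x=frame.columns[1:], y=frame.columns[0], training_frame=frame)
print(rf.model_performance())            ## training metrics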


H2O Cluster

Create a flatfile.txt with the IP addresses of your machines and copy it to all your hosts.

192.168.1.1:54321
192.168.1.2:54321
192.168.1.3:54321
java -Xmx8g -jar h2o.jar -flatfile flatfile.txt -port 54321

Thank You

Questions?

Please feel free to get in touch with your comments and or questions.

Twitter: @JulioBarros

