Intro to Spark ML Pipelines for Data Engineers
Don’t you like stuff for free? Don’t you like it when stuff is just handed to you? I mean, when was the last time you didn’t want a free t-shirt, or 20 bucks in the mail from your Grandma? That’s kinda what Pipelines are in Spark ML. The Apache Spark ML library is probably one of the easiest ways to get started with Machine Learning. Leaving all the fancy stuff to the Data Scientists is fine; Data Engineers are more interested in the end-to-end. The Pipeline, and the Spark ML APIs generally, provide a straightforward path to building ML Pipelines and lower the bar for entry into ML. So step right up, come get your free ML Pipeline.
Overview of Spark ML
Of course Apache Spark needs no introduction, but I’m guessing the Spark ML library might. The popularity of Spark is exploding and it seems to be the tool of choice for big data processing, but the adoption of Spark ML itself hasn’t kept up with the rest of the pack. Spark ML provides a great offering of the out-of-the-box ML methodologies that are used most often: classification, regression, clustering, etc.
Why Spark ML?
Well, mostly because it’s a great high-level set of APIs that lowers the bar and makes it easier to develop Machine Learning models and Pipelines in a straightforward manner. If you’ve worked in or around ML in the real world for any amount of time, you quickly learn that 80% of the work and complexity is usually about being able to productionize the model, plus all the work that goes into the data BEFORE you even train a model. Enter Spark ML Pipelines.
Spark ML Pipelines
I think the best place to start with Spark ML is the concept of a Pipeline, and yes, a Pipeline is an actual part of the code.
“A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.” (Spark ML documentation)
Now we will get into Transformers and Estimators later, but what you really need to know is that a Pipeline is the actual Machine Learning workflow. This is what makes it such a powerful and great tool to use when tackling ML problems. And of course it solves some of the major problems mentioned above that plague many projects: productionization and data prep work.
So basically, a Pipeline in Spark ML is a way for you to define the ETL pipeline that creates a Machine Learning model, and this solves a few problems (a minimal sketch follows this list).
- It takes away the ambiguity of actually having a well-defined ETL pipeline for your ML project.
- It solves 80% of the problem of “productionizing” your pipeline: because it’s Spark it will scale to infinity, and since it’s well defined, the code is probably better than home-baked stuff.
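To see the shape of that, here is about the smallest Pipeline you can write. This is just a hedged hello-world sketch on made-up toy data, not part of the real example that follows in the next section.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Made-up toy data: one categorical feature, one numeric feature, a 0/1 label.
toy_df = spark.createDataFrame(
    [("a", 1.0, 0.0), ("b", 2.0, 1.0), ("a", 3.0, 1.0), ("b", 0.5, 0.0)],
    ["category", "amount", "label"],
)

# The whole ML workflow is just an ordered list of stages handed to Pipeline.
toy_pipeline = Pipeline(stages=[
    StringIndexer(inputCol="category", outputCol="category_index"),
    VectorAssembler(inputCols=["category_index", "amount"], outputCol="features"),
    LogisticRegression(featuresCol="features", labelCol="label"),
])

toy_model = toy_pipeline.fit(toy_df)   # runs every stage in order
toy_model.transform(toy_df).show()     # the result can make predictions
Every Spark ML workflow is some variation of that: a list of stages, one fit(), one transform().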
Spark ML Pipeline Example
But enough babbling, let’s walk through a simple example to get a better understanding of what we are dealing with. Code on GitHub.
from pyspark.sql import *
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark import keyword_only
class CustomTransform(Transformer):
    """Custom Transformer that cleans up the raw bike-share rides."""

    @keyword_only
    def __init__(self):
        """Initialize."""
        super(CustomTransform, self).__init__()

    def _transform(self, dataframe):
        # Drop the columns we don't need as features.
        df = dataframe.drop(*['start_station_name', 'end_station_name', 'started_at', 'ended_at'])
        # Throw away rows missing a start or end station.
        df = df.filter(df.start_station_id.isNotNull())
        df = df.filter(df.end_station_id.isNotNull())
        # The label column needs to be numeric for the model.
        df = df.withColumn("end_station_id", df["end_station_id"].cast("int"))
        return df
def read_input_files(local_files: str):
    df = spark.read.csv(f'{local_files}', recursiveFileLookup=True, header=True)
    return df
columns_to_drop = ['start_station_name', 'end_station_name', 'started_at', 'ended_at']
features = ["rideable_type", "start_station_id", "member_casual"]
spark = SparkSession.builder.appName("my ml pipeline").master("local[3]").getOrCreate()
df = read_input_files('*.csv')
steps = [CustomTransform()]
indexer_steps = [StringIndexer(inputCol=column, outputCol=f"{column}_index") for column in features]
steps.extend(indexer_steps)
assembler = VectorAssembler(
    inputCols=["rideable_type_index", "start_station_id_index", "member_casual_index"],
    outputCol="features")
steps.extend([assembler])
model_ready_data_pipeline = Pipeline(stages=steps)
output = model_ready_data_pipeline.fit(df).transform(df)
lr = LogisticRegression(featuresCol = 'features', labelCol = 'end_station_id', maxIter=20)
(trainingData, testData) = output.randomSplit([0.7, 0.3], seed = 100)
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)
evaluator = MulticlassClassificationEvaluator(labelCol="end_station_id", predictionCol="prediction")
evaluator.evaluate(predictions)
print(predictions.head(100))
Spark ML Transform … what makes the Pipeline go!
Let’s step back for a second before we dive in. To make this easy, let’s ask the question… what am I looking for?
- I want a Pipeline that will take raw data and output model-ready data… data ready to be put into an ML model, and to create the model.
Making a basic Spark ML Pipeline is really a straightforward process… it’s hard to mess up. If you’ve seen PySpark scripts before, most everything will look familiar, with only a few new pieces. Of course you will see the creation of the Pipeline itself… nothing earth-shattering here…
model_ready_data_pipeline = Pipeline(stages=steps)
Of course it’s really what’s going on inside the Pipeline that matters… the steps. Let’s dig in. I mentioned before that the Pipeline is usually just a set of Transformers or Estimators all chained together. Let’s start with Transformers.
Transformers are easy: simply put, they take a DataFrame as input and output a DataFrame. Transformers used in your Spark ML Pipeline can be either…
- a custom Transform written by you.
- one of the many out-of-the-box Transforms provided by Spark.
In the above example I showed you one of each.
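Either way the contract is identical: a DataFrame goes in and a DataFrame comes out, and you can call transform() directly, no Pipeline required. Here is a quick hedged sketch, not part of the original example, using the built-in SQLTransformer against the bike-share DataFrame.
from pyspark.ml.feature import SQLTransformer

# __THIS__ is SQLTransformer's placeholder for whatever DataFrame gets passed in.
casual_only = SQLTransformer(
    statement="SELECT * FROM __THIS__ WHERE member_casual = 'casual'"
)
casual_df = casual_only.transform(df)  # DataFrame in, DataFrame out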
Custom PySpark Transform for Spark ML Pipeline
Yes, custom Transforms can get a little complicated depending on inputs and outputs, but at the basic level one just needs to take a DataFrame and return a DataFrame, by implementing a _transform() method that gets called (via transform()) automatically by the Spark Pipeline. Here is my example.
class CustomTransform(Transformer):
    @keyword_only
    def __init__(self):
        """Initialize."""
        super(CustomTransform, self).__init__()

    def _transform(self, dataframe):
        df = dataframe.drop(*['start_station_name', 'end_station_name', 'started_at', 'ended_at'])
        df = df.filter(df.start_station_id.isNotNull())
        df = df.filter(df.end_station_id.isNotNull())
        df = df.withColumn("end_station_id", df["end_station_id"].cast("int"))
        return df
Not too hard eh? My CustomTransform just inherits from and super()-inits the Spark Transformer class, and of course implements a _transform method that does the work I want on my DataFrame.
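As a quick hedged aside, you don’t even need a Pipeline to exercise it; the public transform() method on a Transformer delegates to the _transform() we just wrote.
# Standalone use of the custom Transformer, outside any Pipeline.
cleaner = CustomTransform()
clean_df = cleaner.transform(df)  # calls our _transform() under the hood
clean_df.printSchema()            # dropped columns are gone, end_station_id is an int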
Out-of-the-box Transforms in your PySpark ML Pipeline
There are many Spark-provided methods that take a DataFrame and return a DataFrame; I used two of them, VectorAssembler and StringIndexer. Let’s look at one of them.
assembler = VectorAssembler(
    inputCols=["rideable_type_index", "start_station_id_index", "member_casual_index"],
    outputCol="features")
A VectorAssembler in Spark is a very basic Transform: it takes a list of input columns and appends a column to the output DataFrame that is a vector of all the input columns (it smashes the input columns into a single output column).
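To make that concrete, here is a small hedged sketch with made-up numbers, standing in for the already-indexed columns the assembler above expects.
# Two toy rows shaped like the indexed bike-share columns.
toy = spark.createDataFrame(
    [(0.0, 12.0, 1.0), (1.0, 7.0, 0.0)],
    ["rideable_type_index", "start_station_id_index", "member_casual_index"],
)

# No fit() needed; the assembler just appends a "features" vector column,
# e.g. [0.0, 12.0, 1.0] for the first row.
assembler.transform(toy).show(truncate=False)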
I have a bunch of Transforms… now what?
This is an even easier part. Now that you have your set of Transforms, built by yourself or otherwise… just stick ’em in a list, in the order you want them run. In my example you can see me creating a list with my CustomTransform and extending the list thereafter until I have a complete list of steps I would like to add to my model-ready-data Pipeline.
steps = [CustomTransform()]
indexer_steps = [StringIndexer(inputCol=column, outputCol=f"{column}_index") for column in features]
steps.extend(indexer_steps)
assembler = VectorAssembler(
    inputCols=["rideable_type_index", "start_station_id_index", "member_casual_index"],
    outputCol="features")
steps.extend([assembler])
model_ready_data_pipeline = Pipeline(stages=steps)
Of course calling the below code will give me a DataFrame called output that is my model-ready data, all Transformed with my steps. Ready to go!
output = model_ready_data_pipeline.fit(df).transform(df)
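A hedged aside, not something the example above does: fit() actually hands back a PipelineModel, and it can be worth keeping that object around, because it is itself a Transformer you can replay on fresh data. The second file path below is made up.
# fit() returns a PipelineModel; transform() replays every prep step on any
# DataFrame with the same columns, e.g. a hypothetical new batch of rides.
fitted_prep = model_ready_data_pipeline.fit(df)
output = fitted_prep.transform(df)
new_batch_ready = fitted_prep.transform(read_input_files('new_rides/*.csv'))
If you also want to save that fitted PipelineModel to disk, keep in mind the custom Transformer above would first need the DefaultParamsReadable and DefaultParamsWritable mixins from pyspark.ml.util before Spark will write it out.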
Training Models and using PySpark ML Pipelines
This all turns out to be fairly easy too, once you’ve gotten the hang of Spark ML Pipelines and using Transforms to get all your data ready to go into a model, or to create a model. Remember at the beginning when we talked about how Transforms and Estimators are what make up a Spark ML Pipeline? Here is a very simple Estimator example that is almost exactly like what we did before with Transforms.
The only difference here is that an Estimator must have a fit() method. This fit() method is going to produce a model… which is actually a Transform itself (because you can load a model and call transform() on it). Trippy, I know. I think the documentation says it well…
“An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.” (PySpark ML documentation)
It will make sense when you see my simple example.
lr = LogisticRegression(featuresCol = 'features', labelCol = 'end_station_id', maxIter=20)
(trainingData, testData) = output.randomSplit([0.7, 0.3], seed = 100)
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)
evaluator = MulticlassClassificationEvaluator(labelCol="end_station_id", predictionCol="prediction")
evaluator.evaluate(predictions)
print(predictions.head(100))
As you can see, lr = LogisticRegression is my definition of an Estimator. Then I can add my single Estimator to my Pipeline.
pipeline = Pipeline(stages=[lr])
Getting the model is easy now; note that we fit on the training split.
model = pipeline.fit(trainingData)
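As another hedged aside, and not how this example is structured: nothing stops you from putting the prep Transforms and the Estimator into one single Pipeline, so a single fit() takes you from raw rows all the way to a model. The raw_train and raw_test names below are a hypothetical split of the raw DataFrame.
# Split the raw data first, then let one Pipeline handle prep + training.
raw_train, raw_test = df.randomSplit([0.7, 0.3], seed=100)

# steps (the prep Transforms) plus lr (the Estimator) in a single workflow.
# Note: the StringIndexers may need handleInvalid="keep" if the test split
# contains station ids the training split never saw.
end_to_end = Pipeline(stages=steps + [lr])
end_to_end_model = end_to_end.fit(raw_train)
raw_predictions = end_to_end_model.transform(raw_test)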
Want some predictions off your model that is a Transformer? Call the transform() method of course! (Remember… input a DataFrame and get a DataFrame back.)
predictions = model.transform(testData)
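The full example also scores those predictions with a MulticlassClassificationEvaluator, which defaults to the F1 metric. A hedged sketch, if you would rather see plain accuracy by name:
# Same evaluator as the example, just asking explicitly for accuracy.
acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="end_station_id", predictionCol="prediction", metricName="accuracy"
)
print(f"accuracy: {acc_evaluator.evaluate(predictions)}")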
Musings
Well, wasn’t that easy? Leave it to Spark to give us stuff for free; we all like free stuff, especially a free way to build awesome Machine Learning pipelines! As a Data Engineer I love PySpark ML Pipelines because they are so easy to use and they make it hard to build an ML pipeline incorrectly. Usually, if you get it wrong, the code will just blow up.
It takes what can be a complex and complicated topic and breaks it down into small steps that can just be passed to a Pipeline. The way they have implemented Transforms and Estimators is easy to follow and understand as well. And the best part? You can run your ML Pipeline at massive scale and it won’t break; it was made for big data! Not many tools you can say that about.