Introduction to Unit Testing with PySpark.
There are few things in life that are worse than cracking open some serious PySpark pipeline code and realizing there isn’t a single function written to encapsulate logic … wondering if some change you are about to make will bring down the whole pipeline. When you are new to a codebase you don’t know what you don’t know; you have no backstory and you are usually flying by the seat of your pants in the beginning. When you have no unit tests, usually the only other way to test changes to a Spark pipeline is to run it … which is sometimes easier said than done in a development environment. The first line of defence should be unit testing the entire PySpark pipeline.
The “why” of unit testing PySpark pipelines.
When it comes to unit testing PySpark pipeline code, there is at least a baseline that must be followed: the critical ETL transforms of a PySpark script should be encapsulated inside a method or function. It doesn’t matter if you are more of an OOP or a functional programmer, or both; anything is better than not breaking your code up into logical units that can be tested. (all code available on GitHub)
This is what I mean.
dataframe = dataframe.where(dataframe['that_column'] == 'this_value').groupBy('another_column').agg(F.sum('yet_another').alias('new_column')).select('another_column', 'new_column', F.when(F.col('new_column') > 1, 'yes').otherwise('no'))
Now imagine a pipeline with line after line of this type of code, with not a single piece of logic encapsulated in a method or function. This is way more common than people would think; I would venture a guess that 80% of PySpark pipelines are written this way.
This leaves several large gaps ….
- how is a person supposed to test changes to this code?
- when writing this code, how does someone know it actually works?
- it makes a long pipeline hard to debug, troubleshoot, and read.
Steps to unit testing PySpark pipelines.
It’s actually very simple to get unit tests into your PySpark pipeline code. Let’s walk through, step by step, what it takes. We will use pytest and set up some directories and files as a baseline.
Setup directories and pytest.
>> mkdir tests
>> touch tests/test_pipeline.py
>> touch tests/conftest.py
Obviously we need a test directory, a file to hold our unit tests called test_pipeline.py, and a file specific to pytest called conftest.py. Let’s talk about conftest.py for a minute.
Our conftest.py is going to hold a pytest fixture, which is a way to initialize each test function with whatever we want. In our case, to test PySpark code we are going to need a SparkSession.
>> vim tests/conftest.py
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark_session():
    spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
    return spark
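One optional variation, just a sketch and not required for anything below: if you want the SparkSession torn down when the whole test run finishes, you can yield it from the fixture instead of returning it and stop it afterwards.
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark_session():
    spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
    yield spark   # hand the session to the tests
    spark.stop()  # tear it down after the whole test session finishes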
Now we have a fixture that will provide a SparkSession to each of our unit tests, so we can test our code on actual Spark DataFrames.
Write your first PySpark unit test.
Let’s go back to that original chunk of code. The first step would be to simply put the logic inside a function, no rocket science here.
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

def sample_transform(input_df: DataFrame) -> DataFrame:
    inter_df = input_df.where(
        input_df['that_column'] == F.lit('hobbit')
    ).groupBy('another_column').agg(F.sum('yet_another').alias('new_column'))
    output_df = inter_df.select(
        'another_column',
        'new_column',
        F.when(F.col('new_column') > 10, 'yes').otherwise('no').alias('indicator')
    ).where(F.col('indicator') == F.lit('yes'))
    return output_df
Now we actually have a function to unit test. So let’s write the test; we will crack open our test file …
>> vim tests/test_pipeline.py
import pytest
from mycode import sample_transform

@pytest.mark.usefixtures("spark_session")
def test_sample_transform(spark_session):
    test_df = spark_session.createDataFrame(
        [
            ('hobbit', 'Samwise', 5),
            ('hobbit', 'Billbo', 50),
            ('hobbit', 'Billbo', 20),
            ('wizard', 'Gandalf', 1000)
        ],
        ['that_column', 'another_column', 'yet_another']
    )
    new_df = sample_transform(test_df)
    assert new_df.count() == 1
    assert new_df.toPandas().to_dict('list')['new_column'][0] == 70
If we actually run our function sample_transform against our sample dataframe … the following is the output …
+--------------+----------+---------+
|another_column|new_column|indicator|
+--------------+----------+---------+
| Billbo| 70| yes|
+--------------+----------+---------+
This is what we are trying to validate: that our filtering and switching logic is done correctly and we get the expected outcome.
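If you would rather validate the whole output row instead of a single column, here is a hedged sketch of what that could look like, reusing the same sample_transform function and spark_session fixture (the test name is just for illustration):
def test_sample_transform_full_row(spark_session):
    test_df = spark_session.createDataFrame(
        [
            ('hobbit', 'Samwise', 5),
            ('hobbit', 'Billbo', 50),
            ('hobbit', 'Billbo', 20),
            ('wizard', 'Gandalf', 1000)
        ],
        ['that_column', 'another_column', 'yet_another']
    )
    # collect() brings the single surviving row back to the driver
    result = [tuple(row) for row in sample_transform(test_df).collect()]
    assert result == [('Billbo', 70, 'yes')]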
Keys to writing good PySpark unit tests.
As you can see, writing unit tests for PySpark pipeline code isn’t very hard at all. It doesn’t take much effort, and it protects you and other people by ensuring that changes, and the initial code being written, transform after transform, are done correctly.
- use the createDataFrame method. Instead of using some data in a sample file, it’s better to create our own DataFrames with data records that do and don’t match the criteria of the function(s) being tested.
- test both sides of your function. Not only are you possibly looking for a certain record to have a certain value; also make sure, for example, that you don’t have more records than you should (see the sketch after this list).
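For example, here is a sketch of that second point, reusing the same sample_transform function and spark_session fixture: feed the function only records that should be filtered out and assert that nothing survives (the test name is hypothetical).
def test_sample_transform_excludes_non_matches(spark_session):
    # neither row should survive: one has the wrong that_column value,
    # and the other aggregates to a sum that never gets above the threshold of 10
    test_df = spark_session.createDataFrame(
        [
            ('wizard', 'Gandalf', 1000),
            ('hobbit', 'Samwise', 5)
        ],
        ['that_column', 'another_column', 'yet_another']
    )
    assert sample_transform(test_df).count() == 0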
Setting up Docker to run your PySpark unit tests.
The next piece of unit testing PySpark code is having somewhere to test it that isn’t a production environment, somewhere anyone can do it: Docker, of course. You will need …
- Dockerfile
- Docker compose file
The Dockerfile doesn’t need to be rocket science: a little Ubuntu, Java, Python, Spark …
Using the file below, run docker build --tag spark-test .
FROM ubuntu:18.04

RUN apt-get update && \
    apt-get install -y default-jdk scala wget vim software-properties-common python3.8 python3-pip curl unzip libpq-dev build-essential libssl-dev libffi-dev python3-dev && \
    apt-get clean

RUN wget https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz && \
    tar xvf spark-3.0.1-bin-hadoop3.2.tgz && \
    mv spark-3.0.1-bin-hadoop3.2/ /usr/local/spark && \
    ln -s /usr/local/spark spark

WORKDIR /app
COPY . /app

RUN pip3 install cython==0.29.21 numpy==1.18.5 && pip3 install pytest pyspark pandas==1.0.5

ENV PYSPARK_PYTHON=python3
and your docker-compose file …
version: "3.9"
services:
  test:
    environment:
      - PYTHONPATH=./src
    image: "spark-test"
    volumes:
      - .:/app
    command: python3 -m pytest
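Note that PYTHONPATH=./src assumes the pipeline code (imported above as mycode) lives under a src directory. A project layout along these lines would match the imports used in the test file, but adjust it to wherever your code actually lives:
.
├── Dockerfile
├── docker-compose.yml
├── src/
│   └── mycode.py          # holds sample_transform
└── tests/
    ├── conftest.py
    └── test_pipeline.py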
Now, if you build your Dockerfile in your project, you should be able to just run docker-compose up test (the test service defined above uses the spark-test image) … and there you have it, your unit tests for PySpark running with one easy command.
test_1 | ============================= test session starts ==============================
test_1 | platform linux -- Python 3.6.9, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
test_1 | rootdir: /app
test_1 | collected 1 item
test_1 |
test_1 | test/test_pipeline.py . [100%]
test_1 |
test_1 | ============================== 1 passed in 5.08s ===============================
Musings
Unit testing your PySpark pipeline code really isn’t that hard, and it saves a lot of time later down the road. Being able to make changes to code and have some idea whether you’re breaking anything, without running the entire pipeline, is kinda nice. Data Engineers have gotten a bad rap over the years because of an unwillingness to incorporate basic software engineering principles like unit tests.
Obviously, from the above, it doesn’t take much effort to add unit tests for your Spark code. Encapsulate your logic, write a few Docker files, and you’re off to the races. (all code available on GitHub)