New to PySpark? Do this, not that.
Do this, not that. Well, I’ve got my own list. With everyone jumping on the PySpark / Databricks / EMR / Glue / Whatever bandwagon, I thought it was long overdue for a post on what to do, and not to do, when working with Spark / PySpark. I take the pragmatic approach to working with Spark: it’s honestly very forgiving well into the tens of TBs of data. Once you wander past that point, things tend to get a little spicy if you don’t have it all dialed in. As with most things in life, getting a few things right (and of course not doing a few others) will get you a long way, and the same applies to Spark.
Do this, not that … Spark style.
I want to keep this simple and to the point, because that’s the best way to keep your PySpark running hot. People get weird when it comes to Big Data sometimes, trying to be cool or whatever. Don’t do that with Spark, just keep it real. Use the standard library of functions, look around and see what features it has to offer, and resist the temptation to go outside of Spark to solve a problem, because 99% of the time you probably don’t need to.
I’ve found there is a wide range of Spark developers.
- Beginners (treat it like Pandas, don’t understand the distributed concepts)
- Intermediate (daily use, comfortable and understand functions)
- Expert (able to use the Spark UI and understand query execution and plans, optimizations, etc.)
Contrary to what you read and see every day, it is probably a waste of your time to dream of being that Spark expert who can open up the UI, dig into the query and execution plans, and be a general genius where all who meet you bow at your feet. Why? Because you probably need a few years of just being a good intermediate Spark developer, where you learn most of the standard functions and start to understand concepts like partitions, clusters, and the few basic knobs you can touch without ending the world, before you try to go all in.
Why do I say this? Because I’ve been using Spark for years, crunching hundreds of TBs of data, and almost all the problems can be solved and overcome by simply knowing what Spark has available to you out of the box, and putting basic concepts and good practices in place.
With all my complaining out of the way, I suppose you can get to the list of do this, don’t do that, Spark style. Not all of my tips are “do this to make it faster” tips; some are “do this because it makes your life easier” tips.
Encapsulate your code in functions.
Reusability, readability, sanity, unit testability, etc.
Do This.
import logging

import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession


def sample_transform(input_df: DataFrame) -> DataFrame:
    inter_df = input_df.where(
        input_df['that_column'] == F.lit('hobbit')
    ).groupBy('another_column').agg(F.sum('yet_another').alias('new_column'))
    output_df = inter_df.select(
        'another_column',
        'new_column',
        F.when(F.col('new_column') > 10, 'yes').otherwise('no').alias('indicator')
    ).where(F.col('indicator') == F.lit('yes'))
    return output_df


def main() -> None:
    spark = SparkSession.builder.appName('SampleTransform').enableHiveSupport().getOrCreate()
    logging.basicConfig(format='%(asctime)s %(levelname)s - Sample Transform - %(message)s', level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)
    test_df = spark.createDataFrame(
        [
            ('hobbit', 'Samwise', 5),
            ('hobbit', 'Bilbo', 50),
            ('hobbit', 'Bilbo', 20),
            ('wizard', 'Gandalf', 1000)
        ],
        ['that_column', 'another_column', 'yet_another']
    )
    new_df = sample_transform(test_df)


if __name__ == '__main__':
    main()
Not That.
import logging

import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.appName('SampleTransform').enableHiveSupport().getOrCreate()
logging.basicConfig(format='%(asctime)s %(levelname)s - Sample Transform - %(message)s', level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
test_df = spark.createDataFrame(
    [
        ('hobbit', 'Samwise', 5),
        ('hobbit', 'Bilbo', 50),
        ('hobbit', 'Bilbo', 20),
        ('wizard', 'Gandalf', 1000)
    ],
    ['that_column', 'another_column', 'yet_another']
)
inter_df = test_df.where(
    test_df['that_column'] == F.lit('hobbit')
).groupBy('another_column').agg(F.sum('yet_another').alias('new_column'))
output_df = inter_df.select(
    'another_column',
    'new_column',
    F.when(F.col('new_column') > 10, 'yes').otherwise('no').alias('indicator')
).where(F.col('indicator') == F.lit('yes'))
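By the way, one of the payoffs listed above (unit testability) falls right out of the function version: sample_transform takes a DataFrame and returns a DataFrame, so you can poke at it locally. A rough sketch of what that could look like, assuming pytest and a local SparkSession; the module name my_job and the test file name are mine, not part of the original code.

# test_sample_transform.py (hypothetical test module)
import pytest
from pyspark.sql import SparkSession

from my_job import sample_transform  # assumes the transform lives in my_job.py


@pytest.fixture(scope='session')
def spark():
    # small local session, plenty for unit tests
    return SparkSession.builder.master('local[1]').appName('tests').getOrCreate()


def test_sample_transform_keeps_only_big_hobbits(spark):
    test_df = spark.createDataFrame(
        [('hobbit', 'Samwise', 5), ('hobbit', 'Bilbo', 50), ('wizard', 'Gandalf', 1000)],
        ['that_column', 'another_column', 'yet_another']
    )
    result = {row['another_column'] for row in sample_transform(test_df).collect()}
    assert result == {'Bilbo'}  # Samwise drops out (sum of 5 <= 10), Gandalf is not a hobbit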
Use That Which Spark Gives You, you Knave.
Spark can do that my friend, no need for your UDF or custom code.
Do This.
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F
# df = spark.createDataFrame([('s3://bucket/2022/01/01',)],['file_uri'])
def pull_string(input_df: DataFrame) -> DataFrame:
    # substr is 1-based: characters 13-22 of 's3://bucket/2022/01/01' are '2022/01/01'
    output_df = input_df.withColumn('file_date', F.to_date(F.col('file_uri').substr(13, 10), 'yyyy/MM/dd'))
    return output_df
Not That.
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F
# df = spark.createDataFrame([('s3://bucket/2022/01/01',)],['file_uri'])
def pull_date(s: str) -> str:
    return s[12:22]


pull_date_Udf = F.udf(pull_date)


def pull_string(input_df: DataFrame) -> DataFrame:
    output_df = input_df.withColumn('file_date', F.to_date(pull_date_Udf('file_uri'), 'yyyy/MM/dd'))
    return output_df
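And if the prefix length ever changes on you, there is still no need for the UDF; regexp_extract is another built-in that gets you there. A sketch with my example pattern, against the same hypothetical file_uri column:

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

# df = spark.createDataFrame([('s3://bucket/2022/01/01',)],['file_uri'])
def pull_string_regex(input_df: DataFrame) -> DataFrame:
    # grab the yyyy/MM/dd chunk wherever it sits in the URI, then turn it into a real date
    output_df = input_df.withColumn(
        'file_date',
        F.to_date(F.regexp_extract(F.col('file_uri'), r'(\d{4}/\d{2}/\d{2})', 1), 'yyyy/MM/dd')
    )
    return output_df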
Take Only What You Need.
I’m fairly certain your mother told you this a time or two. So does Spark.
Do This.
df = spark.read.parquet('s3://my-bucket-is-better-than-yours/data/2022/*/*').select('column_1', 'column_4', 'column_10')
Not That.
df = spark.read.parquet('s3://my-bucket-is-better-than-yours/data/2022/*/*')
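With a columnar format like Parquet, that select() isn’t just cosmetic; Spark prunes the unused columns at the file scan. If you want to see it for yourself, explain() should show only the selected columns in the FileScan’s ReadSchema. My quick check, not part of the original snippet:

df = spark.read.parquet('s3://my-bucket-is-better-than-yours/data/2022/*/*') \
    .select('column_1', 'column_4', 'column_10')
# the FileScan line of the physical plan should list only column_1, column_4, column_10
df.explain()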
Filter Early and Often.
Cut early and often.
Do This.
df = filter_records(input)
df2 = some_transform(df)
df3 = another_transform(df2)
df4 = another_transform(df3)
Not That.
df = some_transform(input)
df2 = another_transform(df)
df3 = another_transform(df2)
df4 = filter_records(df3)
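In real code that just means the where() goes right after the read, before the heavy lifting. A small sketch with made-up column names:

import pyspark.sql.functions as F

# hypothetical columns: keep only last month's active rows before doing the expensive work
df = spark.read.parquet('s3://my-bucket-is-better-than-yours/data/2022/*/*')
trimmed = df.where((F.col('event_date') >= '2022-11-01') & (F.col('status') == 'active'))
result = trimmed.groupBy('customer_id').agg(F.sum('amount').alias('total_amount'))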
cache() for reuse of DataFrames.
Spark lazy, like you. Understand when to cache() or persist() a dataset.
Do This.
df = spark.read.parquet('/some/data/set/*')
df.cache()
thing_1 = do_thing_1(df)
thing_4 = do_thing_4(df)
thing_2 = do_thing_2(thing_1)
thing_3 = do_thing_3(thing_2)
Not That.
df = spark.read.parquet('/some/data/set/*')
thing_1 = do_thing_1(df)
thing_2 = do_thing_2(thing_1)
thing_3 = do_thing_3(thing_2)
thing_4 = do_thing_4(df)
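Remember that cache() itself is lazy too; nothing actually lands in memory until an action runs, and it’s good manners to let go of it when the reuse is over. A sketch with hypothetical output paths; the writes and unpersist() are my additions, do_thing_1 / do_thing_4 are the placeholder transforms from above:

df = spark.read.parquet('/some/data/set/*')
df.cache()                               # lazy -- nothing is stored yet
thing_1 = do_thing_1(df)
thing_4 = do_thing_4(df)
thing_1.write.parquet('/tmp/out/one')    # first action populates the cache as a side effect
thing_4.write.parquet('/tmp/out/four')   # second action reuses the cached df instead of re-reading
df.unpersist()                           # release executor memory once the reuse is done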
Understand Your Cluster Size and spark.sql.shuffle.partitions
Don’t treat every single Spark cluster the same, because they are not. Understand the RAM and CPU available, and set spark.sql.shuffle.partitions to match, at minimum.
Do This.
# Cluster = c4.8xlarge 36 CPU / 60GB RAM x 12 workers (432 cores total)
# set shuffle partitions somewhere in between 1-2 times the number of cores
spark = SparkSession.builder.appName('DingDong') \
    .config('spark.sql.shuffle.partitions', '500') \
    .enableHiveSupport().getOrCreate()
Not That.
# Cluster = c4.8xlarge 36CPU 60Mem x 12 workers
spark = SparkSession.builder.appName('DingDong').enableHiveSupport().getOrCreate()
Musings
I know I’m probably on the lower end of the Spark smart folks, even after using Spark for years, mostly because I just stick to the basics. I try to write clean and straightforward code and use only Spark functionality. I’ve learned when to cache(), how to select only what I need, to filter early and often, and to understand my cluster size and how that affects the few easy parameters you can configure without blowing things up. Even with billions of records being processed, keeping it simple works 95% of the time. Spark is good, let it be good for you.
Are the code blocks right for “Filter Early and Often”?
Seems like you might have got the DO’s and Don’ts reversed for this one?
It seems good to me; the DO part shows filtering the input data first and then performing the transformations, instead of the DO NOT’s transform-first, filter-at-the-end approach.
I.e. we have a large input table with dozens of columns but we need only a subset of those, and only the data from the last month. So, instead of doing the heavy work on the entire dataset, the idea is to limit it to the minimum data we need.
Coming from pandas myself, the best thing you can do is learn to chain your code vs doing frequent variable assignments like df1, df2, df3, etc.
In terms of your comment on filtering often, it really doesn’t matter as long as you don’t trigger an action that disrupts the lazy evaluation. The optimized plan that Spark creates will essentially do that for you as early as it can.
Finally, in terms of shuffle partitions, always make sure you are using Spark 3.0 or above and have AQE turned on. If you set the shuffle partitions to anything greater than the number of cores, it will always coalesce the partitions down to the optimal number needed for the shuffle.
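For anyone following along, on Spark 3.x the AQE settings mentioned in that comment look roughly like this (a sketch; spark.sql.adaptive.enabled is on by default in recent releases, so treat the explicit configs as belt-and-braces):

spark = SparkSession.builder.appName('DingDong') \
    .config('spark.sql.adaptive.enabled', 'true') \
    .config('spark.sql.adaptive.coalescePartitions.enabled', 'true') \
    .config('spark.sql.shuffle.partitions', '500') \
    .enableHiveSupport().getOrCreate()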