
4 DataWarehouse-ish Functions For Your PySpark Dataframes

Ever felt like just exploring documentation… seeing what you can find? That’s what you do on a cold Sunday afternoon during the first snowstorm of the year. After the initial fun has worn off, the kids don’t want to go outside anymore, and Netflix has nothing new to offer up. So I thought I might as well spend some time poking around the PySpark Dataframe API, seeing what strange wonders I could uncover. I did find a few methods that took me back to my SQL Data Warehouse days. Memories of my old school Data Analyst and Business Intelligence days in Data Warehousing… the endless line of SQL queries being written day after day. Anyway, let’s dive into these analytical methods you can call on your PySpark Dataframes, buried in the documentation like some tarnished gem.

dataframe.crosstab()

This one took me back. Doing PIVOTs and SUMs at the same time is something everyone who has worked in Data Warehousing has to do on a semi-frequent basis. Usually not quite often enough to memorize the SQL pivot statements by heart. Also, anyone who has been around the business world, with all those crazy people using Excel, knows about the obsession with crosstabs among product managers, analysts, and the like.

For these experiments I’m using CSV files downloaded from the open source Divvy Bike trips dataset.

Enter thy PySpark Dataframe.crosstab() function!

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("my spark").master("local[3]").getOrCreate()
    # Read all of the downloaded Divvy trip CSVs into a single dataframe.
    df = spark.read.csv('*.csv', recursiveFileLookup=True, header=True)
    print(df.columns)
    # crosstab() lives on the DataFrameStatFunctions accessor, df.stat.
    ct = df.stat.crosstab("member_casual", "rideable_type")
    ct.show()

The crosstab() method of a dataframe takes two arguments, both columns. The first column I always like to think of as the anchor… in my case the column member_casual, which indicates whether the person riding the bike is a member or not. The second column is the one that gets pivoted: its distinct values are flattened out into new columns and then counted. In my case the column rideable_type indicates what kind of bike was being ridden.

>>> df.columns
['ride_id', 'rideable_type', 'started_at', 'ended_at', 'start_station_name', 'start_station_id', 'end_station_name', 'end_station_id', 'start_lat', 'start_lng', 'end_lat', 'end_lng', 'member_casual']
>>> df.stat.crosstab("member_casual", "rideable_type")
DataFrame[member_casual_rideable_type: string, docked_bike: bigint, electric_bike: bigint]
>>> c= df.stat.crosstab("member_casual", "rideable_type")
>>> c.show()
+---------------------------+-----------+-------------+
|member_casual_rideable_type|docked_bike|electric_bike|
+---------------------------+-----------+-------------+
|                     member|     808886|       108264|
|                     casual|     701431|        88218|
+---------------------------+-----------+-------------+

As someone who’s written those nasty little PIVOT statements in SQL way too many times, I would say that is pretty sweet.
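
If you want the crosstab output to read more like a finished report, a little post-processing goes a long way. Here’s a minimal sketch, assuming the ct dataframe from above — the renamed column and the total_rides column are my own additions, not part of the crosstab() API itself.

from pyspark.sql import functions as F

# Give the combined key column a friendlier name and add a per-row total.
tidy = (
    ct.withColumnRenamed("member_casual_rideable_type", "member_casual")
      .withColumn("total_rides", F.col("docked_bike") + F.col("electric_bike"))
)
tidy.show()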

dataframe.freqItems()

Here is another new one to me. It’s pretty straightforward and easy. It just calculates which items are frequent in the columns you specify. Nothing too special, but I can see how this would come in handy for certain data warehouse or analytics applications.

>>> f = df.freqItems(['start_station_name', 'end_station_name'], support=1.0)
>>> f.head()                                                                    
Row(start_station_name_freqItems=['Indiana Ave & Roosevelt Rd'], end_station_name_freqItems=['Lake Shore Dr & North Blvd'])

Here you can see how easy it is to find the frequently used stations in our dataset. It’s configurable as well: the support parameter controls what fraction of rows an item has to appear in before it counts as “frequent” (the default is 1%).
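
Here’s a minimal sketch of playing with that threshold on the same df — the 0.05 value is just an assumption for illustration, not anything from the dataset itself.

# Stations appearing in roughly 5% or more of all trips. freqItems() uses an
# approximate algorithm, so the result can include some false positives.
popular = df.freqItems(["start_station_name"], support=0.05)
popular.show(truncate=False)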

dataframe.rollup()

Another simple but fun one from the data warehousing and SQL realm. Rollups are very useful for basic analytics, and I think you would be hard-pressed to find a data warehouse that isn’t using them. Once you give a rollup the columns you are interested in, along with some sort of aggregation, you get that aggregation at every level of the column hierarchy: each combination of values, subtotals for the leading columns, and a grand total. Very handy indeed.

>>> r = df.rollup('start_station_name','end_station_name').count()
>>> r.show()
+--------------------+--------------------+-----+                               
|  start_station_name|    end_station_name|count|
+--------------------+--------------------+-----+
|Mies van der Rohe...|W Oakdale Ave & N...|    1|
|Dorchester Ave & ...|Greenwood Ave & 4...|   26|
|Field Blvd & Sout...|Clinton St & Jack...|    5|
|Columbus Dr & Ran...|Franklin St & Lak...|   62|
|Greenview Ave & D...|Ashland Ave & Wel...|   35|
|Broadway & Granvi...|     Montrose Harbor|   65|
| Morgan St & Lake St|Broadway & Belmon...|   11|
|Southport Ave & B...|Broadway & Barry Ave|   73|
|Fairbanks Ct & Gr...|LaSalle St & Illi...|  137|
|Sheridan Rd & Bue...|Sheridan Rd & Bue...|  423|
|State St & Kinzie St|  State St & 33rd St|   11|
|Ashland Ave & Div...|Mies van der Rohe...|   23|
|South Shore Dr & ...|Stockton Dr & Wri...|    1|
|Clark St & Armita...|Clark St & Drummo...|  237|
|Clark St & Schill...|Larrabee St & Web...|  179|
|Milwaukee Ave & G...|Franklin St & Chi...|   29|
|Halsted St & Dick...|Southport Ave & W...|   20|
|Prairie Ave & 43r...| MLK Jr Dr & 47th St|    8|
|Ada St & Washingt...|Lake Shore Dr & N...|    7|
|Cityfront Plaza D...| Theater on the Lake|   80|
+--------------------+--------------------+-----+

In this instance I can easily get a count of trips for every combination of start and end station.
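
One thing worth knowing is that rollup() also emits the subtotal rows, marked with nulls. Here’s a minimal sketch, assuming the same df as above, that pulls out just the per-start-station subtotals — the filter and ordering are my own additions.

from pyspark.sql import functions as F

# Subtotal rows have end_station_name = null (one per start station),
# plus a single grand-total row where both columns are null.
subtotals = (
    df.rollup("start_station_name", "end_station_name")
      .count()
      .filter(F.col("end_station_name").isNull())
      .orderBy(F.col("count").desc())
)
subtotals.show(5, truncate=False)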

dataframe.summary()

Not really sure how I missed this one; I should have figured that a summary() method was hanging around. It seems more like a data exploration tool and would be very handy when inspecting new data sets, or trying to find simple answers without writing much code.

summary() allows you to ask for specific aggregations and statistics by name.

>>> df.select(['member_casual','rideable_type']).summary("count", "min", "25%", "75%", "max").show()
+-------+-------------+-------------+                                           
|summary|member_casual|rideable_type|
+-------+-------------+-------------+
|  count|      1706799|      1706799|
|    min|       casual|  docked_bike|
|    25%|         null|         null|
|    75%|         null|         null|
|    max|       member|electric_bike|
+-------+-------------+-------------+
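
Since everything came in from the CSVs as strings, the percentile rows show up as null above. Here’s a minimal sketch, assuming the same df, of what summary() looks like on genuinely numeric columns — the cast to double is my own addition.

from pyspark.sql import functions as F

# Cast the latitude/longitude strings to doubles so percentiles and
# standard deviation are actually computed.
numeric = df.select(
    F.col("start_lat").cast("double"),
    F.col("start_lng").cast("double"),
)
numeric.summary("count", "mean", "stddev", "min", "25%", "75%", "max").show()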

dataframe.pivot()

Ah… the dreaded pivot. I would have to write pivots in SQL about once every 3 months in my data warehousing days, and I think it took a few years before I could write one without Googling it. In PySpark, pivot() hangs off groupBy(), and man… it makes things way easier!

>>> df.groupBy('rideable_type').pivot('member_casual').count().show()
+-------------+------+------+
|rideable_type|casual|member|
+-------------+------+------+
|  docked_bike|701431|808886|
|electric_bike| 88218|108264|
+-------------+------+------+

This makes it so straightforward to pivot data and do aggregations that it kinda makes you want to keep going! The pivot is the classic data warehousing move for giving the business different views and aggregations of the data.
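
If you want to go one step further, here’s a minimal sketch under a couple of assumptions — the duration_min column is my own, derived from the started_at/ended_at timestamp strings. Passing an explicit value list to pivot() lets Spark skip the extra pass to discover the distinct pivot values, and you can swap count() for any other aggregation.

from pyspark.sql import functions as F

# Derive trip duration in minutes from the start/end timestamp strings.
with_duration = df.withColumn(
    "duration_min",
    (F.col("ended_at").cast("timestamp").cast("long")
     - F.col("started_at").cast("timestamp").cast("long")) / 60,
)

# Pivot on an explicit value list and aggregate average duration instead of a count.
(with_duration
    .groupBy("rideable_type")
    .pivot("member_casual", ["casual", "member"])
    .avg("duration_min")
    .show())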

Thoughts

I love these analytical functions in PySpark that make working with dataframes so easy. Some of these, like pivots, can turn into fairly complicated SQL queries. It’s always fun to do some digging in the documentation and be reminded of the gems hidden down at the bottom. I know from experience that the data warehousing world hasn’t changed much in 80% of organizations. But as it slowly does change over the coming years, catching up with the rest of the world, using Spark for data warehousing will become more common. And instead of just looking at Spark as an ETL tool (which many people do), remember it’s an analytical tool as well!