
Introduction to Delta Lake on Apache Spark … for Data Engineers

If you’re anything like me, when someone says Delta Lake you think Databricks. But the mythical Delta Lake is an open source project, available to anyone running Apache Spark. It almost seems too good to be true: ACID transactions at Spark scale? Incredible. This is the future, it has to be. The lines around what counts as a data warehouse have been blurring for a long time, and I have a feeling Delta Lake will be the death blow to the traditional DW … or its rebirth??

What is Delta Lake?

It’s hard not to get spammed these days as a data engineer with Delta Lake this, Delta Lake that, all day long, while really having no idea what all the fuss is about. But it’s time to change that; how hard could it be? Let’s take a little road trip around Delta Lake to figure out what it is, how it works with Apache Spark, and generally what features it offers. So, what is Delta Lake?

“Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads.”

– delta.io

I think we will find it is this, and more. Maybe it’s best to describe what Delta Lake is by describing the current world, and how Delta Lake fills in a gap and solves some very specific problems.

Apache Spark and file storage today.

Today, when running big data pipelines, data engineers have (very) few reasonable options for storage:

  • S3
  • HDFS
  • GCS
  • ????

When reading and writing data, and creating storage sinks with Spark or any other big data tool, that data ends up in files … storage somewhere. The files are usually Parquet, ORC, JSON, Avro, CSV, etc. This is both great and a problem. It’s great because you can have many nodes/workers in a distributed system reading and writing files to some S3 sink, in parallel. It’s fast and flexible.
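Just so we’re on the same page, here’s a rough sketch of that status quo: Spark reading some raw file and dumping Parquet straight into a storage sink. The CSV name and HDFS path below are just placeholders.

# a rough sketch of the status quo: Spark writing plain Parquet files
# straight to storage, no Delta Lake involved (file name and path are placeholders)
df = spark.read.csv("some_raw_data.csv", header='true')
df.write.mode("append").parquet("hdfs://master:9000/data/raw_sink/")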

It’s not so great because ….

  • no schema or constraint controls in place out of the box.
  • no ACID transactions (readers and writers getting all buggered up).
  • serious foresight into data partitioning is required.
  • the many-small-files problem (or the few-big-files problem) has to be avoided by hand.
  • how do you update, merge, or delete data (CRUD) when your file format (Parquet) doesn’t support it? (See the sketch just below.)
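To make that last bullet concrete, here is a rough sketch of what an “update” looks like against a plain Parquet sink. The paths and the station-name fix are hypothetical; the point is that you end up rewriting whole files just to change a handful of rows.

# without Delta Lake, an "update" means rewriting the files yourself.
# rough sketch with placeholder paths and column names
from pyspark.sql import functions as F

trips = spark.read.parquet("hdfs://master:9000/data/bike_trips/")
fixed = trips.withColumn(
    "start_station_name",
    F.when(F.col("start_station_id") == 86, "Eckhart Park, East")
     .otherwise(F.col("start_station_name")),
)
# rewrite the whole dataset (to a new path) just to change a few rows
fixed.write.mode("overwrite").parquet("hdfs://master:9000/data/bike_trips_fixed/")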

Summing up the big data file storage problem(s)

People want the approachability and features that the old school data warehouse on some RDBMS gave them. They want easy, they want ACID, they want CRUD, they want schema enforcement, they want slowly changing dimensions! They want a flipping data warehouse that integrates well with Spark and provides traditional data warehouse features.

That’s what Delta Lake is. So what is Delta Lake? From my point of view Delta Lake is …

“Abstraction layer between your compute and your storage with features like ACID, CRUD, schema enforcement, constraints, and SCDs that harken back to the glory days of data warehousing.”

– me

It sounds too good to be true, let’s try it out.

Installing and Using Delta Lake with Apache Spark

When I first started looking into Delta Lake I was curious if I would be installing and configuring yet another Hadoop-type cluster system. It gets kinda old after a while. That is not the case with Delta Lake. Since it’s just an abstraction layer on top of your existing Spark/storage setup, you’re really just providing a bunch of extra JARs to Spark at runtime.

I’m going to use my own Spark–Hadoop cluster with HDFS as the storage backend. I pulled the following command from the Delta Lake docs; supposedly this is all I need to do to run Delta Lake with my Spark cluster … dropping into the PySpark shell …

pyspark --packages io.delta:delta-core_2.12:0.8.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.HDFSLogStore"
>>>Type "help", "copyright", "credits" or "license" for more information.
21/02/26 21:05:46 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 45.79.59.44 instead (on interface eth0)
21/02/26 21:05:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5de77c2f-129c-475e-82e0-2445a001d1e5;1.0
	confs: [default]
	found io.delta#delta-core_2.12;0.8.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/0.8.0/delta-core_2.12-0.8.0.jar ...
	[SUCCESSFUL ] io.delta#delta-core_2.12;0.8.0!delta-core_2.12.jar (271ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4/4.7/antlr4-4.7.jar ...
	[SUCCESSFUL ] org.antlr#antlr4;4.7!antlr4.jar (141ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.7/antlr4-runtime-4.7.jar ...
	[SUCCESSFUL ] org.antlr#antlr4-runtime;4.7!antlr4-runtime.jar (51ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar ...
	[SUCCESSFUL ] org.antlr#antlr-runtime;3.5.2!antlr-runtime.jar (24ms)
downloading https://repo1.maven.org/maven2/org/antlr/ST4/4.0.8/ST4-4.0.8.jar ...
	[SUCCESSFUL ] org.antlr#ST4;4.0.8!ST4.jar (24ms)
downloading https://repo1.maven.org/maven2/org/abego/treelayout/org.abego.treelayout.core/1.0.3/org.abego.treelayout.core-1.0.3.jar ...
	[SUCCESSFUL ] org.abego.treelayout#org.abego.treelayout.core;1.0.3!org.abego.treelayout.core.jar(bundle) (6ms)
downloading https://repo1.maven.org/maven2/org/glassfish/javax.json/1.0.4/javax.json-1.0.4.jar ...
	[SUCCESSFUL ] org.glassfish#javax.json;1.0.4!javax.json.jar(bundle) (9ms)
downloading https://repo1.maven.org/maven2/com/ibm/icu/icu4j/58.2/icu4j-58.2.jar ...
	[SUCCESSFUL ] com.ibm.icu#icu4j;58.2!icu4j.jar (407ms)
:: resolution report :: resolve 3527ms :: artifacts dl 953ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;0.8.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-runtime;3.5.2 from central in [default]
	org.antlr#antlr4;4.7 from central in [default]
	org.antlr#antlr4-runtime;4.7 from central in [default]
	org.glassfish#javax.json;1.0.4 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   8   |   8   |   8   |   0   ||   8   |   8   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-5de77c2f-129c-475e-82e0-2445a001d1e5
	confs: [default]
	8 artifacts copied, 0 already retrieved (15304kB/63ms)

Looks like it worked, the JARs were all downloaded.
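Side note: if you’d rather not pass those flags on the command line every time, the same configuration can be set while building the SparkSession in a script. A minimal sketch, assuming the delta-core package can be pulled in via spark.jars.packages:

from pyspark.sql import SparkSession

# same settings as the shell flags above, just set while building the session
spark = (
    SparkSession.builder.appName("delta-test")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)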

Delta Lake + PySpark

Ok, so let’s try some of this Delta Lake + Spark magic. Apparently Delta Lake supports HDFS as a backend out of the box, so this should be easy; time will tell. The first thing I want to do is just create a table in Delta Lake. I’m going to be using the open source Divvy bike trips data set.

pyspark --packages io.delta:delta-core_2.12:0.8.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

>>spark.sql("""
  CREATE TABLE bike_trips_staging (
                ride_id STRING,
				rideable_type STRING,
				started_at TIMESTAMP,
				ended_at TIMESTAMP,
				start_station_name STRING,
				start_station_id INT,
				end_station_name STRING,
				end_station_id INT,
				start_lat REAL,
				start_lng REAL,
				end_lat REAL,
				end_lng REAL,
				member_casual STRING
  )
USING DELTA
""")

staging_data = spark.read.csv("202004-divvy-tripdata.csv", header='true')
staging_data.createOrReplaceTempView('temp')
spark.sql("INSERT INTO bike_trips_staging SELECT * FROM temp")
spark.sql("SELECT * FROM bike_trips_staging LIMIT 1").show()
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat|start_lng|end_lat| end_lng|member_casual|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+
|A847FADBBC638E45|  docked_bike|2020-04-26 17:45:14|2020-04-26 18:12:03|        Eckhart Park|              86|Lincoln Ave & Div...|           152|  41.8964|  -87.661|41.9322|-87.6586|       member|
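Worth noting: you don’t have to pre-create a table at all. The DataFrame API can write a Delta table straight to a storage path, something like this sketch (the HDFS path is a placeholder on my cluster):

# write the same DataFrame out as a standalone Delta table at a path
# (the path is just a placeholder)
(staging_data
    .write
    .format("delta")
    .mode("overwrite")
    .save("hdfs://master:9000/delta/bike_trips_raw"))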

Nothing special so far, but I wonder … can we break this by messing up some of the ended_at date values to be garbage? Delta Lake should make this all blow up. You can see below I updated the first record’s value for ended_at to not be a date. I re-ran the code above, waiting for an error.

I was expecting Spark and Delta Lake to blow up on that INSERT, but no such luck. As you can see below, it just inserted a NULL. This, I suppose, is better than nothing; it helps with data integrity for sure.

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat|start_lng|end_lat| end_lng|member_casual|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+
|A847FADBBC638E45|  docked_bike|2020-04-26 17:45:14|               null|        Eckhart Park|              86|Lincoln Ave & Div...|           152|  41.8964|  -87.661|41.9322|-87.6586|       member|

Delta Lake constraints.

Well, this lets us try something else Delta Lake offers … constraints. Anyone from the data warehousing world knows how useful constraints are for dealing with bad data and catching things up front. Let’s ALTER our Delta Lake table to add a constraint on the date column in question.

spark.sql("ALTER TABLE bike_trips_staging ADD CONSTRAINT dateWithinRange CHECK (ended_at > '1900-01-01')")

This should just make sure that records being inserted into the Delta Lake table have valid dates. Now what happens if we run the following line again?

spark.sql("insert into bike_trips_staging SELECT * FROM tmp")
>>> : org.apache.spark.sql.delta.schema.InvariantViolationException: CHECK constraint datewithinrange (`ended_at` > '1900-01-01') violated by row with values:
 - ended_at : null

Now that’s what I’m talking about! For anyone who has worked with large data sinks on s3 or some other file system … this is an awesome feature. Without this type of check, it’s inevitable that developer(s) writing files to some store will mess something up themselves, or the data will be bad, breaking something far down the pipeline.
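From what I can tell, the constraint gets stored as a table property, so you can inspect it, and drop it again with another ALTER if it ever gets in the way. A quick sketch:

# constraints appear in the table properties (delta.constraints.<name>)
spark.sql("SHOW TBLPROPERTIES bike_trips_staging").show(truncate=False)

# and the constraint can be removed again if needed
spark.sql("ALTER TABLE bike_trips_staging DROP CONSTRAINT dateWithinRange")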

Delta Lake CRUD operations.

This is the other feature that I’ve been wanting to check out for a long time. Being able to run UPDATE and DELETE (CRUD) type operations on a Parquet data store seems like the holy grail. Data morphs and changes over time, the business changes, mistakes happen. The best part of the classic RDBMS data warehouse is the flexibility of CRUD.

We have a column in our data called start_station_name that is associated with a start_station_id. What if one of the station names was wrong in our data lake and we needed to update it? Looks like we have 222 records that would be updated.

>>> spark.sql("SELECT COUNT(*) FROM bike_trips_staging WHERE start_station_id = 86").show()
+--------+                                                                      
|count(1)|
+--------+
|     222|
+--------+

Let’s run our UPDATE statement.

spark.sql("UPDATE bike_trips_staging SET start_station_name = 'Eckhart Park, East' WHERE start_station_id = 86")
spark.sql("SELECT * FROM bike_trips_staging WHERE start_station_id = 86").show()
+----------------+-------------+-------------------+-------------------+------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|start_station_name|start_station_id|    end_station_name|end_station_id|start_lat|start_lng|end_lat| end_lng|member_casual|
+----------------+-------------+-------------------+-------------------+------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+
|A847FADBBC638E45|  docked_bike|2020-04-26 17:45:14|2020-04-17 17:17:03|Eckhart Park, East|              86|Lincoln Ave & Div...|           152|  41.8964|  -87.661|41.9322|-87.6586|       member|
|1BF773E0838E45E4|  docked_bike|2020-04-18 11:09:34|2020-04-18 12:25:48|Eckhart Park, East|  

That is so cool, just like the good ole days. Almost makes you forget you’re using a fancy new Delta Lake data warehouse! According to the documentation you can run just about any SQL statement you can think of. You can ALTER tables, change data types, run MERGE statements; the list goes on. So cool!
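For example, a MERGE against our table might look something like the sketch below. The second CSV file is hypothetical, just a stand-in for a fresh monthly batch of trips.

# a hypothetical new batch of rides, registered as a temp view
new_batch = spark.read.csv("202005-divvy-tripdata.csv", header='true')
new_batch.createOrReplaceTempView("monthly_file")

# upsert the batch into the Delta table, keyed on ride_id
spark.sql("""
MERGE INTO bike_trips_staging AS target
USING monthly_file AS source
ON target.ride_id = source.ride_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")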

Delta Lake vacuum and compaction.

All that coolness on top of a Parquet data store raises a very important question for any savvy Data Engineer. Whatever fancy stuff Delta Lake is doing in the background, it had better be taking care of all those files, because it’s easy to start running into “too many small files”, the classic problem in big data, or even too many big files. Data partitioning is very important.

So apparently there are a few options for controlling this with Delta Lake. You can of course CREATE tables partitioned on a particular column you’ve chosen … and you can also periodically just read and overwrite all of a table’s data, using Spark’s repartition method on a DataFrame.

(spark.read
    .format("delta")
    .load(path)                     # path to the Delta table's files
    .repartition(numFiles)          # target number of output files
    .write
    .format("delta")
    .option("dataChange", "false")  # rewrite files without flagging new data
    .mode("overwrite")
    .save(path))

Delta Lake also has two built-in features…

  • auto-compaction (delta.autoOptimize.autoCompact = true)
  • optimized writes (delta.autoOptimize.optimizeWrite = true)

You can add these options at Spark runtime or even ALTER a table to set these properties.
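Setting them on an existing table would look something like this, though fair warning: I haven’t verified whether these autoOptimize properties are honored outside of the Databricks runtime.

# set the auto-optimize properties on the existing table
spark.sql("""
ALTER TABLE bike_trips_staging SET TBLPROPERTIES (
    'delta.autoOptimize.autoCompact' = 'true',
    'delta.autoOptimize.optimizeWrite' = 'true'
)
""")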

There is also the concept of VACUUM, the idea being to clean up unused old files. Remember when we ran that UPDATE statement above? I’m sure it created some unused Parquet files somewhere. Let’s run a VACUUM in DRY RUN mode to see how many files it would clean up. (That statement also takes a RETAIN clause; more on that in a second.)

  >>> spark.sql("VACUUM bike_trips_staging DRY RUN").show() 
  Found 0 files and directories in a total of 1 directories that are safe to delete. 
  +----+                                                                          
 |path|
 +----+
 +----+ 

It didn’t recommend deleting any files, but based on the documentation I’m guessing it won’t touch files newer than the default seven-day retention period.
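Based on the docs, you can pass a RETAIN window to VACUUM yourself, but going under the default seven days apparently requires loosening a safety check first, so use with care. A sketch:

# relax the safety check so VACUUM will accept a retention window under 7 days
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# then remove stale files older than 24 hours
spark.sql("VACUUM bike_trips_staging RETAIN 24 HOURS")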

Heck, this thing even has table history.

>>> spark.sql("DESCRIBE HISTORY bike_trips_staging").show()
+-------+-------------------+------+--------+--------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|     operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+--------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      5|2021-02-27 21:36:36|  null|    null|        UPDATE|[predicate -> (st...|null|    null|     null|          4|          null|        false|[numRemovedFiles ...|        null|
|      4|2021-02-27 21:31:21|  null|    null|         WRITE|[mode -> Append, ...|null|    null|     null|          3|          null|         true|[numFiles -> 1, n...|        null|
|      3|2021-02-27 15:14:16|  null|    null|ADD CONSTRAINT|[name -> dateWith...|null|    null|     null|          2|          null|        false|                  []|        null|
|      2|2021-02-27 15:12:14|  null|    null|        DELETE|   [predicate -> []]|null|    null|     null|          1|          null|        false|[numRemovedFiles ...|        null|
|      1|2021-02-27 15:05:02|  null|    null|         WRITE|[mode -> Append, ...|null|    null|     null|          0|          null|         true|[numFiles -> 1, n...|        null|
|      0|2021-02-27 14:55:37|  null|    null|  CREATE TABLE|[isManaged -> tru...|null|    null|     null|       null|          null|         true|                  []|        null|
+-------+-------------------+------+--------+--------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
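And with history comes time travel: reading the table as it looked at an older version. The versionAsOf reader option is straight from the docs; the path below is my guess at where this managed table’s files live (under the Spark warehouse directory).

# read an older snapshot of the table by version number
# (the path is a guess at the managed table's location)
old_version = (
    spark.read.format("delta")
    .option("versionAsOf", 1)
    .load("spark-warehouse/bike_trips_staging")
)
old_version.select("start_station_name").show(1)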

Other fun Delta Lake things.

Let’s try out a few other features, just because we can. How about an ALTER TABLE to add a new column? No problem.

spark.sql("ALTER TABLE bike_trips_staging ADD COLUMNS (COUNTRY STRING)")
>>> spark.sql("UPDATE bike_trips_staging SET COUNTRY = 'USA'").show()
+----------------+-------------+-------------------+-------------------+------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+-------+
|         ride_id|rideable_type|         started_at|           ended_at|start_station_name|start_station_id|    end_station_name|end_station_id|start_lat|start_lng|end_lat| end_lng|member_casual|COUNTRY|
+----------------+-------------+-------------------+-------------------+------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+-------+
|A847FADBBC638E45|  docked_bike|2020-04-26 17:45:14|2020-04-17 17:17:03|Eckhart Park, East|              86|Lincoln Ave & Div...|           152|  41.8964|  -87.661|41.9322|-87.6586|       member|    USA|
+----------------+-------------+-------------------+-------------------+------------------+----------------+--------------------+--------------+---------+---------+-------+--------+-------------+-------+

You can even create a VIEW!

spark.sql("""
CREATE OR REPLACE VIEW rideable_type_summary AS
SELECT rideable_type, COUNT(ride_id) as ride_count
FROM bike_trips_staging
GROUP BY rideable_type
""")
spark.sql("SELECT * FROM rideable_type_summary").show()
+-------------+----------+                                                      
|rideable_type|ride_count|
+-------------+----------+
|  docked_bike|     84776|
+-------------+----------+

If I had more time and the weather wasn’t so nice, I might even try out SCDs (slowly changing dimensions) and the classic SCD2 pattern where all changes are tracked in a table. Check it out here.

Delta Lake musings.

I think everyone using Spark and big data sinks with Parquet and other files should be using Delta Lake. I’m sure there are reasons why people don’t … maybe? It kinda seems like a no-brainer. Data quality and integrity are so hard to tackle in data sinks with distributed processing systems, and it appears that with about as little effort as possible, you can have your cake and eat it too.

It’s impressive that when using Spark you can just add a few lines at the top and have the power of a Delta Lake at your fingertips. Being able to combine the classic approach and design of traditional data warehousing, with constraints, schema integrity, CRUD and ACID operations, with the power of Spark and Big Data is almost too good to be true.

What I can imagine being a blocker for some people is the use of spark.sql(); I know it’s often harder to write unit-testable and reusable code when using ANSI-style SQL statements. It seems there are a large number of Spark people who love using the DataFrame API for everything.

When you move towards classic data warehousing concepts that have been used in SQL for decades, it makes sense to use spark.sql() statements, which I personally enjoy, but others do not.
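For the DataFrame crowd, there is also a programmatic DeltaTable API in the delta-core package. As a rough sketch, the earlier station-name UPDATE could be expressed something like this:

from delta.tables import DeltaTable
from pyspark.sql import functions as F

# the same station-name fix as the earlier SQL UPDATE, via the Python API
bike_trips = DeltaTable.forName(spark, "bike_trips_staging")
bike_trips.update(
    condition=F.col("start_station_id") == 86,
    set={"start_station_name": F.lit("Eckhart Park, East")},
)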