The Elusive Idempotent Data Load/ETL
This is a topic I’ve been musing about lately. The idempotent data load has been a source of much pain and suffering in the lives of many data engineers and data warehouse developers. Apparently some things don’t change with the passage of time. My first job in tech was on a data warehouse team with a classic Kimball-style model on SQL Server, and back then, working out how to make data loads and ETL idempotent was the task of the hour. All these years later, working on data lakes in Databricks with Spark … guess what … I’m still worrying about idempotent ETL and data loads.
What are idempotent data loads?
What does idempotent mean?
… if I do the same thing over and over, I get the same result.
– data patriarchs
How does the idea of an idempotent function/method apply to data warehousing and data lakes? This is where things can get a little strange when trying to practically tie this idea to ETL and data loads. It’s a difficult topic but a very important one. There are a few obvious examples, and a few not so obvious.
- when running a re-load on ETL, data should not be duplicated downstream.
- if ETL breaks or is down for a day, the next run should catch up.
- running ETL for today, or historically (say, for a date a year ago), should not cause unintended consequences.
- the integrity of facts and dimensions and their unique keys cannot be compromised, and any changes should be cascading.
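To make the "catch up" bullet a bit more concrete, here is a minimal sketch in plain Python (not Spark). It assumes, purely for illustration, that the pipeline records the last hour it loaded successfully; the function name `pending_hours` and the hourly grain are my own assumptions, not something from a real system.

```python
from datetime import datetime, timedelta


def pending_hours(last_loaded: datetime, now: datetime) -> list:
    """Return every whole hour after last_loaded, up to but excluding the current hour."""
    hours = []
    # Start with the hour right after the last successful load.
    cursor = last_loaded.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    # The current hour is still in progress, so it is not loaded yet.
    current_hour = now.replace(minute=0, second=0, microsecond=0)
    while cursor < current_hour:
        hours.append(cursor)
        cursor += timedelta(hours=1)
    return hours


# Hypothetical outage: last good load was 09:00, the next run happens at 13:20.
backlog = pending_hours(datetime(2021, 5, 1, 9), datetime(2021, 5, 1, 13, 20))
print([h.strftime("%H:%M") for h in backlog])
```

The point is that the run works out what is missing from recorded state, rather than assuming it is only ever responsible for "the last hour".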
So, after all that, how can we better define idempotent data loads?
Our target tables must never be compromised or require manual intervention, whatever the inevitable and wide-ranging data load and ETL scenarios throw at them. The target table(s) are idempotent in the sense that there is an acceptable “state” they should be in at every point in time (every day), and so our ETL and data loads must have the required complexity to always produce that idempotent state.
It can’t have duplicated records, it can’t have missing records, it can’t have orphaned keys or corrupted records. Based on a set of given “raw” data that exists today, our target result/table is idempotent in the sense that there is a single state that should exist from the output of our transformations from that raw data.
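One way I read that definition: the target is a function of the raw data, so loading the same raw batch twice must leave the target unchanged. Here is a small plain-Python sketch of the difference. The functions `naive_append` and `keyed_load`, and the use of a list or dict as a "table", are hypothetical stand-ins rather than anything from Delta Lake.

```python
def naive_append(target: list, batch: list) -> list:
    """Blindly append the batch; re-running it duplicates every record."""
    return target + batch


def keyed_load(target: dict, batch: list) -> dict:
    """Store each record under its key, so a re-run lands on the same state."""
    new_state = dict(target)
    for record in batch:
        new_state[record["ride_id"]] = record
    return new_state


# Made-up sample records for illustration only.
batch = [
    {"ride_id": "A1", "member_casual": "member"},
    {"ride_id": "B2", "member_casual": "casual"},
]

appended_twice = naive_append(naive_append([], batch), batch)
loaded_once = keyed_load({}, batch)
loaded_twice = keyed_load(loaded_once, batch)  # simulate an accidental re-run

print(len(appended_twice))          # 4 rows: duplicated
print(loaded_once == loaded_twice)  # True: the re-run changed nothing
```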
Idempotent data load with DeltaLake and Spark.
Let’s use a simple example to examine how we might handle a data load using DeltaLake and PySpark. As always we will use the open source Divvy Bike Trips data set.
Let’s pretend like we get this bike trip data every hour from a source system and we are tasked with loading it into a data lake. We get CSV files at hourly intervals that look like this.
You get the idea.
Solving the first problem in idempotent data loads, duplicate records.
The first thing most data engineers think about when designing an ETL pipeline to bring data into a data warehouse or data lake is probably this: how do I not duplicate records under any circumstance? Sometimes this is easy, sometimes it’s not so easy; it usually depends on the data and its inherent complexity.
Traditionally most folks have solved this with something called the primary key, a unique identifier for each record. Most developers have learned not to trust the data. What’s to say, in our case, that there wasn’t a blip in the matrix and the system that sends us these bike trip files sent us the same file twice? Or worse, what if it sent us a different file containing half the records from the file it already sent the day before?
Lesson 1. Find the “business” primary key of the raw data that you are ingesting. In our example we get the following headers.
ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
In this case it’s fairly obvious: we get a ride_id, and we can be fairly confident that we could check it against our already loaded data to ensure the record doesn’t already exist. But let’s pretend we didn’t get an id like that; what else could we do?
Well, if we think about bike trips in general, I would say that someone taking a bike from a start_station_id and going to an end_station_id, with a ride that started at started_at and ended at ended_at, is probably a pretty good indication of a unique bike trip, and therefore a unique record.
Lesson 2: Always guard against duplicate data being introduced.
This is straightforward code in DeltaLake with PySpark. We simply create our own unique primary key for each record …
.withColumn("unique_id", F.sha2(F.concat_ws("||", *primary_key), 256))
So, when we are inserting new records we can easily check to see if the row has already been imported.
already = spark.sql("SELECT unique_id FROM trip_data;")
new_records = df.join(already, on='unique_id', how="leftanti")
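One thing worth noticing is that the left anti join above only compares the incoming rows against what is already in the table. If the same file shows up twice inside a single batch, both copies pass the check. Below is a plain-Python sketch of the combined check; the function name `filter_new_records` and the sample ids are made up for illustration. In PySpark, calling `dropDuplicates(["unique_id"])` on the incoming DataFrame before the join would be one way to get the same effect.

```python
def filter_new_records(incoming: list, already_loaded: set) -> list:
    """Keep rows whose unique_id is neither in the target nor repeated in the batch."""
    seen = set(already_loaded)
    fresh = []
    for row in incoming:
        if row["unique_id"] in seen:
            continue  # already in the target, or a repeat inside this batch
        seen.add(row["unique_id"])
        fresh.append(row)
    return fresh


# Made-up batch: "a" is already loaded, "b" arrives twice, "c" is new.
incoming = [{"unique_id": "a"}, {"unique_id": "b"}, {"unique_id": "b"}, {"unique_id": "c"}]
fresh = filter_new_records(incoming, already_loaded={"a"})
print([row["unique_id"] for row in fresh])
```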
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, FloatType
spark = SparkSession.builder \
    .config("spark.master", "local") \
    .config('job.local.dir', '/Users/danielbeach') \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .appName('DeltaTest').getOrCreate()
# Create the table only if it does not exist yet. CREATE OR REPLACE would
# empty the table on every run and defeat the duplicate check further down.
spark.sql("""
    CREATE TABLE IF NOT EXISTS trip_data (
        unique_id STRING,
        ride_id STRING,
        rideable_type STRING,
        started_at TIMESTAMP,
        ended_at TIMESTAMP,
        start_station_name STRING,
        start_station_id INTEGER,
        end_station_name STRING,
        end_station_id INTEGER,
        start_lat REAL,
        start_lng REAL,
        end_lat REAL,
        end_lng REAL,
        member_casual STRING
    )
    USING DELTA
    LOCATION 'data_warehouse/trips'
    PARTITIONED BY (started_at);
""")
# Read the raw hourly CSV files and cast each column to the table's type.
df = spark.read.csv('/Users/danielbeach/Downloads/trips/*.csv', header='true')
df = df.withColumn('started_at', F.to_timestamp(F.col('started_at'))) \
    .withColumn('ended_at', F.to_timestamp(F.col('ended_at'))) \
    .withColumn('start_station_id', F.col('start_station_id').cast(IntegerType())) \
    .withColumn('end_station_id', F.col('end_station_id').cast(IntegerType())) \
    .withColumn('start_lat', F.col('start_lat').cast(FloatType())) \
    .withColumn('start_lng', F.col('start_lng').cast(FloatType())) \
    .withColumn('end_lng', F.col('end_lng').cast(FloatType())) \
    .withColumn('end_lat', F.col('end_lat').cast(FloatType()))
# Build our own key by hashing the business-key columns.
primary_key = ['start_station_id', 'end_station_id', 'started_at', 'ended_at']
df = df.withColumn("unique_id", F.sha2(F.concat_ws("||", *primary_key), 256))
# Drop repeats inside the batch itself, e.g. the same file delivered twice.
df = df.dropDuplicates(["unique_id"])
df.show()
# Keep only the rows whose unique_id is not already in the target table.
already = spark.sql("SELECT unique_id FROM trip_data;")
new_records = df.join(already, on='unique_id', how="leftanti")
# Append the new rows to the Delta table.
new_records.write \
    .format("delta") \
    .mode("append") \
    .save("data_warehouse/trips")
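A caveat on the hashed key: Spark's concat_ws skips NULL inputs, so if any of the key columns can be null, two different trips can end up with the same concatenated string and therefore the same hash. The plain-Python sketch below mimics that behaviour and shows one possible guard, a placeholder for missing values. The helper names and the `<NULL>` placeholder are my own choices for illustration; in PySpark something like `F.coalesce(F.col(c).cast("string"), F.lit("<NULL>"))` on each key column would play the same role.

```python
import hashlib


def concat_ws(sep, values):
    """Mimic Spark's concat_ws, which skips NULL (None) inputs."""
    return sep.join(str(v) for v in values if v is not None)


def key_skipping_nulls(values):
    """Hash the key columns the way the pipeline does, nulls silently dropped."""
    return hashlib.sha256(concat_ws("||", values).encode("utf-8")).hexdigest()


def key_with_placeholder(values, placeholder="<NULL>"):
    """Substitute a placeholder for None so every column keeps its position."""
    filled = [placeholder if v is None else v for v in values]
    return hashlib.sha256(concat_ws("||", filled).encode("utf-8")).hexdigest()


# Two made-up trips: one is missing the start station, the other the end station.
trip_a = [None, 42, "2021-05-01 10:00:00", "2021-05-01 10:15:00"]
trip_b = [42, None, "2021-05-01 10:00:00", "2021-05-01 10:15:00"]

print(key_skipping_nulls(trip_a) == key_skipping_nulls(trip_b))      # True: collision
print(key_with_placeholder(trip_a) == key_with_placeholder(trip_b))  # False: distinct keys
```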
Lesson 3: How can we accommodate clean data reloads, while preserving our de-duplication logic?
Another critical aspect of idempotent data loads is the ability to reload data automatically, without resorting to manual tricks. If we find out a bad or corrupted file was loaded, how can we reload data that already exists?
Probably one of the best ways to deal with this is by keeping it simple. Let’s just assume that if file(s) and record(s) are being sent into our data lake again, the purpose is for them to be reloaded. The other option would be to indicate something like “reload” in the runtime configuration that is passed into the ETL.
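For the second option, a runtime flag could be as small as the sketch below. The `--reload` flag name and the two strategy labels are hypothetical; they are only meant to show the shape of the idea, not a convention from any tool.

```python
import argparse


def parse_args(argv=None):
    """Parse the (hypothetical) command-line options for the load job."""
    parser = argparse.ArgumentParser(description="Hourly trip load (illustrative)")
    parser.add_argument("--reload", action="store_true",
                        help="overwrite rows that already exist in the target")
    return parser.parse_args(argv)


def write_strategy(reload: bool) -> str:
    """Pick how incoming rows that already exist in the target are treated."""
    return "update_existing" if reload else "skip_existing"


# Simulate running the job with the flag set.
args = parse_args(["--reload"])
print(write_strategy(args.reload))
```

The downside is that someone has to remember to pass the flag, which is exactly the kind of manual step we are trying to avoid.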
Luckily for us, DeltaLake with Spark has a wonderful feature called MERGE INTO that many classic RDBMS data warehouse workers will recognize.
This solves the problem of what to do when we don’t want duplicate data introduced, but might also want to UPDATE records if we are importing them again.
# Read and cast the raw files exactly as before.
df = spark.read.csv('/Users/danielbeach/Downloads/trips/*.csv', header='true')
df = df.withColumn('started_at', F.to_timestamp(F.col('started_at'))) \
    .withColumn('ended_at', F.to_timestamp(F.col('ended_at'))) \
    .withColumn('start_station_id', F.col('start_station_id').cast(IntegerType())) \
    .withColumn('end_station_id', F.col('end_station_id').cast(IntegerType())) \
    .withColumn('start_lat', F.col('start_lat').cast(FloatType())) \
    .withColumn('start_lng', F.col('start_lng').cast(FloatType())) \
    .withColumn('end_lng', F.col('end_lng').cast(FloatType())) \
    .withColumn('end_lat', F.col('end_lat').cast(FloatType()))
primary_key = ['start_station_id', 'end_station_id', 'started_at', 'ended_at']
df = df.withColumn("unique_id", F.sha2(F.concat_ws("||", *primary_key), 256))
# Keep one source row per key; a MERGE can fail when several source rows match the same target row.
df = df.dropDuplicates(["unique_id"])
# Expose the batch to SQL so MERGE INTO can use it as the source.
df.createOrReplaceTempView("raw_trips")
spark.sql("""
    MERGE INTO trip_data
    USING raw_trips
    ON trip_data.unique_id = raw_trips.unique_id
    WHEN MATCHED THEN
        UPDATE SET *
    WHEN NOT MATCHED THEN
        INSERT *
""")
Well, that was easy. What’s the big deal?
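If the MERGE semantics are new to you, here is a rough plain-Python model of what the statement does, using a dict keyed by unique_id as a stand-in for the table. The function `merge_into` and the sample rows are made up for illustration. The sketch also keeps a single row per key from the source first, since, as far as I know, Delta Lake's MERGE raises an error when several source rows match the same target row.

```python
def merge_into(target: dict, source: list) -> dict:
    """Upsert: matched keys are overwritten, unmatched keys are inserted."""
    # Keep one row per key from the source (the last one wins).
    deduped = {row["unique_id"]: row for row in source}
    merged = dict(target)
    merged.update(deduped)
    return merged


# Made-up state: one row already loaded with a bad value.
target = {"k1": {"unique_id": "k1", "member_casual": "casual"}}
corrected_file = [
    {"unique_id": "k1", "member_casual": "member"},  # corrected value for an existing row
    {"unique_id": "k2", "member_casual": "casual"},  # brand-new row
]

first = merge_into(target, corrected_file)
second = merge_into(first, corrected_file)  # the same file delivered again

print(first["k1"]["member_casual"])  # member: the reload fixed the row
print(first == second)               # True: the re-run changed nothing
```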
Lesson 4: What about historical records and keys that have cascaded? (aka think twice about complexity)
Think how simple the above code is for hardening a data lake or data warehouse. There’s something to be said for simplicity: when we multiply complexity, we multiply headaches and consequences downstream. That’s why I’m an advocate of always choosing the easy route when possible, and not inventing complexity just to have something to engineer.
Historical record tracking and CDC is a very common data warehousing practice that adds a whole lot of complexity to the question of how to make data loads idempotent. Along with that, using synthetic primary and surrogate keys that cascade into other tables and datasets multiplies the headache of idempotent data loads to an astronomical level.
Let’s give an example. Say in our dataset we are tracking a dimension table of our station_name and station_id that shows changes to the name and, for argument’s sake, also has a synthetic key we’ve created, which is cascaded into our main table.
# Historical dimension for stations; active_start/active_end bound each version of a row.
spark.sql("""
    CREATE OR REPLACE TABLE station_dimension (
        synthetic_station_id INTEGER,
        station_name STRING,
        source_station_id INTEGER,
        active_start TIMESTAMP,
        active_end TIMESTAMP,
        current_record INTEGER
    )
    USING DELTA
""")
So now we might have a table that could look something like this…
Now if you have to reload, what’s to be done? A simple MERGE statement will no longer cut it. In theory, if a file comes in on 5/2/2021 that reverses the above station name change because we got corrupted data, how do we update the table and roll back the synthetic_station_id changes that probably cascaded into other spots?
I’m not going to write out all the code, but you get the idea.
- reset the historical dimension table
- gather the synthetic IDs needing removal
- update cascading tables to reset the IDs
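To give a feel for how much more is involved than a MERGE, here is a toy plain-Python sketch of those three steps, with lists of dicts standing in for the dimension and fact tables. Everything in it is an assumption for illustration: the function `rollback_station`, the sample rows, and the rule that any version starting on or after the bad date gets removed. A real implementation would have to cover every table the synthetic key cascaded into.

```python
from datetime import date


def rollback_station(dimension, facts, source_station_id, bad_from):
    """Undo dimension versions created on or after bad_from and repoint the facts."""
    # Gather the synthetic ids introduced by the bad versions.
    bad_ids = {d["synthetic_station_id"] for d in dimension
               if d["source_station_id"] == source_station_id
               and d["active_start"] >= bad_from}

    # Reset the dimension: drop the bad versions, reopen the latest survivor.
    kept = [dict(d) for d in dimension if d["synthetic_station_id"] not in bad_ids]
    survivors = [d for d in kept if d["source_station_id"] == source_station_id]
    if not survivors:
        raise ValueError("no earlier version to roll back to")
    previous = max(survivors, key=lambda d: d["active_start"])
    previous["active_end"] = None
    previous["current_record"] = 1

    # Update the cascading table: repoint rows that picked up a removed id.
    fixed = [
        {**f, "synthetic_station_id": previous["synthetic_station_id"]}
        if f["synthetic_station_id"] in bad_ids else dict(f)
        for f in facts
    ]
    return kept, fixed


# Made-up rows: station 100 was "renamed" by a corrupted file on 2021-05-01.
dimension = [
    {"synthetic_station_id": 1, "station_name": "Station A", "source_station_id": 100,
     "active_start": date(2021, 1, 1), "active_end": date(2021, 5, 1), "current_record": 0},
    {"synthetic_station_id": 2, "station_name": "Station A (renamed)", "source_station_id": 100,
     "active_start": date(2021, 5, 1), "active_end": None, "current_record": 1},
]
facts = [{"ride_id": "r1", "synthetic_station_id": 1},
         {"ride_id": "r2", "synthetic_station_id": 2}]

new_dimension, new_facts = rollback_station(dimension, facts, 100, date(2021, 5, 1))
print(new_dimension)
print(new_facts)
```

Even this toy version has to touch two tables and make a judgment call about which version to reopen, which is the complexity tax of historical tracking plus cascading keys.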
Musings on idempotent data loads
I feel like spending the time up front to work out how different data loads and ETL can be made idempotent is sort of a lost art. I also feel like the complexity of CDC and primary/surrogate keys is often added BEFORE anyone thinks about how it will affect the downstream ETL, which then has to be designed in an idempotent way. In many cases we just add requirements because we can, and then either design ETL that fails to deal with the range of reload situations that can happen, or spend time designing idempotent ETL that is really just wasted work, because those features were never necessary.
I do think that DeltaLake with Spark has breathed new life into the ability of Big Data people to design idempotent data lake and data warehouse loads without re-inventing the wheel.