Databricks + Delta Lake MERGE duplicates – Deterministic vs Non-Deterministic ETL.
Is there any problem more classic to Data Lakes and Data Warehouses than duplicate records? You would think after doing the same ETL for over a decade I could avoid the issue; apparently not. It’s good never to think too highly of one’s self, because the duplicates can get us all. Today I want to talk about a wonderful feature of Databricks + Delta Lake MERGE statements that is perfect for quietly and insidiously injecting duplicates into your Data Warehouse or Data Lake. It’s a great trick to play on your unsuspecting coworkers.
Delta Lake MERGE statements.
Delta Lake is truly a wonderful tool for big data. One of the great features of Delta Lake is the MERGE statement, which is probably quite familiar to most Data Engineers and other Developers. The MERGE statement is a simple way of meshing two data sets together and taking certain actions based on certain conditions.
Here are the basics of a MERGE statement between two data sets for SparkSQL.
MERGE INTO some_target_table
USING some_source_table
ON some_target_table.primary_key = some_source_table.primary_key
WHEN MATCHED THEN UPDATE SET
some_target_table.some_column = some_source_table.some_column
WHEN NOT MATCHED THEN INSERT
(some_column) VALUES (some_source_table.some_column);
We take two data sets, many times two Delta Tables, join them, then we tell the MERGE what to do if we have a MATCH and what to do when NOT MATCHED. Typically this takes the form of an UPDATE if MATCHED and an INSERT if NOT MATCHED, pretty straightforward. MERGE statements are wonderful for reducing the complexity of getting new or updated data into your Data Lake or Warehouse, and they have been in use on many RDBMSs for years.
Congratulations, you now have duplicates.
So what’s the big problem then? Well, there is a problem … and it’s a sneaky one too. Go ahead, read the documentation for MERGE statements on the Databricks website; you will find no reference to something called “deterministic vs non-deterministic.” We will dig into this later.
Apparently, there are some edge cases, and probably at certain large data sizes the non-determinism problem takes over, and your trusty MERGE statement will start introducing duplicates during the INSERT and WHEN MATCHED portions of your logic!! This is probably every data developer’s worst nightmare.
Now let me be clear, these duplicates have NOTHING to do with your source or target data itself. Many times duplicates are introduced because of problems with the JOIN or ON clause. That is not what I’m talking about … I’m talking about a single source record with NO match in the TARGET table getting multiple records inserted, run after run!
Hard to Troubleshoot.
Probably what makes this problem so hard is the fact that when we see duplicates in our data, we automatically assume it is a data or query problem, that the duplicates are being caused by a bad join or other data in the system. But in the “deterministic vs non-deterministic” situation, this is not the case. It’s simply happening “under the hood,” and therefore it is not easy to identify where the problem is.
This problem of non-deterministic ETL seems to apply to Databricks SQL MERGE statements, so unless you sit around reading JIRA issues on Delta Lake, you would have no idea about this “problem.”
Deterministic vs Non-Deterministic
I discovered a short little blurb on an inconspicuous Databricks page while trying to unravel my own duplicate issues caused by this behavior, and it was news to me. Why this doesn’t take a semi-prominent position in the normal MERGE documentation, probably only Gandalf knows. Here is the sum total of what they tell you.
“A merge operation can produce incorrect results if the source dataset is non-deterministic. This is because merge may perform two scans of the source dataset and if the data produced by the two scans are different, the final changes made to the table can be incorrect. Non-determinism in the source can arise in many ways. Some of them are as follows:
Reading from non-Delta tables. For example, reading from a CSV table where the underlying files can change between the multiple scans.
Using non-deterministic operations. For example, Dataset.filter() operations that use the current timestamp to filter data can produce different results between the multiple scans.”
– our friends at Databricks.
Eventually, I ran across this nice Delta discussion on this topic. Reading through it gives you a good idea of what to look for: if, in theory, the data scan on the source could produce different results in short succession, then you have a non-deterministic problem. What jumped out to me right away was the DATETIME stamp I was using to pull the SOURCE dataset for my MERGE. It looked like this.
SELECT *
FROM {database}.{staging_table}
WHERE {column} >= current_date() - INTERVAL {days_back} DAYS;
I supposed that my current_date() call, along with subtracting some X number of days … most likely was “Non-Deterministic.” Although this seemed strange, because my source staging table is only loaded once, and the MERGE statement runs sometime after that … meaning I guess it could be non-deterministic, but it seems pretty deterministic on the face of it.
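Whether or not current_date() was truly the culprit, one easy way to rule it out is to resolve the cutoff once on the driver side and interpolate it as a literal, so every scan of the source sees the exact same predicate. A rough sketch of the idea, using the same placeholders as above plus a hypothetical {cutoff_date} that I made up for illustration:
SELECT *
FROM {database}.{staging_table}
WHERE {column} >= '{cutoff_date}';  -- e.g. '2022-06-01', computed once before the MERGE, never re-evaluated per scan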
Fixing the Issue.
As far as I could tell, I would need to steer away from pretty much any sort of DATE or DATETIME pull before my MERGE. The funny thing is I use them in a lot of places; the small datasets have no problem, and this duplicate issue with non-deterministic datasets only appears on large datasets. Maybe because the data scan or read takes longer? I don’t know.
In my case, I was dealing with 128,043,302 records on the INPUT or SOURCE side. This would produce a whopping 28,726,745 duplicates in the TARGET table. Of course, the TARGET table has billions of rows, but the MERGE statement is qualified with PARTITION pruning. Yikes, every day, that’s a lot of duplicates.
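In case that sounds abstract, “qualified with PARTITION pruning” just means the ON clause carries explicit predicates on the partition columns so Delta only has to touch a slice of those billions of rows. A rough sketch of the shape, with hypothetical placeholder names like {target_table} (my real tables look different), assuming the table is partitioned by year/month/day as mine are:
MERGE INTO {database}.{target_table} t
USING {database}.{staging_table} s
ON t.primary_key = s.primary_key
-- explicit partition predicates; assumes any matching target rows live only in these partitions
AND t.year IN ({years}) AND t.month IN ({months}) AND t.day IN ({days})
WHEN MATCHED THEN UPDATE SET *   -- assumes the source carries the same columns as the target
WHEN NOT MATCHED THEN INSERT *;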
There are a number of ways to fix the deterministic issue. Usually, you just have to use some other logic, although I have read that simply doing a CACHE() or PERSIST() on said source data should solve the issue (more on that below). In my case, I just changed how I dealt with the dates, which are always my partitions. Maybe something like this, although it’s not great when crossing over months.
SELECT *
FROM {database}.{staging_table}
WHERE year IN ({years}) AND month IN ({months}) AND day IN ({days});
You get the idea.
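And if you would rather keep the date logic, the cache route I mentioned above might look roughly like this. This is just a sketch of the idea using Spark SQL’s CACHE TABLE ... AS syntax and a hypothetical merge_source name; I have not verified that caching alone is bulletproof at this scale.
-- Materialize the source query once so the MERGE rescans the cached result,
-- not a query whose predicate could shift between scans.
CACHE TABLE merge_source AS
SELECT *
FROM {database}.{staging_table}
WHERE {column} >= current_date() - INTERVAL {days_back} DAYS;

-- Then point the MERGE at the cached source instead of the raw query.
MERGE INTO some_target_table
USING merge_source
ON some_target_table.primary_key = merge_source.primary_key
WHEN MATCHED THEN UPDATE SET some_target_table.some_column = merge_source.some_column
WHEN NOT MATCHED THEN INSERT (some_column) VALUES (merge_source.some_column);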
Musings
Moral of the story? I love new tools like Delta Lake, and in combination with Databricks it’s an unbeatable set of tooling. That being said, no matter the tool, there are always bugs for Data Engineers to watch out for; it’s just life.