
Reducing Complexity with Databricks + Delta Lake COPY INTO

As the years drag by in Data Engineering, there are a few things that I have come to appreciate more and more. One topic that is close to number one on the list is complexity reduction. Today’s modern data stacks are filled to the brim, and overflowing, with technologies and tools. So many tools with such wonderful features, but sometimes all the magic comes with a downside: complexity. Complexity can turn something wonderful into a nightmare.

Reducing (not avoiding) complexity seems to be one of the main tenets I work on these days when designing resilient, reliable, and repeatable data pipelines that can process terabytes of data. One of the tools that helps with this is the COPY INTO feature of Databricks + Delta Lake.

Complexity Reduction in Big Data Pipelines

Why is complexity reduction so important in today’s fast-moving world of Data Lake and Big Data pipelines? Because large datasets come with enough trials and tribulations of their own, without adding more to the mix. It really seems like problems and bugs that are easy to fix at a small scale can become almost insurmountable at scale. Common problems at scale …

  • Resource (compute CPU/memory/storage) Management
  • Data Quality (see Great Expectations)
  • Pipeline Runtimes
  • Metadata Management
  • Knowing when things go wrong.

I mean we could just keep writing for hours about different problems, but in Big Data, as with any technical solution, accidental or required complexity can take problems like those listed above and compound them. It’s critical to try to correctly manage and reduce complexity to enable the approachability of architecting efficient and usable data pipelines.

One of those areas of complexity that is common to Data Lake and Warehouse pipelines is the “L” in ETL or ELT … aka the load step.

Data Loads and sneaky complexity.

When you’re new to Data Engineering it can be tempting to think that the load portion of your data pipelines is the easiest and simplest part. I mean, none of your business logic is being applied most likely, you’re just worried about loading data for later transformation and use. Read a file or data, push that file or data to some data sink, easy, done, move on with life, right? Not.

Such simplistic views of data loading will most likely come back to haunt you. What are some of those sneaky complexity killers that will come get you in your sleep?

  • How do you load only new data?
  • How do you not duplicate data?
  • How do you track what has been, or has not been loaded?
  • How can data loads be idempotent?

Things are never as simple as they seem! From the very start of our data pipelines, the data load, we have to be thinking about complexity and controlling it, while still solving all our use cases and needs. We never want to load the same data more than once, we don’t want duplicates, and we need to be able to run the pipeline multiple times. In other words, it needs to be idempotent.
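To make that bookkeeping concrete, here is a minimal sketch of the kind of "loaded files" ledger you end up hand-rolling when the load step has to track its own state. Everything in it is hypothetical (the `loaded_files` table, the `load_new_files` helper, the in-memory SQLite database standing in for a real metadata store); it only illustrates the pattern, not any particular production setup.

```python
import sqlite3


def load_new_files(conn, available_files, load_fn):
    """Load only files missing from the ledger; return the paths loaded this run."""
    conn.execute("CREATE TABLE IF NOT EXISTS loaded_files (path TEXT PRIMARY KEY)")
    already = {row[0] for row in conn.execute("SELECT path FROM loaded_files")}
    loaded = []
    for path in sorted(available_files):
        if path in already:
            continue  # seen in a previous run, skip it
        load_fn(path)  # hypothetical: push the file into the target table
        # Record the file only after the load call returns without error.
        conn.execute("INSERT INTO loaded_files (path) VALUES (?)", (path,))
        conn.commit()
        loaded.append(path)
    return loaded


conn = sqlite3.connect(":memory:")  # stand-in for a real tracking database
sink = []  # stand-in for the target table
first_run = load_new_files(conn, ["a.csv", "b.csv"], sink.append)
second_run = load_new_files(conn, ["a.csv", "b.csv", "c.csv"], sink.append)
print(first_run, second_run)
```

Even this toy version has a gap: if the process dies between the load call and the ledger insert, the file gets loaded again on the next run. Closing gaps like that is exactly the sort of sneaky complexity the load step drags in.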

COPY INTO with Delta Lake + Databricks.

With all that being said, and more left unsaid, there are some wonderful tools and features that allow us to remove some of this complexity in data loads. One of those features is COPY INTO that is provided with Delta Lake + Databricks. It’s simple to use, powerful, and incredibly flexible.

“Loads data from a file location into a Delta table. This is a re-triable and idempotent operation—files in the source location that have already been loaded are skipped.”

Databricks docs.

COPY INTO is a SparkSQL statement that enables you to load data (from pretty much any location) into a Delta table with ease.

How does COPY INTO reduce load complexity?

COPY INTO allows a data file to be loaded only once, and no other work is required from you! No code, no tracking files in a database … it just works. For many data pipelines, this enables a reduction in the amount of code written immediately! No complexity with processed files stored in a database, no complex movement of files before and after processing!

Also, the COPY INTO statement is compact, allowing for complex data loading techniques in a single straightforward SparkSQL format.

Features of COPY INTO

Let’s review the basics of a Delta Lake COPY INTO statement, and how to use it for more complex data loads as well (remember, this is SparkSQL).

%sql
COPY INTO database.my_wonderful_staging_table
FROM 's3a://my-remote-s3-bucket/incoming_files'
FILEFORMAT = CSV
FORMAT_OPTIONS('header' = 'true', 'sep' = '\t');

So what do we have for the basics?

  • target Delta table (database.my_wonderful_staging_table)
  • FROM indicates the location of data you want to load (even remote s3 buckets for example).
  • FILEFORMAT is what kind of file.
  • FORMAT_OPTIONS is where we can indicate normal file options like headers, separators etc.

It is quite amazing that we can write such a small piece of SparkSQL code, point it at a file location and a Delta table, and with those few lines have an idempotent data load operation that will only load new data files that show up in the FROM location … no matter how many times we run it!

Talk about complexity reduction.

But my Data Loads are more complex than that!

I know I know, how often are data loads that easy? I agree, but never fear, the COPY INTO statement is able to support more complex data loads. One of those features is support for a SELECT statement to be applied to the loaded FROM data.

%sql
COPY INTO database.my_wonderful_staging_table
(
   SELECT
      transaction_id,
      to_timestamp(transaction_date, 'M/d/yyyy H:mm') AS order_date,
      transaction_amount AS amount,
      product_id,
      product_category AS product_name,
      input_file_name() as load_file
   FROM 's3a://my-remote-s3-bucket/incoming_files'
)
FILEFORMAT = CSV
FORMAT_OPTIONS('header' = 'true', 'sep' = '\t');

This is super powerful for a number of reasons. Since we are using SparkSQL, we have all the spark.sql.functions at our fingertips, allowing more complex ingestion and transformation if needed. In my simple example above, you can see I’m using to_timestamp and input_file_name to do some minor modifications and data enrichment.

The ability to use SELECT statements and spark.sql.functions during the COPY INTO process is powerful yet simple and straightforward.
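If the 'M/d/yyyy H:mm' pattern passed to to_timestamp looks cryptic, a rough Python analogue may help show what kind of string it accepts. This is only an approximation: Spark's datetime patterns and Python's strptime directives are different pattern languages, the mapping below is my own, and the sample value is made up.

```python
from datetime import datetime

# Rough strptime analogue of the Spark pattern 'M/d/yyyy H:mm'.
# Assumption: an approximate mapping for illustration, not an exact equivalent.
PY_FORMAT = "%m/%d/%Y %H:%M"


def parse_transaction_date(raw: str) -> datetime:
    """Parse a month/day/year hour:minute string without zero padding."""
    return datetime.strptime(raw, PY_FORMAT)


sample = "3/7/2022 9:05"  # made-up value in the shape the pattern expects
parsed = parse_transaction_date(sample)
print(parsed.isoformat())
```

The point is that the source column holds strings like the sample above, and the SELECT inside COPY INTO turns them into real timestamps on the way in.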

Also, there is a wide range of supported FILEFORMATs. Pretty much all the major file formats are covered.

  • Parquet
  • CSV
  • TEXT
  • JSON
  • AVRO
  • ORC
  • BINARYFILE

More interesting COPY INTO features.

Just like late-night TV … but wait there is more! COPY INTO also has a few other nice features to help extend functionality.

  • FILES (specify files)
  • PATTERN (match some pattern)
  • COPY_OPTIONS

If you’re interested in loading a specific set of files, that can be done with the FILES option.

%sql
COPY INTO database.my_wonderful_staging_table
FROM 's3a://my-remote-s3-bucket/incoming_files'
FILEFORMAT = CSV
FILES = ('text_file_1.txt', 'text_file_2.csv')
FORMAT_OPTIONS('header' = 'true', 'sep' = '\t');

The PATTERN option is also very powerful and helpful when source data files might be messy in their organization.

%sql
COPY INTO database.my_wonderful_staging_table
FROM 's3a://my-remote-s3-bucket/incoming_files'
FILEFORMAT = CSV
PATTERN = 'my_sub_folder/file_*.csv'
FORMAT_OPTIONS('header' = 'true', 'sep' = '\t');
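To get a feel for which files a glob like that would pick up, here is a small pure-Python approximation. It assumes the pattern is applied to paths relative to the FROM location and that `*` does not cross a `/`; both are my reading of typical glob behavior rather than something verified against Databricks, and the file names are made up.

```python
import re


def glob_to_regex(pattern: str) -> re.Pattern:
    """Translate a simple glob ('*' and '?') into a regex that never crosses '/'."""
    parts = []
    for ch in pattern:
        if ch == "*":
            parts.append("[^/]*")  # any run of characters within one path segment
        elif ch == "?":
            parts.append("[^/]")  # exactly one character within a path segment
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts))


def matches(pattern: str, relative_path: str) -> bool:
    return glob_to_regex(pattern).fullmatch(relative_path) is not None


PATTERN = "my_sub_folder/file_*.csv"
candidates = [  # made-up paths, relative to the FROM location
    "my_sub_folder/file_001.csv",
    "my_sub_folder/file_b.csv",
    "my_sub_folder/notes.csv",
    "my_sub_folder/archive/file_002.csv",
    "other_folder/file_003.csv",
]
matched = [path for path in candidates if matches(PATTERN, path)]
print(matched)
```

Under those assumptions only the two file_*.csv files sitting directly in my_sub_folder are selected. The stray notes file, the nested archive, and the other folder are left alone.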

Also, when all else fails, and you had to do some cleanup in your Data Lake and need to force some files back into your Delta table … the FORCE option inside COPY_OPTIONS is invaluable.

%sql
COPY INTO database.my_wonderful_staging_table
FROM 's3a://my-remote-s3-bucket/incoming_files'
FILEFORMAT = CSV
FILES = ('text_file_1.txt', 'text_file_2.csv')
FORMAT_OPTIONS('header' = 'true', 'sep' = '\t')
COPY_OPTIONS('force' = 'true');
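If you find yourself writing many of these statements by hand, one option is to assemble them in Python and pass the result to spark.sql(...) from a notebook or job. The helper below is a hypothetical sketch, not part of any Databricks API: `build_copy_into` and its parameters are names I made up, and the clause spellings (`FILES = (...)`, `PATTERN = '...'`, `COPY_OPTIONS`) follow the Databricks SQL reference as I understand it.

```python
def _quote(value: str) -> str:
    # Keep the sketch simple: refuse values that would need escaping.
    if "'" in value:
        raise ValueError(f"single quotes are not supported here: {value!r}")
    return f"'{value}'"


def _options(keyword: str, options: dict) -> str:
    pairs = ", ".join(f"{_quote(k)} = {_quote(v)}" for k, v in options.items())
    return f"{keyword} ({pairs})"


def build_copy_into(target, source, file_format, *, files=None, pattern=None,
                    format_options=None, force=False):
    """Assemble a COPY INTO statement as a string (hypothetical helper)."""
    if files and pattern:
        raise ValueError("use FILES or PATTERN, not both")
    lines = [
        f"COPY INTO {target}",
        f"FROM {_quote(source)}",
        f"FILEFORMAT = {file_format}",
    ]
    if files:
        lines.append("FILES = (" + ", ".join(_quote(f) for f in files) + ")")
    if pattern:
        lines.append(f"PATTERN = {_quote(pattern)}")
    if format_options:
        lines.append(_options("FORMAT_OPTIONS", format_options))
    if force:
        lines.append(_options("COPY_OPTIONS", {"force": "true"}))
    return "\n".join(lines)


statement = build_copy_into(
    "database.my_wonderful_staging_table",
    "s3a://my-remote-s3-bucket/incoming_files",
    "CSV",
    files=["text_file_1.txt", "text_file_2.csv"],
    format_options={"header": "true", "sep": "\\t"},  # backslash + t, as in the SQL
    force=True,
)
print(statement)
```

Keeping force as an explicit, default-off argument is deliberate: re-loading files that were already ingested is the exception, and it should look like one in the calling code.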

I hope you’re beginning to see how powerful COPY INTO can be, with its features and the complex data ingestions that can happen in a simple and straightforward manner. It can be hard to find ways to reduce complexity in Data Engineering pipelines, but using the COPY INTO approach on Delta Lake + Databricks provides the power of Spark with the simplicity of SQL in one simple statement.

Musings on COPY INTO and complexity reduction for data loads.

I’m a huge fan of using tools and features that truly reduce the complexity of data pipelines. When I can manage fewer lines of code, that is always a win in my book. Even if I have to give up features, many times it can be worth it for the complexity reduction aspect. With COPY INTO I get amazing features like idempotency, with the ability to use complex spark.sql.functions, all in one statement that is easy to understand and use.

For example, when you’re receiving millions of files, the simple loading of data into your Data Lake or Warehouse all of a sudden takes on a more serious note. How do you ensure you capture all the new incoming files? What if something breaks and the ingest pipeline has to be re-run? Such complexity has to be dealt with and can keep Data Engineers up at night.

Delta Lake in itself is a wonderful tool, and features like COPY INTO make data pipeline ingestion easy and fun. I’m always on the lookout for complexity reduction and simple features that give me an edge in my data pipeline architecture … and this is one of them.