Soda-Core. Data Quality at Scale.
Ever since playing with Great Expectations on Spark some time ago, I’ve been on the lookout for more at-scale Data Quality tools. The market still has a long way to go with these tools: not enough options, hard to use, and the typical Data Engineering travails. I came across soda-core recently, a self-proclaimed…
“Data reliability testing for SQL- and Spark-accessible data.”
soda-core docs
Doing anything at scale, well … that’s usually the problem. Data Quality and Observability are topics we hear a lot about these days, but the reality often doesn’t meet the expectations. Even Great Expectations, awesome as it is, can get complicated real quick-like. Let’s hope that soda-core paired with Spark can show us some real promise. Code available on GitHub.
Introduction to soda-core
soda-core is an open-source CLI and Python library for data-quality testing, data observability, and data monitoring, at least that’s what the documentation says.
“When it runs a scan on a dataset, Soda Core executes the checks to find invalid, missing, or unexpected data. When your Soda Checks fail, they surface the data that you defined as “bad”.”
soda-core docs
soda-core comes with the following packages and integrations, which is a nice list. For our exploration and testing, we will try out soda-core-spark-df.
- soda-core-athena
- soda-core-bigquery
- soda-core-db2
- soda-core-postgres
- soda-core-redshift
- soda-core-snowflake
- soda-core-spark-df
- soda-core-sqlserver
- soda-core-mysql
- soda-core-trino
What are the steps to using soda-core?
- pip install soda-core with the needed packages.
- Write a configuration.yml with the info needed for your data sources (except Spark DataFrame sources).
- Write data quality checks in a checks.yml for the data.
- Run a scan to get the results (a rough sketch of that flow is below).
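To make those steps concrete, here is a minimal sketch of the programmatic flow, using the method names from soda-core’s programmatic-scan docs; the data source name and file paths are placeholders, so treat it as illustrative.

from soda.scan import Scan

scan = Scan()
scan.set_data_source_name("my_datasource")  # placeholder name

# Point the scan at the configuration and checks files from the steps above.
scan.add_configuration_yaml_file(file_path="configuration.yml")
scan.add_sodacl_yaml_files("checks.yml")

# Run the scan and print what it found.
scan.execute()
print(scan.get_logs_text())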
This honestly reminds me of Great Expectations, hopefully with a better and easier implementation. Time will tell.
soda-core + PySpark
Let’s try out soda-core with Spark and see what happens. I’m going to be looking for things like ease of installation, straightforwardness of configuration, how to write data quality checks, how the results come back, and other such real-life things. Data Engineering is still in need of easy-to-use, easy-to-implement Data Quality tooling; hopefully soda-core will be our savior.
We will be using a local Spark installation and some Divvy bike trips data in my AWS bucket. Since we are going to be running our checks against a DataFrame, we don’t need a configuration.yml file.
Oh great, here comes the first of the fun. Python dependency fun.
pip install soda-spark
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
conda-repo-cli 1.0.4 requires pathlib, which is not installed.
anaconda-project 0.9.1 requires ruamel-yaml, which is not installed.
sphinx 4.0.1 requires MarkupSafe<2.0, but you have markupsafe 2.0.1 which is incompatible.
awscli 1.18.190 requires botocore==1.19.30, but you have botocore 1.20.112 which is incompatible.
awscli 1.18.190 requires PyYAML<5.4,>=3.10; python_version != "3.4", but you have pyyaml 5.4.1 which is incompatible.
awscli 1.18.190 requires s3transfer<0.4.0,>=0.3.0, but you have s3transfer 0.4.2 which is incompatible.
Oh boy, after some more messing around, I finally got that figured out. That stuff will make your hair fall out or turn grey. Below is the script I am trying to run, just to see if soda works.
from pyspark.sql import DataFrame, SparkSession
from sodaspark import scan

spark = SparkSession.builder.getOrCreate()

# Configure the S3A filesystem so Spark can read from the S3 bucket.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", "")  # fill in your AWS credentials
hadoop_conf.set("fs.s3a.secret.key", "")

# Read the Divvy trip CSV files straight from the bucket.
df = spark.read.csv('s3a://confessions-of-a-data-guy/*csv', header='true')
df.show()
danielbeach@Daniels-MacBook-Pro Desktop % /usr/local/Cellar/apache-spark/3.1.1/bin/spark-submit sodaDQ.py
And … another error. Sigh.
Traceback (most recent call last):
File "/Users/danielbeach/Desktop/sodaDQ.py", line 2, in <module>
from sodaspark import scan
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/sodaspark/scan.py", line 14, in <module>
from sodasql.scan.scan import Scan
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/sodasql/scan/scan.py", line 34, in <module>
from sodasql.scan.warehouse import Warehouse
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/sodasql/scan/warehouse.py", line 17, in <module>
from sodasql.telemetry.soda_telemetry import SodaTelemetry
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/sodasql/telemetry/soda_telemetry.py", line 18, in <module>
from sodasql.telemetry.soda_exporter import SodaConsoleSpanExporter, SodaOTLPSpanExporter
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/sodasql/telemetry/soda_exporter.py", line 10, in <module>
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py", line 40, in <module>
from opentelemetry.exporter.otlp.proto.http.trace_exporter.encoder import (
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/opentelemetry/exporter/otlp/proto/http/trace_exporter/encoder/__init__.py", line 19, in <module>
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/opentelemetry/proto/collector/trace/v1/trace_service_pb2.py", line 14, in <module>
from opentelemetry.proto.trace.v1 import trace_pb2 as opentelemetry_dot_proto_dot_trace_dot_v1_dot_trace__pb2
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/opentelemetry/proto/trace/v1/trace_pb2.py", line 14, in <module>
from opentelemetry.proto.common.v1 import common_pb2 as opentelemetry_dot_proto_dot_common_dot_v1_dot_common__pb2
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/opentelemetry/proto/common/v1/common_pb2.py", line 36, in <module>
_descriptor.FieldDescriptor(
File "/Users/danielbeach/.venv/lib/python3.8/site-packages/google/protobuf/descriptor.py", line 560, in __new__
_message.Message._CheckCalledFromGeneratedFile()
TypeError: Descriptors cannot not be created directly.
If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc >= 3.19.0.
If you cannot immediately regenerate your protos, some other possible workarounds are:
1. Downgrade the protobuf package to 3.20.x or lower.
2. Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and will be much slower).
Now what the crud am I supposed to do? A protobuf error? Maybe if I install an older version of soda. I ended up installing back through the history of soda-spark releases until I finally found a working release from 2021; this one is good to go.
pip3 install soda-spark==0.2.2
It’s hard to take something seriously when you can’t pip install it reliably on Python 3.8, on a Mac, for crying out loud. They should stop worrying about trying to sell everyone Soda Cloud and put some effort into their open-source version. If I can’t trust your open-source version and the quality of work, why would I buy your product?
Hold on one sec, I gotta dismount my high horse so we can continue on.
Writing some soda-core code.
The next step is to figure out how to define a Scan, which I can run against my DataFrame. There is sort of a poor example here in the docs, but for the life of me, I could not locate anywhere in the docs a list or view of all the available checks I can define in a Scan … which makes it kind of hard to write one. Go figure.
At last, I found what I was looking for … an explanation of Metrics and Checks, which consists of:
- metrics (a “property” of your dataset)
- thresholds (a “value” for your metric)
It appears that Soda offers a variety of ways to write Checks; they can be …
- Standard Checks (fixed threshold, e.g. row count)
- Uniqueness Checks (is this in that, e.g. check that product_codes exist in the product table)
- User-Defined Checks (CTEs and SQL queries that define checks)
So far I would have to say I enjoy the Data Quality (DQ) implementation from Soda; it’s easier to understand and reason about than Great Expectations, by far, in my opinion, and it appears to be just as powerful, especially with the custom SQL option. Let’s write a few checks on our sample DataFrame.
Writing Data Quality checks with Soda-Core and Spark.
Here is the example yaml file I wrote, with just some basic data checks.
table_name: tripdata
metrics:
  - row_count
tests:
  - row_count > 0
columns:
  ride_id:
    valid_format: string
sql_metrics:
  - sql: |
      SELECT COUNT(DISTINCT rideable_type) as rideable_types
      FROM tripdata
    tests:
      - rideable_types = 2
As you can see from the above yaml, it’s pretty much what one would expect, very straightforward. Things like: do a row_count and make sure it’s not zero. Check this column and make sure it’s a string. Run this SQL query and make sure it produces the correct results. This gives me a good idea that you can get very complicated or very simple with your data checks. It’s a good sign.
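Just to illustrate the fancier end of that spectrum, here is a hypothetical user-defined check I sketched out; the SQL and threshold are made up, but start_lat and start_lng are real columns in the trip data.

# Hypothetical scan definition: fail if any rides are missing coordinates.
# sodaspark's scan.execute() takes the definition as text, so a plain
# Python string works just as well as a yaml file read from disk.
complex_checks = """
table_name: tripdata
sql_metrics:
  - sql: |
      SELECT COUNT(*) as rides_missing_coords
      FROM tripdata
      WHERE start_lat IS NULL OR start_lng IS NULL
    tests:
      - rides_missing_coords == 0
"""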
Here is the code to run the scan and get the results.
from pyspark.sql import DataFrame, SparkSession
from sodaspark import scan
from pathlib import Path

spark = SparkSession.builder.getOrCreate()

# Same S3A setup as before.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", "")  # fill in your AWS credentials
hadoop_conf.set("fs.s3a.secret.key", "")

df = spark.read.csv('s3a://confessions-of-a-data-guy/*csv', header='true')

# Read the scan definition from the yaml file and execute it against the DataFrame.
scan_definition = Path('checks.yaml').read_text()
scan_result = scan.execute(scan_definition, df)

print(scan_result.measurements)
print(scan_result.test_results)
By the Hammer of Thor … it works.
22/09/14 16:22:07 INFO DAGScheduler: Job 3 finished: first at /Users/danielbeach/.venv/lib/python3.8/site-packages/sodaspark/scan.py:150, took 33.249748 s
[Measurement(metric='schema', column_name=None, value=[{'name': 'ride_id', 'type': 'string', 'dataType': 'string', 'nullable': True,
'logicalType': 'text', 'semanticType': 'text'}, {'name': 'rideable_type', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType': 'text', 'semanticType': 'text'}, {'name': 'started_at', 'type': 'string', 'dataType': 'string', 'nullable': True,
'logicalType': 'text', 'semanticType': 'text'}, {'name': 'ended_at', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType': 'text', 'semanticType': 'text'}, {'name': 'start_station_name', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType':
'text', 'semanticType': 'text'}, {'name': 'start_station_id', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType': 'text', 'semanticType': 'text'}, {'name': 'end_station_name', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType': 'text',
'semanticType': 'text'}, {'name': 'end_station_id', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType': 'text', 'semanticType': 'text'}, {'name': 'start_lat', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType': 'text',
'semanticType': 'text'}, {'name': 'start_lng', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType': 'text', 'semanticType': 'text'}, {'name': 'end_lat', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType': 'text', 'semanticType': 'text'}, {'name': 'end_lng', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType': 'text', 'semanticType':
'text'}, {'name': 'member_casual', 'type': 'string', 'dataType': 'string', 'nullable': True, 'logicalType': 'text',
'semanticType': 'text'}], group_values=None), Measurement(metric='row_count', column_name=None, value=3013482, group_values=None), Measurement(metric='rideable_types', column_name=None, value=3, group_values=None)]
[TestResult(test=Test(id='{"expression":"row_count > 0"}', title='test(row_count > 0)', expression='row_count > 0', metrics=['row_count'],
column=None), passed=True, skipped=False, values={'expression_result': 3013482, 'row_count': 3013482}, error=None, group_values=None)]
We can see in our test results above, for example, that our row_count test passed. It’s easy to see how you could integrate soda-core DQ checks into your pipeline with very little effort, or even have DQ checks running in their own pipeline.
The checks are easy and straightforward to write; you can keep it simple, like a row count, or get complex and write custom SQL checks as in our example. You could easily alert on whether the tests passed, or on any of the results for that matter, with some simple Slack integration.
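As a sketch of what that alerting could look like; the webhook URL is a placeholder, and the TestResult fields (passed, skipped, test.title, values) come straight from the output above.

import requests  # assumes the requests library is installed

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder

# Gather any tests that ran and did not pass, using the scan_result
# object from the script above.
failures = [t for t in scan_result.test_results if not t.passed and not t.skipped]

if failures:
    message = "\n".join(f"DQ check failed: {t.test.title} -> {t.values}" for t in failures)
    # Fire a simple message at a Slack incoming webhook.
    requests.post(SLACK_WEBHOOK_URL, json={"text": message})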
I can see how useful this code would be for gathering metrics as well, writing those metrics off into their own dataset with a dashboard on top for monitoring.
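Something like the following sketch would do it, assuming the Measurement fields (metric, column_name, value) shown in the output above; the metrics bucket is made up.

from datetime import datetime, timezone

# Flatten each Measurement from the scan into a plain row.
rows = [
    (datetime.now(timezone.utc).isoformat(), m.metric, m.column_name, str(m.value))
    for m in scan_result.measurements
]

# Append the metrics to their own dataset for a monitoring dashboard to read.
metrics_df = spark.createDataFrame(
    rows, schema="scanned_at string, metric string, column_name string, value string"
)
metrics_df.write.mode("append").parquet("s3a://my-dq-metrics/tripdata/")  # hypothetical bucket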
Musings on soda-core + Spark.
I would have to say I’m pleasantly surprised. Soda-core is, far and away, the easiest Data Quality tool I’ve run across yet. It’s easy to install and use (I mean, it’s just pip, after all), the definitions are easy to understand and implement with yaml, and you have the option to keep the metrics and checks you write simple or get fancy.
I’m fairly certain I would pick soda-core over Great Expectations any day. Why?
- easy installation and administration.
- concepts are way more intuitive.
- writing checks is easy.
- provides the same functionality and end result with less effort.
I’m utterly surprised there has not been more chitter-chatter in the Data Engineering community over this tool. With soda-core being open source and free, with no overhead besides a pip install, and with its straightforward implementation … this tool would be a game changer for many Data teams.
Soda-core is my new unsung hero.