, , , ,

Intro to Apache Cassandra for Data Engineers

Who who? Apache Cassandra, who?

Hmm… yet another distributed database …. will it ever end? Probably not. It’s hard to keep up with them all, even the old ones. That brings me to Apache Cassandra. Of all the popular big data distributed databases Cassandra seems to be kind of that student who always sits in the back row and never says anything… you forget they are there…. until someone says their name….. Apache Cassandra. I honestly didn’t even know what space Cassandra fit in before trying to install and use it… so this should fun. What Is Cassandra? Distributed NoSQL.

What’s the deal with Apache Cassandra?

So what is Apache Cassandra? It’s a big data, distributed, NoSQL, wide column database. What a mouthful. You know I just don’t hear much about Apache Cassandra, I’m guessing there is a few reason for this.

  • The Cassandra documentation itself says that it was partly based on AWS DynamoDB… so many companies are on AWS.. they probably just use it?
  • There are other tools for NoSQL like MongoDB that are well known and widely used.
  • “…eventually consistent semantics.” I’ve seen a lot of projects give the boot to eventually consistent systems… seems like ACID is king.

Who knows, maybe there are other reason I will find when installing and trying to use Cassandra.

What else should we know about Apache Cassandra?

I guess one of the bigger features is Cassandra Query Language (CQL), basically a SQL like language. It will be interesting to check this out, not sure why you wouldn’t just try to support ANSI SQL like so many other projects. I guess CQL is better than nothing? We shall find out.

Cassandra is a wide column datastore, and is NoSQL of course, but the concepts are surprisingly similar. Let’s talk about the pieces and parts of Cassandra as I understand them.

We have columns which are pieces of data (of some type) that combined together make a row. This row (being a collection of columns) is identified by a MANDATORY primary key. These primary keys are made up of partitions. A collection of partitions is called a …. table.

Here are data types.

ASCII | BIGINT | BLOB | BOOLEAN | COUNTER | DATE | DECIMAL | DOUBLE | DURATION | FLOAT | INET | INT | SMALLINT | TEXT | TIME | TIMESTAMP | TIMEUUID | TINYINT | UUID | VARCHAR | VARINT

Also very important to understand….lest you think Cassandra will fit your relational database needs….

“Cassandra does not support relational data modeling intended for relational databases.”

Aka…. you can’t do cross partition joins and queries.

“In Cassandra, data modeling is query-driven.”

Everything we do in Cassandra with data is all about the query, how you are going to try to get the data back. Best to keep that in mind.

Now obviously there is a lot more to Cassandra, but let’s move on for now.

Installing Apache Cassandra

So usually I go to the trouble of installing my own cluster for a new-to-me piece of tech… but I’m going to skip on the install part. Usually I can learn a decent amount about the system I’m about to use by installing it. But in this case when the install instructions are….

apt-get install oracle-java8-set-default
apt-get install cassandra
service cassandra status

Apparently the file cassandra.yaml in the install holds all the configuration values. Reading around things it doesn’t seem like there is too much unusual about this stuff. cluster_name, storage_port, listen_address, data_file_directories …. blah blah blah. Pretty much standard stuff I’ve seen for most big data distributed cluster systems.

In lieu of going through the three steps of installing, let’s just use the official Docker file.

docker pull cassandra

And run it…

docker run -it cassandra:latest /bin/bash
// once in the container
>> cassandra -R
//output
INFO  [MigrationStage:1] 2020-12-10 02:30:14,409 ViewManager.java:137 - Not submitting build tasks for views in keyspace system_traces as storage service is not initialized
INFO  [MigrationStage:1] 2020-12-10 02:30:14,421 ColumnFamilyStore.java:427 - Initializing system_traces.events
INFO  [MigrationStage:1] 2020-12-10 02:30:14,436 ColumnFamilyStore.java:427 - Initializing system_traces.sessions
INFO  [MigrationStage:1] 2020-12-10 02:30:14,444 ViewManager.java:137 - Not submitting build tasks for views in keyspace system_distributed as storage service is not initialized
INFO  [MigrationStage:1] 2020-12-10 02:30:14,453 ColumnFamilyStore.java:427 - Initializing system_distributed.parent_repair_history
INFO  [MigrationStage:1] 2020-12-10 02:30:14,463 ColumnFamilyStore.java:427 - Initializing system_distributed.repair_history
INFO  [MigrationStage:1] 2020-12-10 02:30:14,473 ColumnFamilyStore.java:427 - Initializing system_distributed.view_build_status
INFO  [MigrationStage:1] 2020-12-10 02:30:14,478 ViewManager.java:137 - Not submitting build tasks for views in keyspace system_auth as storage service is not initialized
INFO  [MigrationStage:1] 2020-12-10 02:30:14,486 ColumnFamilyStore.java:427 - Initializing system_auth.resource_role_permissons_index
INFO  [MigrationStage:1] 2020-12-10 02:30:14,495 ColumnFamilyStore.java:427 - Initializing system_auth.role_members
INFO  [MigrationStage:1] 2020-12-10 02:30:14,505 ColumnFamilyStore.java:427 - Initializing system_auth.role_permissions
INFO  [MigrationStage:1] 2020-12-10 02:30:14,514 ColumnFamilyStore.java:427 - Initializing system_auth.roles
INFO  [main] 2020-12-10 02:30:14,544 StorageService.java:1492 - JOINING: Finish joining ring

Once this docker container was running I opened another shell and exec into it. Something like docker exec -it a3d1c3a50d49 /bin/bash

Trying out Apache Cassandra

So now if you are into your container running Cassandra all you have to do is drop into cqlsh

Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.9 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>

I’m having flashbacks to Hive….

Also, I am of course going to try out the Python client for Cassandra, and wanted to check the version installed in the official Docker image.

root@0ea5dfa836c4:/# python --version
Python 2.7.18
root@0ea5dfa836c4:/# python3 --version
bash: python3: command not found
root@0ea5dfa836c4:/#

CURSES CURSES. No Python3 installed? Come on man. Stinkers.

pip install cassandra-driver
bash: pip: command not found

Let me get this straight… you put python2 in your Dockerfile not python3 and you don’t put pip? Because that makes so much sense. Bunch of turkeys.

apt-get update
apt-get install python3.8 python3-pip

Success this time.

pip3 install cassandra-driver
Collecting cassandra-driver
  Downloading cassandra_driver-3.24.0-cp38-cp38-manylinux1_x86_64.whl (3.6 MB)
     |████████████████████████████████| 3.6 MB 1.4 MB/s 
Collecting six>=1.9
  Downloading six-1.15.0-py2.py3-none-any.whl (10 kB)
Collecting geomet<0.3,>=0.1
  Downloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Collecting click
  Downloading click-7.1.2-py2.py3-none-any.whl (82 kB)
     |████████████████████████████████| 82 kB 2.6 MB/s 
Installing collected packages: six, click, geomet, cassandra-driver
Successfully installed cassandra-driver-3.24.0 click-7.1.2 geomet-0.2.1.post1 six-1.15.0

Use python client for Cassandra to write data.

First pull down a csv file of data to use.

wget https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2018_Q3.zip
unzip Divvy_Trips_2018_Q3.zip

We are going to use the Divvy bike trips data… with the following data points.

['trip_id', 'start_time', 'end_time', 'bikeid', 'tripduration', 'from_station_id', 'from_station_name', 'to_station_id', 'to_station_name', 'usertype', 'gender', 'birthyear']

Apparently creating a Cassandra table is pretty much like what you would expect from Hive or SQL.

CREATE TABLE trip_data (
    trip_id INT PRIMARY KEY,
    start_time timestamp,
    end_time timestamp,
    bikeid int,
    tripduration decimal,
    from_station_id int,
    from_station_name text,
    to_station_id int,
    to_station_name text,
    usertype text,
    gender text,
    birthyear INT
);

Welp, first error.

cqlsh> CREATE TABLE trip_data (
   ...     trip_id INT PRIMARY KEY,
   ...     start_time timestamp,
   ...     end_time timestamp,
   ...     bikeid int,
   ...     tripduration decimal,
   ...     from_station_id int,
   ...     from_station_name text,
   ...     to_station_id int,
   ...     to_station_name text,
   ...     usertype text,
   ...     gender text,
   ...     birthyear INT
   ... ) ;
InvalidRequest: Error from server: code=2200 [Invalid query] message="No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename"
cqlsh>

[Invalid query] message=”No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename”

Well… that is interesting… nothing I read in the documentation said something about a keyspace. You know considering this is a basic requirement before you doing anything with Cassandra you think they might have mentioned it in the intro documentation. Found something here about it.

“A keyspace is the top-level database object that controls the replication for the object it contains at each datacenter in the cluster.”

Basically it appears to be what you would think of as a schema in a normal database… with some extra distribution properties that make sense for distributed system. Let’s try this again.

CREATE KEYSPACE trip WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
CREATE TABLE trip.trip_data (
       trip_id INT PRIMARY KEY,
       start_time timestamp,
       end_time timestamp,
       bikeid int,
       tripduration decimal,
       from_station_id int,
       from_station_name text,
       to_station_id int,
       to_station_name text,
       usertype text,
       gender text,
       birthyear INT
    ) ;

Success that time. Now let’s write a little python script to insert our CSV file into this table.

from cassandra.cluster import Cluster
import csv

cluster = Cluster()
session = cluster.connect()

with open('Divvy_Trips_2018_Q3.csv') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader, None)
    for row in csv_reader:
        session.execute("""INSERT INTO trip.trip_data ("trip_id", "start_time", "end_time",
                               "bikeid", "tripduration", "from_station_id",
                               "from_station_name", "to_station_id", "to_station_name",
                               "usertype", "gender", "birthyear")
                            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);""",
                        (int(row[0]),
                         row[1],
                         row[2],
                         int(row[3]),
                         float(row[4].replace(",", "")),
                         int(row[5]),
                         row[6],
                         int(row[7]),
                         row[8],
                         row[9],
                         row[10],
                         0
                         ))

Couple things to note. If you use a INSERT statement like this and don’t double quote (“”) your column names you specific it will fail. I have no idea why.

Also, you the data types have to match exactly what the table data types are or it will fail. You will notice me above casting to int, float, etc. See below.

Traceback (most recent call last):
  File "cassandra-test.py", line 11, in <module>
    session.execute("""INSERT INTO trip.trip_data ("trip_id", "start_time", "end_time",
  File "cassandra/cluster.py", line 2618, in cassandra.cluster.Session.execute
  File "cassandra/cluster.py", line 4877, in cassandra.cluster.ResponseFuture.result
cassandra.InvalidRequest: Error from server: code=2200 [Invalid query] message="Invalid STRING constant (86,168.0) for "tripduration" of type decimal"

Either way my data went in without error, let’s try some CQL.

cqlsh> SELECT * FROM trip.trip_data;

 trip_id  | bikeid | birthyear | end_time                        | from_station_id | from_station_name                   | gender | start_time                      | to_station_id | to_station_name                     | tripduration | usertype
----------+--------+-----------+---------------------------------+-----------------+-------------------------------------+--------+---------------------------------+---------------+-------------------------------------+--------------+------------
 20974116 |    696 |         0 | 2018-09-30 09:14:07.000000+0000 |              85 |               Michigan Ave & Oak St | Female | 2018-09-30 08:49:37.000000+0000 |           226 |            Racine Ave & Belmont Ave |       1470.0 | Subscriber

Interesting. I think it’s funny they call this a NoSQL system… I get it, no JOINS allowed… but still. All those cool NoSQL people apparently like something about SQL.

cqlsh> SELECT from_station_id, from_station_name, avg(tripduration) as avg FROM trip.trip_data GROUP BY from_station_id, from_station_name;
InvalidRequest: Error from server: code=2200 [Invalid query] message="Group by is currently only supported on the columns of the PRIMARY KEY, got from_station_id"

Well stinker, I missed that part of the documentation. Apparently we can only GROUP BY values that part of the primary key?? I can see with Cassandra that data modeling and extremely important, best know your data and how you will need to use it ahead of time! I guess it’s time to drop and re-create my table and insert the data again. Oh boy, can you imagine if you got this wrong in prod?

DROP TABLE trip.trip_data;
CREATE TABLE trip.trip_data (
       trip_id INT,
       start_time timestamp,
       end_time timestamp,
       bikeid int,
       tripduration decimal,
       from_station_id int,
       from_station_name text,
       to_station_id int,
       to_station_name text,
       usertype text,
       gender text,
       birthyear INT,
       PRIMARY KEY (trip_id, bikeid, from_station_id, from_station_name, tripduration)
    );

Time to run some queries again, this is where it starts to get wierd.

cqlsh> SELECT from_station_id, AVG(tripduration) FROM trip.trip_data GROUP BY from_station_id;
InvalidRequest: Error from server: code=2200 [Invalid query] message="Group by currently only support groups of columns following their declared order in the PRIMARY KEY"
cqlsh> SELECT trip_id, AVG(tripduration) FROM trip.trip_data GROUP BY trip_id;

 trip_id  | system.avg(tripduration)
----------+--------------------------
 20974116 |                   1470.0
 19637297 |                    187.0
 20866535 |                   1260.0
 19728664 |                    538.0
 19695043 |                    909.0
 19852719 |                    205.0
 19787201 |                    653.0
 19686966 |                    483.0
 20954029 |                   1062.0
 20890420 |                   1196.0
 20797487 |                    885.0
 20446854 |                   1642.0
 19490354 |                    396.0
 19744312 |                    805.0
 19978183 |                    354.0
 20521085 |                    978.0
 19605702 |                    514.0
 20782549 |                    396.0
 20972421 |                    235.0
 19983797 |                    875.0
 20804708 |                   1390.0

“Group by currently only support groups of columns following their declared order in the PRIMARY KEY

This seems rather amazing to me. I can’t image. I have to think that production Cassandra users either spend a year figuring out exactly how to lay out their data, or apparently the one run one query on their data set? Your GROUP BY columns must follow the order of creation in your table? Hmmm…

So I’m guessing I’m either going to have to use a materialized view or possibly a secondary index?

CREATE INDEX stationidx ON trip.trip_data (from_station_id);
cqlsh> SELECT from_station_id, AVG(tripduration) FROM trip.trip_data GROUP BY from_station_id;
InvalidRequest: Error from server: code=2200 [Invalid query] message="Group by currently only support groups of columns following their declared order in the PRIMARY KEY"

Ok, no go on the secondary index, let’s try the materialized view.

cqlsh> CREATE MATERIALIZED VIEW trip.trip_duration_avg AS
   ...        SELECT from_station_id, from_station_name, tripduration FROM trip.trip_data
   ...        PRIMARY KEY (from_station_id, from_station_name, tripduration);
InvalidRequest: Error from server: code=2200 [Invalid query] message="Primary key column 'from_station_id' is required to be filtered by 'IS NOT NULL'"

Man, this system is trippy.

cqlsh> CREATE MATERIALIZED VIEW trip.trip_duration_avg AS        SELECT from_station_id, from_station_name, tripduration FROM trip.trip_data WHERE from_station_id IS NOT NULL AND from_station_name IS NOT NULL AND tripduration IS NOT NULL        PRIMARY KEY (from_station_id, from_station_name, tripduration);
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot create Materialized View trip_duration_avg without primary key columns from base trip_data (bikeid,trip_id)"

Oh save me.

cqlsh> CREATE MATERIALIZED VIEW trip.trip_duration_avg AS        SELECT from_station_id, from_station_name, tripduration FROM trip.trip_data WHERE from_station_id IS NOT NULL AND from_station_name IS NOT NULL AND tripduration IS NOT NULL AND bikeid IS NOT NULL AND trip_id IS NOT NULL       PRIMARY KEY (from_station_id, from_station_name, tripduration, bikeid, trip_id);

Warnings :
Materialized views are experimental and are not recommended for production use.

cqlsh>

Well that’s nice to know, experimental indeed. Never would have guessed. Back to our query I guess.

cqlsh> SELECT from_station_id, from_station_name, avg(tripduration) as avg FROM trip.trip_duration_avg GROUP BY from_station_id, from_station_name;

 from_station_id | from_station_name                           | avg
-----------------+---------------------------------------------+----------
              23 |                     Orleans St & Elm St (*) |   1167.0
             114 |                Sheffield Ave & Waveland Ave |   1783.1
              53 |                         Wells St & Huron St |   1008.2
             110 |                       Dearborn St & Erie St |   1243.7
              91 |                Clinton St & Washington Blvd |    744.4
             128 |                     Damen Ave & Chicago Ave |   1109.9
             251 |                  Clarendon Ave & Leland Ave |   1693.2
             310 |                   Damen Ave & Charleston St |   1156.7
             247 |                          Shore Dr & 55th St |   1970.7
             214 |                       Damen Ave & Grand Ave |   1080.1
             429 |                 Cottage Grove Ave & 67th St |   2289.9
             117 |                    Wilton Ave & Belmont Ave |   1131.7
             547 |                   Ashland Ave & Pershing Rd |   1981.6
             144 |                   Larrabee St & Webster Ave |   1301.7
             567 |                            May St & 69th St |  25933.9
             120 |            Wentworth Ave & Cermak Rd (Temp) |   2322.4
             504 |                Campbell Ave & Fullerton Ave |   1258.1
             219 |                     Damen Ave & Cortland St |   1901.8
             475 |                Washtenaw Ave & Lawrence Ave |   1382.5
             140 |                 Dearborn Pkwy & Delaware Pl |   1187.5
             308 |                      Seeley Ave & Roscoe St |   1084.9
             483 |               Avondale Ave & Irving Park Rd |   1603.2

Talk about picky…. I guess that’s what happens when you call something NoSQL but then try to replicate SQL.

Cassandra musings.

You know, even after all that I think I’m a fan of Cassandra. From what I’ve read this system is build to work across multiple data centers across more petabytes of data then you can count. What I could appreciate about the system was the appearance of not being complicated to setup and use. From what I’ve read and seen it seems to be a great choice for a system that can handle ingesting tons of semi-structured data, and then quickly running some analytics on that data.

I’ve got to say though, it appears you should spend half your life thinking about the data, and how it will be queried before hand. It’s seems very rigid in its application given the above errors where I learned table creation and primary keys are apparently incredibly important. I’m sure there are great reasons to use Cassandra when the size and scale call for it, but I’m not quite sure why I would pick Cassandra over Hive. I get the whole NoSQL thing, but from all appearances interacting with CQL…. it makes you want to treat it like normal SQL. That’s what is confusing for me.

If anyone has experience using Cassandra and how it differs from Hive, that input would be great appreciated.

1 reply
  1. David Stevenson
    David Stevenson says:

    I found this after the DataStax folks retweeted it.
    My confession is I never really liked RDBMS/SQL much because I started out as as programmer and was working with objects as in OO, and so I never thought about data in the relational way.

    Recently I looked into Cassandra and found it to be a nice fit for my default way of thinking, but yes I certainly found it initially confusing with many similar seeming terms in Cassandra being different in semantics to SQL.

    One of the key things is the “primary key” – with Cassandra being a distributed database too as well as NoSQL, the way it works is different from SQL. A Cassandra Primary Key consists of a “partition key” part as well as optional clustering columns, and the order they are declared in determined which type the columns are.
    I went through the DataStax academy online training courses, and understanding that and why it is important in Cassandra was one of the key takeaways. Being a distributed database, to keep queries fast it’s important that data that needs to be accessed at the same time is stored together on the same cluster node. The partition key part of the primary key is what specified that storage.

    Features like aggregation, materialized views and secondary indexes are not really the bread and butter of Cassandra – they can be useful in certain situations, but if one wants to do flexible queries with aggregations etc as a primary use case, then I guess sticking with SQL makes sense.

    I think Cassandra is a fine choice as what Martin Fowler calls “aggregate oriented” databases (this is my application, domain-driven design perspective), and when large scale distributed data is involved, and required queries can be considered ahead of time, as you said.

    I enjoyed this post!

Comments are closed.