Building the Data Lake

This post is part of a Series on the Lambda Architecture. It was written in collaboration with Boxever and first posted on Medium.

Overview

As we discussed in our previous posts, we had a single monolithic Cassandra database holding tens of billions of data points across terabytes of data. Because this database served all of our REST APIs, including streaming ETL, it had strict latency requirements to satisfy. Performing any kind of analytical or batch workload against it sent the cluster latency through the roof; we were effectively running a DDoS attack on ourselves. We needed to move our data to a storage system better suited to batch and analytical workloads. As we covered in our post on Technology, Storage and Data Format Choices, S3 was the storage system we decided to go with. However, getting the data out of Cassandra and into S3 proved challenging. In this post, we cover the options available to us and how we solved the problem.

How did we get the data out?

There are a number of ways to solve this problem, each of which we discuss briefly below.

Astyanax All Rows Reader

Use the All Rows Reader from Astyanax to read every record of a given table from the database. Think of it as a streaming row cursor (iterator) that reads in as efficient a manner as Cassandra can manage (see details here). A critical drawback of this approach at any sort of scale is that if the read is interrupted or an exception occurs, you must start the process all over again from the very beginning. If you add, remove or repair a node during the operation, you have to start again. You must also throttle reading to avoid affecting the cluster latency. In our case, this resulted in an ETA of a few weeks, and that was assuming everything worked perfectly and no restarts were required.
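
For context, a minimal sketch of what such a scan might look like with the Astyanax AllRowsReader recipe is below. The column family name and tuning values are placeholders for illustration, not our actual configuration.

```java
import com.google.common.base.Function;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.recipes.reader.AllRowsReader;
import com.netflix.astyanax.serializers.StringSerializer;

public class FullTableScan {

    // Placeholder column family definition for illustration.
    private static final ColumnFamily<String, String> CF_PROFILES =
            ColumnFamily.newColumnFamily("profiles", StringSerializer.get(), StringSerializer.get());

    public static void scan(Keyspace keyspace) throws Exception {
        new AllRowsReader.Builder<String, String>(keyspace, CF_PROFILES)
                .withPageSize(100)          // rows fetched per range query; tune down to limit cluster impact
                .withConcurrencyLevel(4)    // token ranges read in parallel
                .forEachRow(new Function<Row<String, String>, Boolean>() {
                    @Override
                    public Boolean apply(Row<String, String> row) {
                        // Process a single row here. If the scan stops or fails,
                        // there is no way to resume from where it left off.
                        return true;
                    }
                })
                .build()
                .call();
    }
}
```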

Run Hadoop or Spark as a Sidecar

Configure a distributed processing framework like Hadoop or Spark to run alongside the Cassandra process, essentially as a sidecar service for accessing the data (check out the DataStax overview of this approach here). Apart from being a difficult architecture to set up, it is next to impossible to guarantee that you will not impact your serving latencies, and it is quite risky from a resource contention perspective. Hence, it is typically recommended to set up a separate replica cluster with this architecture, which we discuss next.
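
To make the pattern concrete, here is a rough sketch of what a Spark job running against Cassandra via the spark-cassandra-connector might look like. The host, keyspace and table names are placeholders; this is an illustration of the approach, not our setup.

```java
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SidecarScan {
    public static void main(String[] args) {
        // Point Spark at the Cassandra nodes it is co-located with (placeholder host).
        SparkConf conf = new SparkConf()
                .setAppName("cassandra-sidecar-scan")
                .set("spark.cassandra.connection.host", "127.0.0.1");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Full scan of a table: each executor reads the token ranges local to it,
        // competing for I/O and CPU with the serving Cassandra process.
        JavaRDD<CassandraRow> rows = javaFunctions(sc)
                .cassandraTable("keyspace", "table");   // placeholder keyspace/table

        System.out.println("row count: " + rows.count());
        sc.stop();
    }
}
```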

Separate Data Center with Sidecar

Set up a separate analytics data center that is a replica of the data center you are trying to analyse. This allows you to run heavy queries against the separate data center without impacting the primary serving database. If you have a large Cassandra cluster, this can be a very expensive option! Even once you achieve this setup, you still need to get the data out of Cassandra in a timely manner. As we mention in the first option, the all rows read is not a good strategy, which means you need to use something like the Spark sidecar setup recommended above. This is still slow and extremely expensive to run. If you are only accessing a portion of your data, this may make sense if you can swallow the costs.

Read the Raw SSTables

Read the raw Cassandra database files, i.e. the SSTable files. First, you back up your SSTables to S3, HDFS or some other file-based datastore (which you should be doing anyway for disaster recovery, and which is trivial thanks to the immutable nature of SSTables: you can simply rsync the new or removed tables). You then read the SSTables contained in your backups directly using a distributed processing job. This is non-trivial, as you must understand the SSTable storage format and how the Cassandra engine computes the "current" state of an entity from the SSTable change log. However, this was the only really feasible, long-term option for achieving our goals. It provides a repeatable process that runs entirely against backups, leaving the serving cluster untouched.

Various other companies have hit this exact same problem before and solved it in the same way. Netflix and FullContact are the two we took the most inspiration from. The Netflix implementation, known as Aegisthus, did not work well for our use case as it did not split large compressed SSTables. We had individual SSTable files well over 100GB, which meant they would be processed single-threaded and cause severe long-tail, unbalanced processing. FullContact had solved the splitting of compressed SSTables via a custom Hadoop InputFormat. They have an excellent blog post about this work here, which I suggest anyone curious about the matter read. However, it was not as complete a solution as Aegisthus, which included the reducer logic and a generic JSON output format. So we took work from both and combined it into a Spark job, sketched below. We had some conflicts between Hadoop versions, which were easily solved by bumping the FullContact library to use Hadoop 2.2.0 (we forked the patch here). We also created a Spark job for the distributed indexing of the SSTables required by the FullContact input format.
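
The general shape of wiring a custom Hadoop InputFormat into a Spark job looked roughly like the sketch below. The input format class, value types and configuration key are hypothetical placeholders standing in for the forked FullContact/Aegisthus code, so treat this as an outline rather than the actual job.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SSTableExtractJob {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("sstable-extract");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Hadoop configuration consumed by the input format, e.g. where the
        // pre-computed split index lives (key name is hypothetical).
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("sstable.split.index.path", args[1]);

        // SSTableInputFormat is a hypothetical stand-in for the FullContact/Aegisthus
        // input format that knows how to split and decode compressed SSTables.
        JavaPairRDD<Text, Text> rawRows = sc.newAPIHadoopFile(
                args[0],                   // e.g. the S3 location of the SSTable backups
                SSTableInputFormat.class,  // hypothetical class name
                Text.class,                // row key
                Text.class,                // row fragment as generic JSON
                hadoopConf);

        // The real job then applies the Aegisthus-style reduce to collapse row
        // fragments into the latest state; here we simply persist the raw output.
        rawRows.saveAsTextFile(args[2]);

        sc.stop();
    }
}
```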

We then ran this job on a large EMR cluster of spot instances and processed all of our behavioural data in a couple of hours! Given we had originally estimated weeks to months to get our data out, this was a huge win. It also meant that it was extremely easy for us to rebuild or redo the extract should anything go wrong.

Data Format

The output format from Aegisthus is a generic JSON format. Each record is output as a JSON record, with each column represented as a key-value pair where the key is the column name and the value is a textual representation of the column value. However, the final output format we required was the v2 Domain Models used in our platform. Therefore, we needed to convert from this raw representation of a Cassandra row to our Domain Models. If we were reading directly from Cassandra using the Astyanax or DataStax driver, this would be performed by the Data Access Object (DAO) converters. However, we were not reading directly from Cassandra, and we did not want to duplicate all of our DAO logic, as that would be time consuming, very error prone and a huge maintenance burden. As an elegant solution to this particular problem, we wrote an implementation of the result set interface used by the driver library. In our case this was Astyanax, due to the version of Cassandra we were working with, so we wrote an implementation of com.netflix.astyanax.model.ColumnList. The implementation accepted the raw Cassandra data structure output by Aegisthus at construction time and implemented all of the interface methods used to read the various column values, based on their data types, from this input data structure. This enabled us to convert from the Cassandra data model to our v2 Domain Model using our existing DAO converters, which already worked against the abstract ColumnList interface.
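
To illustrate the idea, here is a simplified, self-contained sketch. The real implementation fulfilled the full com.netflix.astyanax.model.ColumnList interface so the existing DAO converters could be reused unchanged; the cut-down interface and class names below are hypothetical, showing only how a map of Aegisthus JSON columns can back the typed getters.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical cut-down stand-in for the handful of ColumnList methods a DAO
// converter might call; the real work implemented the full Astyanax interface.
interface RowColumns {
    String getStringValue(String columnName, String defaultValue);
    long getLongValue(String columnName, long defaultValue);
    UUID getUUIDValue(String columnName, UUID defaultValue);
}

// Backed by one row of Aegisthus output: column name -> textual column value.
public class AegisthusBackedColumns implements RowColumns {

    private final Map<String, String> rawColumns;

    public AegisthusBackedColumns(Map<String, String> rawColumns) {
        this.rawColumns = rawColumns;
    }

    @Override
    public String getStringValue(String columnName, String defaultValue) {
        String value = rawColumns.get(columnName);
        return value != null ? value : defaultValue;
    }

    @Override
    public long getLongValue(String columnName, long defaultValue) {
        String value = rawColumns.get(columnName);
        return value != null ? Long.parseLong(value) : defaultValue;
    }

    @Override
    public UUID getUUIDValue(String columnName, UUID defaultValue) {
        String value = rawColumns.get(columnName);
        return value != null ? UUID.fromString(value) : defaultValue;
    }
}
```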

As previously discussed in Querying the Data Lake, to make this data truly useful for ad hoc analytics you must transform the JSON data into a columnar format such as Parquet or ORC. A follow-up blog post in this series will detail how we achieved millisecond query latencies on datasets with billions of records using Parquet, S3 and Presto.
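
The details belong in that follow-up post, but for a sense of the transformation, a minimal sketch using the Spark SQL API is shown below. The paths are placeholders and the schema is simply inferred from the JSON; a real pipeline would declare the schema explicitly.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonToParquet {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("json-to-parquet")
                .getOrCreate();

        // Read the generic JSON records; schema is inferred for simplicity.
        Dataset<Row> events = spark.read().json("s3://datalake/raw/events/");   // placeholder path

        // Columnar output lets engines like Presto read only the columns a query touches.
        events.write().parquet("s3://datalake/parquet/events/");                // placeholder path

        spark.stop();
    }
}
```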

Additional Cassandra Observations

After we had extracted and processed all of the data of interest from the SSTables, we noticed mismatches between the estimated number of SSTable rows and the actual number of records once processed. In the majority of our largest SSTables, the Cassandra row count estimate was roughly twice the number of actual records. It turned out that most of our largest column families had not been fully compacted: because of the manner in which Cassandra stores writes, and because for legacy reasons we updated a column on each row once after insertion (i.e. an insert plus an update is two writes), most rows existed as two fragments on disk. This explains the mismatch in numbers and highlighted that we were storing a huge amount of totally redundant information in our primary cluster.

It is worth noting that the version of Cassandra you are running can have a big impact here. Neither the Aegisthus nor the FullContact library has been updated to work with Cassandra 3.x, and given the Cassandra storage engine changed considerably in 3.x, you can expect some trouble upgrading. In fact, this is an effort we are evaluating at the moment. Hopefully, we can contribute a solution back to the projects when we implement it.

Conclusion

While this post in our series may appear trivial and short relative to the others, it should not be underestimated just how important solving this problem was for our ability to re-architect our platform. There was an immediate freedom in having all of our data out of our single monolithic Cassandra cluster and on the more batch- and analytics-friendly storage medium of S3. We could finally obtain exact counts of our data. In addition, we automatically provided our data science team with a powerful new platform where they could easily run analytics and machine learning models at scale via Spark, Zeppelin and EMR directly against S3.

Next in the series - The Changelog