DataStax Developer Blog

Advanced Time Series with Cassandra

By Tyler Hobbs -  March 28, 2012 | 14 Comments

Cassandra is an excellent fit for time series data, and it’s widely used for storing many types of data that follow the time series pattern: performance metrics, fleet tracking, sensor data, logs, financial data (pricing and ratings histories), user activity, and so on.

A great introduction to this topic is Kelley Reynolds’ Basic Time Series with Cassandra. If you haven’t read that yet, I highly recommend starting with it. This post builds on that material, covering a few more details, corner cases, and advanced techniques.

Indexes vs Materialized Views

When working with time series data, one of two strategies is typically employed: either the column values contain row keys pointing to a separate column family which contains the actual data for events, or the complete set of data for each event is stored in the timeline itself. The latter strategy can be implemented by serializing the entire event into a single column value or by using composite column names of the form <timestamp>:<event_field>.

With the first strategy, which is similar to building an index, you first fetch a set of row keys from a timeline and then multiget the matching data rows from a separate column family. This approach is appealing to many at first because it is more normalized; it allows for easy updates of events, doesn’t require you to repeat the same data in multiple timelines, and lets you easily add built-in secondary indexes to your main data column family. However, the second step of the data fetching process, the multiget, is fairly expensive and slow. It requires querying many nodes where each node will need to perform many disk seeks to fetch the rows if they aren’t well cached. This approach will not scale well with large data sets.

Column family diagram for the index pattern

The top column family contains only a timeline index; the bottom, the actual data for the events.

The second strategy, which resembles maintaining a materialized view, provides much more efficient reads. Fetching a time slice of events only requires reading a contiguous portion of a row on one set of replicas. If the same event is tracked in multiple timelines, it’s okay to denormalize and store all of the event data in each of those timelines. One of the main principles that Cassandra was built on is that disk space is a very cheap resource; minimizing disk seeks at the cost of higher space consumption is a good tradeoff. Unless the data for each event is very large, I always prefer this strategy over the index strategy.

Diagram of the "materialized view" pattern

All event data is serialized as JSON in the column values.
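
To make the difference between the two read paths concrete, here is a minimal sketch that uses plain Python dictionaries as stand-ins for column families. Nothing here talks to Cassandra; the names `events`, `timeline_index`, `timeline_view`, and the helper functions are hypothetical and exist only for illustration.

```python
import json

# In-memory stand-ins for column families: {row_key: {column_name: value}}.
# All names are hypothetical and only illustrate the two layouts.
events = {}          # index pattern: event_id -> event fields
timeline_index = {}  # index pattern: timeline -> {timestamp: event_id}
timeline_view = {}   # materialized view pattern: timeline -> {timestamp: JSON}


def write_event(timeline, timestamp, event_id, event):
    # Index pattern: the timeline column holds only a pointer to the data row.
    events[event_id] = dict(event)
    timeline_index.setdefault(timeline, {})[timestamp] = event_id
    # Materialized view pattern: the timeline column holds the whole event.
    timeline_view.setdefault(timeline, {})[timestamp] = json.dumps(event)


def read_via_index(timeline, start, end):
    # Step 1: slice the timeline row. Step 2: "multiget" every data row.
    row = timeline_index.get(timeline, {})
    ids = [row[ts] for ts in sorted(row) if start <= ts <= end]
    return [events[event_id] for event_id in ids]


def read_via_view(timeline, start, end):
    # A single contiguous slice of one row returns everything.
    row = timeline_view.get(timeline, {})
    return [json.loads(row[ts]) for ts in sorted(row) if start <= ts <= end]
```

Both readers return the same events. In a real cluster, the second lookup step in `read_via_index` is the part that fans out to many nodes, while `read_via_view` stays on one set of replicas.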

Reversed Column Comparators

Since Cassandra 0.8, column comparators can easily be reversed. This means that if you’re using timestamps or TimeUUIDs as column names, you can choose to have them sorted in reverse chronological order.

If the majority of your queries ask for the N most recent events in a timeline or N events immediately before a point in time, using a reversed comparator will give you a small performance boost over always setting reversed=True when fetching row slices from the timeline.

Timeline Starting Points

To support queries that ask for all events before a given time, your application usually needs to know when the timeline was first started. Otherwise, if you aren’t guaranteed to have events in every bucket, you cannot just fetch buckets further and further back in time until you get back an empty row; there’s no way to distinguish between a bucket that just happens to contain no events and one that falls before the timeline even began.

To prevent unnecessary searching through empty rows, we can keep track of when the earliest event was inserted for a given timeline using a metadata row. When an application writes to a timeline for the first time after starting up, it can read the metadata row, find out the current earliest timestamp, and write a new timestamp if it ever inserts an earlier event. To avoid race conditions, add a new column to the metadata row each time a new earliest event is inserted. I suggest using TimeUUIDs with a timestamp matching the event’s timestamp for the column name so that the earliest timestamp will always be at the beginning of the metadata row.

After reading only the first column from the metadata row (either on startup or the first time it’s required, refreshing periodically), the application can know exactly how far in the past it should look for events in a given timeline.
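
The bookkeeping described above might look roughly like the following sketch. It is an in-memory simulation under a few assumptions: integer timestamps stand in for TimeUUID column names, a Python set stands in for the metadata row, and the function names and the fixed `bucket_size` argument are hypothetical.

```python
# Stand-in for the metadata column family: {timeline: set of column names}.
# Integer timestamps replace TimeUUIDs here purely for simplicity.
metadata = {}


def record_event_time(timeline, event_ts, known_earliest=None):
    """Add a metadata column only when an event predates the known start."""
    if known_earliest is None or event_ts < known_earliest:
        # Adding a column (never overwriting one) avoids write races:
        # concurrent writers each add their own column.
        metadata.setdefault(timeline, set()).add(event_ts)
        return event_ts
    return known_earliest


def earliest_timestamp(timeline):
    """Equivalent to reading only the first column of the metadata row."""
    columns = metadata.get(timeline)
    return min(columns) if columns else None


def buckets_to_scan(timeline, before_ts, bucket_size):
    """Bucket start times to read, newest first, stopping at the timeline start."""
    start = earliest_timestamp(timeline)
    if start is None:
        return []
    first = start - start % bucket_size
    last = before_ts - before_ts % bucket_size
    return list(range(last, first - 1, -bucket_size))
```

With the earliest timestamp known, a reader can stop at the first bucket of the timeline instead of guessing whether an empty row marks the beginning.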

High Throughput Timelines

Each row in a timeline will be handled by a single set of replicas, so they may become hotspots while the row holding the current time bucket falls in their range. It’s not very common, but occasionally a single timeline may grow at such a rate that a single node cannot easily handle it. This may happen if tens of thousands of events are being inserted per second or at a lower rate if the column values are large. Sometimes, by reducing the size of the time bucket enough, a single set of replicas will only have to ingest writes for a short enough period of time that the throughput is sustainable, but this isn’t always a feasible option.

In order to spread the write load among more nodes in the cluster, we can split each time bucket into multiple rows. We can use row keys of the form <timeline>:<bucket>:<partition>, where partition is a number between 1 and the number of rows we want to split the bucket across. When writing, clients should append new events to each of the partitions in round robin fashion so that all partitions grow at a similar rate. When reading, clients should fetch slices from all of the partition rows for the time bucket they are interested in and merge the results client-side, similar to the merge step of merge-sort.
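
As a rough illustration of the round robin write and merge-on-read steps, the sketch below simulates the layout in memory. The `rows` dictionary stands in for a column family, and the `PartitionedTimeline` class is a hypothetical helper, not part of any client library.

```python
import bisect
import heapq
import itertools

# In-memory stand-in for a column family: {row_key: sorted [(timestamp, value)]}.
rows = {}


class PartitionedTimeline:
    """Hypothetical helper that splits each time bucket across several rows."""

    def __init__(self, name, bucket_size, num_partitions):
        self.name = name
        self.bucket_size = bucket_size
        self.num_partitions = num_partitions
        # Round robin over partition numbers 1..num_partitions.
        self._partitions = itertools.cycle(range(1, num_partitions + 1))

    def _row_key(self, bucket, partition):
        return f"{self.name}:{bucket}:{partition}"

    def write(self, timestamp, value):
        bucket = timestamp - timestamp % self.bucket_size
        key = self._row_key(bucket, next(self._partitions))
        # Keep columns sorted by timestamp, as a timestamp comparator would.
        bisect.insort(rows.setdefault(key, []), (timestamp, value))

    def read_bucket(self, bucket):
        slices = [rows.get(self._row_key(bucket, p), [])
                  for p in range(1, self.num_partitions + 1)]
        # Each slice is already sorted, so a k-way merge restores time order.
        return list(heapq.merge(*slices, key=lambda column: column[0]))
```

Because every partition row is already sorted, `heapq.merge` only has to interleave the slices, which is the merge step of merge-sort mentioned above.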

If some timelines require splitting while others do not, or if you need to be able to adjust the number of rows a timeline is split across periodically, I suggest storing info about the splits in a metadata row for the timeline in a separate column family (see the notes at the end of this post). The metadata row might have one column for each time the splitting factor is adjusted, something like {<timestamp>: <splitting_factor>}, where timestamp should align with the beginning of a time bucket after which clients should use the new splitting factor. When reading a time slice, clients can know how many partition rows to ask for during a given range of time based on this metadata.

A diagram of the high throughput timeline layout

The "jbellis" timeline has increased its splitting factor over time; it currently spans three rows for each time bucket.
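
A reader could resolve the splitting factor for each bucket along the lines of the sketch below. It assumes the metadata row has already been loaded into a sorted list of `(timestamp, splitting_factor)` pairs and that a timeline is unsplit (factor 1) before its first recorded change; both the function names and that default are assumptions made for illustration.

```python
import bisect


def splitting_factor_at(changes, bucket_start, default=1):
    """Return the splitting factor in effect for the bucket at bucket_start.

    changes is a sorted list of (timestamp, splitting_factor) pairs, as read
    from the metadata row. Before the first change, default is assumed.
    """
    timestamps = [ts for ts, _ in changes]
    i = bisect.bisect_right(timestamps, bucket_start)
    return changes[i - 1][1] if i else default


def row_keys_for_range(timeline, start, end, bucket_size, changes):
    """List every partition row key needed to cover [start, end]."""
    keys = []
    bucket = start - start % bucket_size
    while bucket <= end:
        factor = splitting_factor_at(changes, bucket)
        keys.extend(f"{timeline}:{bucket}:{p}" for p in range(1, factor + 1))
        bucket += bucket_size
    return keys
```

For example, with changes `[(2000, 2), (4000, 3)]` and 1000-second buckets, a read from 1500 to 4200 touches one row for bucket 1000, two rows each for buckets 2000 and 3000, and three rows for bucket 4000.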

Variable Time Bucket Sizes

For some applications, the rate of events for different timelines may differ drastically. If some timelines have an incoming event rate that is 100x or 1000x higher than other timelines, you may want to use a different time bucket size for different timelines to prevent extremely wide rows for the busy timelines or a very sparse set of rows for the slow timelines. In other cases, a single timeline may increase or decrease its rate of events over time; eventually, this timeline may need to change its bucket size to keep rows from growing too wide or too sparse.

Similar to the timeline metadata suggestion for high throughput timelines (above), we can track time bucket sizes and their changes for individual timelines with a metadata row. Use a column of the form {<timestamp>: <bucket_size>}, where timestamp aligns with the start of a time bucket, and bucket_size is the bucket size to use after that point in time, measured in a number of seconds. When reading a time slice of events, calculate the appropriate set of row keys based on the bucket size during that time period.

A diagram of a timeline with changing time bucket sizes

At time 1332959000, the "jbellis" timeline switched from using 1000 second time buckets to 10 second buckets.
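
The row key calculation for a timeline whose bucket size changes over time could be sketched as follows. The function name is hypothetical, the metadata is assumed to be available as a sorted list of `(timestamp, bucket_size)` pairs whose first entry marks the start of the timeline, and row keys are assumed to have the form <timeline>:<bucket_start>.

```python
def bucket_keys(timeline, start, end, size_changes):
    """Row keys covering [start, end] for a timeline with changing bucket sizes.

    size_changes is a sorted list of (timestamp, bucket_size) pairs, where each
    timestamp aligns with the start of a bucket and bucket_size is in seconds.
    """
    keys = []
    for i, (since, size) in enumerate(size_changes):
        # This bucket size applies until the next change (or forever).
        if i + 1 < len(size_changes):
            until = size_changes[i + 1][0]
        else:
            until = float("inf")
        lo = max(start, since)
        hi = min(end, until - 1)
        if lo > hi:
            continue  # the requested range does not overlap this period
        # Align to bucket boundaries counted from the change timestamp.
        bucket = since + (lo - since) // size * size
        while bucket <= hi:
            keys.append(f"{timeline}:{bucket}")
            bucket += size
    return keys
```

Using the switch from the diagram (1000-second buckets until 1332959000, then 10-second buckets) and an assumed timeline start of 1332950000, a read from 1332958500 to 1332959025 needs one wide bucket followed by three narrow ones.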

Notes on Timeline Metadata

When using timeline metadata for high throughput timelines or variable bucket size timelines, the metadata rows should typically be stored in a separate column family to allow for cache tuning. I suggest using a fair amount of key cache on the metadata column family if it will be queried frequently.

The timeline metadata should generally be written by a process external to the application to avoid race conditions, unless the application operates in such a fashion that this isn’t a concern. The application can read the metadata row on startup or on demand for a particular timeline; if the application is long lived, it should periodically poll the metadata row for updates. If this is done, a new splitting factor or bucket size can safely be set to start with a new time bucket that begins shortly in the future; the application processes should see the updated metadata in advance, before the new bucket begins, allowing them to change their behavior right on time.



Comments

  1. Carlos says:

    Will composite keys be efficient on large data sets in Cassandra 1.1?

    E.g.

    CREATE TABLE actions(
    user_id varchar,
    time_uuid uuid,

    PRIMARY KEY (user_id, time_uuid)
    );

  2. Tyler Hobbs says:

    Carlos, I’m not sure what you mean. There’s no reason that I can think of that would make composites inefficient now or in Cassandra 1.1. If you are using CQL, what you’re describing is only possible in CQL 3.0, which you can choose to use in Cassandra 1.1.

    Can you be more specific with your question?

  3. Carlos says:

    I was reading “Schema in Cassandra 1.1”. In the section “Clustering, composite keys, and more”, it describes defining CFs with composite primary keys. Will composite primary keys scale well with large data sets for time series?

  4. Tyler Hobbs says:

    Yes, they shouldn’t have any problems. The first component of the composite will be used as the row key; the remaining components become part of a composite column name. Queries for a slice of these will only read a portion of a single row, so it will be very efficient. You should have no problems scaling it.

  5. Howdy,

    Can I ask a data model question here?

    We have a book table with 20 columns, 300 million rows.

    create table book(
    book_id,
    isbn,
    price,
    author,
    title,
    col_n1,
    col_n2,

    col_nm,
    );

    Data usage:

    We need to query data by each column, do pagination as below,

    select * from book where isbn < "XYZ" order by ISBN descending;
    select * from book where price < 992 order by price descending;
    select * from book where col_n1 < 992 order by col_n1 descending;
    select * from book where col_n2 < 992 order by col_n2 descending;

    select * from book where col_nm < 992 order by col_nm descending;

    We update the book table at a high rate, about 100 million updates a day.

    If we choose the Materialized Views approach, we have to update 20 columns in each Materialized View column family for each base row update.
    Will the Cassandra write performance be acceptable?

    Redis recommends building an index for the query on each column, which is your first strategy:
    http://redis.io/topics/data-types-intro
    (see section [ Pushing IDs instead of the actual data in Redis lists ])

    Should we just normalize the data, create a base book table with book_id as the primary key, and then
    build 20 index column families using the wide row column slicing approach, with index column values as column names and book_id as the value?
    This way, we only need to update the index column families whose column values changed, not all 20 Materialized View CFs.

    What do you think of it?

    Thanks,
    Charlie | DBA developer

  6. Bhaskar says:

    Can the two approaches, i.e. high-throughput timelines and varying bucket sizes, be combined or used together?

    Or are they orthogonal, in that they achieve the same goal (avoiding hotspots by distributing the data across nodes) but go about it in different ways?

    Secondly, if the entire data is stored, i.e. the Materialized View approach, doesn’t that make the data difficult to search, since you cannot create a secondary index? How would you index some of the keys that are part of the data?

  7. Tyler Hobbs says:

    Those are great questions!

    The two approaches can definitely be used in conjunction; they are not mutually exclusive. They have different goals: the variable bucket size approach helps to control the total width of rows, while the high-throughput approach minimizes the hotspot effect by distributing each time bucket in the timeline across multiple nodes. If you have concerns about both of these, you can combine the two approaches.

    Regarding the materialized view approach, you are correct, you can’t use the built-in secondary indexes directly on this data. Typically, what you do instead is create a separate materialized view column family for each type of query that you need to support on that data and denormalize so that the data exists separately in each of those column families. So, if you need to be able to query by time and also query by some second attribute, you will use one column family for the timelines and a second column family that will support queries based on the second attribute efficiently.

  8. Redis4You says:

    @Charlie
    Redis is a different planet. It pays no price for disk I/O lookups.
    With Cassandra you should denormalize in order to do as few reads as possible, without making rows too long.

  9. rikAtee says:

    In the materialized table it appears you are using super columns. Is this correct or am I misinterpreting the diagram?

    I understand super columns will be replaced ultimately with composites.

    If my interpretation of the diagram is correct, how will using composite types change the materialized table?

    will it result in something like this?

    timeline_ids = [32232,45545,76566]
    columns_names = ['foo', 'bar', 'baz']
    columns = [(id,name) for id in timeline_ids for name in columns_names]

    column_family_name.get(user, columns=columns)

    Is this the most appropriate means?

  10. Tyler Hobbs says:

    @rikAtee I am not using super columns in the materialized view example, just storing a JSON blob. If you need to be able to update individual fields in those columns frequently *and* it’s not easy or efficient to overwrite the entire thing, you could use either supercolumns (not recommended) or composites (recommended), especially through CQL 3.

    For the composites-with-cql3 approach, I suggest checking out some of the recent blog posts on CQL 3: http://www.datastax.com/dev/blog/whats-new-in-cql-3-0, http://www.datastax.com/dev/blog/thrift-to-cql3, http://www.datastax.com/dev/blog/cql3_collections, and http://www.datastax.com/dev/blog/cql3-for-cassandra-experts.

    Whatever you end up doing, I don’t suggest trying to fetch a lot of columns by name, like your example code. Always try to use slices if you’re fetching a lot of columns.

  11. Scott says:

    We are planning a major time series database with Cassandra and wanted to thank you for this article. We made a few modifications, but one thing I would note is that using JSON for your data will take up a lot of space. There are alternatives out there but I won’t name them, a simple Google search should suffice though.

  12. hardik says:

    When using a counter column family, we can manually update the values of counters, say c1 and c2, using the CLI.

    How can this be achieved using the Hector client? I want a counter column to accept only 20 (for example) records, then a new column c2 (with 20 records), then c3, and so on.

    select * from timecontentcounter;

    key        | column1 | value
    -----------+---------+------
    2013053015 | c1      | 20
    2013053015 | c2      | 20
    In the above example, a new column c3 should be generated to accept counter increments up to 20 records. A Hector solution is needed.

  13. Subodh Nijsure says:

    I am trying to store data with following schema:

    CREATE TABLE temp_humidity_data (
    asset_id text,
    date text,
    event_time timestamp,
    temprature int,
    humidity int,
    PRIMARY KEY((asset_id, date),event_time)
    )

    I have followed datastax article ‘Getting Started with Time Series Modeling’ – http://planetcassandra.org/blog/post/getting-started-with-time-series-data-modeling/

    However, with this data model, one thing that is not working is a query that returns data between two dates. How do I do that?

    If I do this:

    select * from temp_humidity_data
    where asset_id='1234' AND date >= '2010-04-02' AND date <= '2011-04-03';

    It gives me following error:

    code=2200 [Invalid query] message="Only EQ and IN relation are supported on the partition key (unless you use the token() function)"

    I understand there is a way to use the IN operator, but I don't want to put all those dates in an 'IN' clause. Is there a way, with the above table definition, to query data between two dates?

    1. Tyler Hobbs says:

      Subodh,

      With that schema, you will need to put the dates (and asset_id) into an IN clause or issue a separate query per day. (If you’re using one of the DataStax drivers, I suggest doing a separate asynchronous query for each partition, as this will put less strain on the coordinator node and still achieve a high level of concurrency.)

      If the number of events you have per-asset per-day is low (say, less than 10k), you may want to consider using a week or month instead of a single day for the partition key. If you do this, you will have fewer partitions to query. Just be careful not to let partitions grow too large. I would try to keep them under 100k rows.
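
The "separate query per day" approach from the reply above can be sketched as follows. The code only generates the per-partition parameters and does not connect to a cluster. The `daily_partitions` helper and the `QUERY` constant are hypothetical, and the `%s` placeholders assume a driver that uses that parameter style, such as the DataStax Python driver.

```python
from datetime import date, timedelta

# One statement per (asset_id, date) partition; only the parameters vary.
# The table and column names come from the schema in the question above.
QUERY = "SELECT * FROM temp_humidity_data WHERE asset_id = %s AND date = %s"


def daily_partitions(asset_id, start, end):
    """Yield one (asset_id, 'YYYY-MM-DD') partition key per day, inclusive."""
    day = start
    while day <= end:
        yield asset_id, day.isoformat()
        day += timedelta(days=1)


# Parameters for every partition between the two dates in the question.
params = list(daily_partitions("1234", date(2010, 4, 2), date(2011, 4, 3)))
```

Each parameter pair can then be run as its own asynchronous query (for example with `session.execute_async(QUERY, pair)` in the DataStax Python driver) and the results concatenated in date order on the client.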
