DataStax Developer Blog

Atomic batches in Cassandra 1.2

By Jonathan Ellis - October 12, 2012 | 11 Comments

In Cassandra, a batch allows the client to group related updates into a single statement. If some of the replicas for the batch fail mid-operation, the coordinator will hint those rows automatically.
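
For illustration, here is roughly what such a batch looks like in CQL. The users and users_by_email tables are made up for this sketch and are not part of Cassandra:

-- hypothetical tables, defined only so the batch below has something to write to
CREATE TABLE users (
    user_id text PRIMARY KEY,
    name text
);

CREATE TABLE users_by_email (
    email text PRIMARY KEY,
    user_id text
);

-- group the two related inserts into a single statement
BEGIN BATCH
    INSERT INTO users (user_id, name) VALUES ('jdoe', 'John Doe');
    INSERT INTO users_by_email (email, user_id) VALUES ('jdoe@example.com', 'jdoe');
APPLY BATCH;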

But there is one failure scenario that the classic batch design does not address: if the coordinator itself fails mid-batch, you could end up with partially applied batches.

In the past, Cassandra has relied on the client to deal with this by retrying the batch to a different coordinator. This is usually adequate, since writes in Cassandra, including batches, are idempotent: performing the same update multiple times is harmless.

But, if the client and coordinator fail at the same time — say, because the client is an app server in the same datacenter, and suffers a power failure at the same time as the coordinator — then there is no way to recover other than manually crawling through your records and reconciling inconsistencies.

Some particularly sophisticated clients have implemented a client-side commitlog to handle this scenario, but this is a responsibility that logically belongs on the server. That is what atomic batches bring to Cassandra 1.2.

Using atomic batches

Batches are atomic by default starting in 1.2. Unfortunately, the price for atomicity is about a 30% hit to performance compared to the old non-atomic batches. So, we also provide BEGIN UNLOGGED BATCH for when performance is more important than atomicity guarantees.
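
As a sketch, an unlogged batch just swaps in the UNLOGGED keyword; the tables here are the same hypothetical ones from the earlier example:

-- same hypothetical tables as above; UNLOGGED skips the batchlog and its overhead
BEGIN UNLOGGED BATCH
    INSERT INTO users (user_id, name) VALUES ('asmith', 'Alice Smith');
    INSERT INTO users_by_email (email, user_id) VALUES ('asmith@example.com', 'asmith');
APPLY BATCH;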

1.2 also introduces a separate BEGIN COUNTER BATCH for batched counter updates. Unlike other writes, counter updates are not idempotent, so replaying them automatically from the batchlog is not safe. Counter batches are thus strictly for improved performance when updating multiple counters in the same partition.
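
Here is a minimal sketch of a counter batch, assuming a hypothetical page_views table in which both updates land in the same partition:

-- hypothetical counter table; url is the partition key
CREATE TABLE page_views (
    url text,
    hour int,
    hits counter,
    PRIMARY KEY (url, hour)
);

-- both updates target the '/home' partition, so they can be applied together efficiently
BEGIN COUNTER BATCH
    UPDATE page_views SET hits = hits + 1 WHERE url = '/home' AND hour = 10;
    UPDATE page_views SET hits = hits + 1 WHERE url = '/home' AND hour = 11;
APPLY BATCH;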

(Note that we mean “atomic” in the database sense that if any part of the batch succeeds, all of it will. No other guarantees are implied; in particular, there is no isolation: other clients will be able to read the first updated rows from the batch while the rest are still in progress. However, updates within a single row are isolated.)

Under the hood

Atomic batches use a new system table, batchlog, defined as follows:


CREATE TABLE batchlog (
    id uuid PRIMARY KEY,
    written_at timestamp,
    data blob
);

When an atomic batch is written, we first write the serialized batch to the batchlog as the data blob. After the rows in the batch have been successfully written (or hinted), we remove the batchlog entry. (There are thus some similarities to how Megastore uses a Bigtable ColumnFamily as a transaction log, but atomic batches are much, much more performant than Megastore writes.)
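
In CQL terms, the life of a batchlog entry boils down to an insert followed by a delete. The statements below are only a conceptual sketch: the coordinator does this internally through StorageProxy rather than through client CQL, and the uuid, timestamp, and blob values are made-up placeholders:

-- conceptual only; performed internally by the coordinator, not by clients
-- 1. record the serialized batch before applying it (placeholder values)
INSERT INTO batchlog (id, written_at, data)
VALUES (62c36092-82a1-3a00-93d1-46196ee77204, '2012-10-12 12:00:00', 0xcafebabe);

-- 2. apply (or hint) the batched rows, then clear the entry
DELETE FROM batchlog WHERE id = 62c36092-82a1-3a00-93d1-46196ee77204;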

The batchlog table is node-local, along with the rest of the system keyspace. Instead of relying on normal Cassandra replication, StorageProxy special-cases the batchlog. This lets us make two big improvements over a naively replicated approach.

First, we can dynamically adjust behavior depending on the cluster size and arrangement. Cassandra prefers to perform batchlog writes to two different replicas in the same datacenter as the coordinator. This is not so much for durability (we only need to record the batch until it’s successfully written) as for fault tolerance: if one batchlog replica fails, we don’t need to wait for it to time out before retrying against another. But if only one replica is available, Cassandra will work with that without requiring an operator to manually adjust replication parameters.

Second, we can make each batchlog replica responsible for replaying batches that didn’t finish in a timely fashion. This avoids the complexity of coordinating between the coordinator and the replicas, and of having to “fail over” replay responsibility if the coordinator is itself replaced. (Again, since writes are idempotent, it is fine for a batch to be replayed by more than one replica now and then.)

We are also able to make some performance optimizations based on our knowledge of the batchlog’s function. For instance, since each replica has local responsibility to replay failed batches, we don’t need to worry about preserving tombstones on delete. So in the normal case when a batch is written to and removed from the batchlog in quick succession, we don’t need to write anything to disk on memtable flush.

Availability

Atomic batches are feature complete in Cassandra 1.2 beta1, which is available for download on the Apache site; we’re projecting a final release by the end of the year.



Comments

  1. Maciej Miklas says:

    Great post!

    I have one question about this part:
    “atomic” in the database sense that if any part of the batch succeeds, all of it will.

    If I understood it right, you first persist the batch data in a system CF and then execute the inserts. If some of them fail, an async process will retry the operation based on the serialized batch data.

    Now it is possible that the coordinator node(s) fail completely and the serialized batch is gone. In this case the operation cannot succeed as a whole, and the persisted data stays in an inconsistent state. Based on this observation, the statement “if any part of the batch succeeds, all of it will” is only correct if we assume that the coordinator node(s) do not crash until the batch data is successfully inserted.

  2. Stefan Fleiter says:

    The coordinator node can crash without the risk of half-applied atomic batches:

    Cassandra performs the batchlog writes to two different replicas in the same datacenter as the coordinator; this is done before starting the batch operations themselves.

    Each batchlog replica is responsible for replaying batches that didn’t finish in a timely fashion. This is possible because Cassandra operations can be repeated without risk of losing data.

  3. Maciej Miklas says:

    It is possible that all batchlog replicas crash before the retry operation is successfully finished. This will leave the data in an inconsistent state.

    For example: batch inserts data into 5 rows.
    1) batch log is stored on batchlog replicas
    2) two first rows are inserted
    3) client gets response – all ok (or warning)
    4) nodes responsible for remaining 3 rows are dead
    5) batch log replicas crash and batchlog is gone
    6) 3 remaining rows will never be inserted

    An ACID system will try to insert all the data, and in case of error execute a full rollback – this is “all or none”.

    Cassandra will try to insert the data; in the error case it will not execute a rollback, but rather start an async retry operation – and there is no guarantee that this process will be successful.

  4. Spud says:

    Sorry, but this is rather lame. This wouldn’t help our processing because our clients on crash recovery need to know if a previous update is done or not. It’s yet more confusion. (Does a quorum read wait for this batchlog to be played?). Ugh, I’d rather implement it all myself on my client so it’s done _right_.

    I think you guys should instead fix repair – have it use the log instead of sstables – to make performance stable and predictable. (Not sudden load spikes every week). We are on the fence about using Cassandra because the performance of the system seems impossible to predict, due to weekly repair spikes.

  5. Karl says:

    “if the coordinator itself fails mid-batch, you could end up with partially applied batches.”

    So you’ve changed that to, “If some network timeouts happen – making the coordinator give up – and the one replica who got the log crashes after partial apply, you’ll end up with partially applied batches. At some asynchronous point in the future which you don’t really know about. And you’ll take a 30% hit on writes. And we don’t use paxos or know what A(tomic) means.” Ok then.

  6. Jonathan Ellis says:

    Maciej, Karl: if the coordinator AND both batchlog replicas all crash at precisely the right time — not the same time; first the coordinator would have to crash after writing the batchlog and beginning to write to replicas, and then the batchlog replicas would have to crash *with catastrophic data loss* (batchlog writes are commitlog’d and durable) in the next few minutes before replay — it’s technically possible to have a partial batch write.

    But this is a specific example of “if you lose enough machines/disks you will have data in inconsistent state,” which is true for RDBMS as well. For any reasonable application I can think of, requiring three machines to fail with data loss within ten minutes is an adequately high bar.

    Spud: We’re working on repair as well. You can already repair “bite sized” parts of data (CASSANDRA-3912) to smooth out the impact; for 1.3 we’re planning to make this automatic (CASSANDRA-2699).

  7. Naren says:

    Nice feature, but a 30% performance hit by default is not good. I would prefer to have this as non-default behavior because a number of applications out there are either designed around non-atomicity or have their own implementations.

  8. Jonathan Ellis says:

    Most (all?) production applications right now are built against Thrift or maybe CQL2. The default is only changed (from 1.1) in CQL3.

  9. Karl says:

    I’m still not buying the ‘3 machines have to catastrophically crash with data loss’ here. First, it sounds like only one batchlog replica is required (say network congestion prevents talking to a second one). Secondly, will a coordinator never give up (time out) on a batch if it can’t talk to the ‘real’ replicas? Basically, the docs on this feature could be improved for us skeptics.

    I could see how you could make it so 3 have to crash – you’re basically doing two-phase commit with ‘assume commit’ and relying on timestamps for ‘eventual consistency’.

    Unfortunately, that would not help our transaction processing use case. I would much rather do 99% of our write volume unlogged, and then do an atomic test-and-set write of a certain row to commit it – and update counters – or abort. If Cassandra had a built-in, Paxos-based, atomic, immediately consistent write feature for the cases when we need it, we would buy a support contract yesterday.

  10. Kurt Schurenberg says:

    Are batches atomic only in CQL? Or are they also atomic using Thrift (or Hector)?

  11. Sireesha says:

    What kind of exception or error will be thrown by Cassandra when the batch delete fails for some reason?
