DataStax Developer Blog

Improved Hadoop output in Cassandra 1.1

By Brandon Williams - May 1, 2012

You may recall that Cassandra 0.8 introduced a new bulk loading system that allowed a user to generate SSTables locally and then stream them into the cluster. This provided a great way to get data in without straining the cluster the way issuing the writes directly against it would.

In reply to that article, someone asked whether there was a Hadoop OutputFormat for it, which makes a lot of sense; it’s a natural fit. None existed at the time, however, and building one turned out to be easier said than done, for quite a few reasons:

  • The bulk loader was what is known as a ‘fat client’: a node that directly participates in the cluster, but does not provide storage
  • Because it was a fat client, the loader could not run on machines already hosting Cassandra nodes, a very common (and desirable) setup for Hadoop integration
  • Streaming in Cassandra was not yet well-equipped to handle this type of operation

Since Cassandra 1.0 was released just a couple of months later, we couldn’t cram in changes this far-reaching, but now that Cassandra 1.1 has been released, we finally have our shiny new BulkOutputFormat. Switching your old ColumnFamilyOutputFormat jobs over to it is very simple:


job.setOutputFormat(BulkOutputFormat.class)

Or, if you’re a Pig user, you can override the OutputFormat with an environment variable. Either way, you’ll also need to set an initial output address so your jobs know of a Cassandra instance to contact to get things rolling.
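
For the Java API, the full setup might look something like the following sketch, which assumes Cassandra 1.1’s ConfigHelper and the new-style Hadoop mapreduce API; the keyspace, column family, address, and port values here are placeholders you would replace with your own.

import org.apache.cassandra.hadoop.BulkOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class BulkLoadJobSetup
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "bulk-load-example");

        // Write to Keyspace1/Standard1 (placeholder names).
        ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Keyspace1", "Standard1");

        // The initial output address: one live Cassandra node the job contacts
        // (over thrift) to learn about the ring before streaming begins.
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "10.0.0.1");
        ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");

        job.setOutputFormatClass(BulkOutputFormat.class);
        // ... set mapper, reducer, and input as usual, then submit the job.
    }
}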

So, what does all this mean, and how does it work? If you have an existing Hadoop installation and want to export its data to Cassandra (co-located or otherwise), you can now do so more efficiently and without putting much strain on the Cassandra cluster; simple tests have also shown a 20-25% throughput increase.

This works very similarly to the bulk loader from 0.8, with some key differences, the largest being that nodes are no longer required to become fat clients. Everything that used to require a fat client, such as gathering ring information to determine which nodes will be involved in the streaming, now runs purely over the thrift API. This is what enables a co-located setup, since being a fat client precludes a machine from also running a Cassandra instance. As a secondary benefit, the bulk loader itself no longer requires being a fat client either, though this is of lesser utility: if that was a problem before, you could always invoke the JMX method to load the files from an existing Cassandra instance.
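
To make that concrete, here is a hypothetical reducer shaped for BulkOutputFormat: like ColumnFamilyOutputFormat, it emits a row key plus a list of thrift Mutations, so an existing job usually needs no changes beyond the OutputFormat switch. The word-count-style aggregation and the "count" column name are purely illustrative.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CountReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable value : values)
            sum += value.get();

        // Build one thrift column holding the aggregated count
        // ("count" is a placeholder column name).
        Column col = new Column();
        col.setName(ByteBufferUtil.bytes("count"));
        col.setValue(ByteBufferUtil.bytes(sum));
        col.setTimestamp(System.currentTimeMillis() * 1000); // microseconds

        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn().setColumn(col));

        List<Mutation> mutations = new ArrayList<Mutation>();
        mutations.add(mutation);

        // Emit the row key and its mutations; BulkOutputFormat turns these into
        // SSTables locally, then streams them into the cluster.
        context.write(ByteBufferUtil.bytes(key.toString()), mutations);
    }
}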

In conclusion, one might wonder why BulkOutputFormat is not the default, and why it didn’t simply replace ColumnFamilyOutputFormat. After all, it does seem superior in comparison; however, there is a subtle but distinct difference in functionality between the two. If what matters to you is how quickly results appear, that is to say, you want to be able to access each partial output record as soon as possible, then ColumnFamilyOutputFormat (CFOF) is still the right choice. This comes at the cost of greater load on your Cassandra cluster, since CFOF writes via thrift, but you can access each piece of data as soon as it is written. Conversely, BulkOutputFormat (BOF) has to generate all the data locally and then stream it to Cassandra, which must then build indexes and bloom filters before finally loading the data and making it available. You can think of this as the classical tradeoff between latency (CFOF) and throughput (BOF); for most people, the higher throughput and lower imposed load of BOF will suit their needs best.


