Apache Cassandra 1.1 Documentation

Hadoop Integration

This document corresponds to an earlier product version. Make sure you are using the version of the documentation that corresponds to your version of Cassandra.

Hadoop integration with Cassandra includes support for:

  • MapReduce
  • Apache Pig
  • Apache Hive

For more detailed information, see Hadoop Support.

Starting with Cassandra 1.1, the following low-level features have been added to Cassandra-Hadoop integration:

  • Secondary index support for the column family input format - Hadoop jobs can now make use of Cassandra secondary indexes.
  • Wide row support - Previously, wide rows (rows with, for example, millions of columns) could not be accessed by Hadoop. Now they can be read and paged through in Hadoop.
  • BulkOutputFormat - Better efficiency when loading data into Cassandra from a Hadoop job.

Secondary Index Support

Hadoop jobs can now make use of indexes and can specify expressions that direct Cassandra to scan over specific data and send the data back to the map job. This scheme pushes work down to the server instead of transferring data back and forth.

You can use the overload of the ConfigHelper.setInputRange method that takes a list of index expressions, which filter the data before it is included in the Hadoop result set.

IndexExpression expr =
  new IndexExpression(
    ByteBufferUtil.bytes("int4"),
    IndexOperator.EQ,
    ByteBufferUtil.bytes(0)
  );

ConfigHelper.setInputRange(
  job.getConfiguration(),
  Arrays.asList(expr)
);

The Cassandra WordCount example has been revised to demonstrate secondary index support. It is available in the examples directory of the Apache Cassandra project repository.
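
To put these calls in context, the following is a minimal sketch of how the index expression might be combined with the rest of the input configuration in a job driver. The contact address, partitioner, keyspace, column family, and column name used here are illustrative assumptions, not values taken from the WordCount example.

import java.util.Arrays;

import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class IndexedInputSetup
{
  public static Job configure() throws Exception
  {
    Job job = new Job(new Configuration(), "indexed-scan");
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    // Where and what to read; the address, keyspace, and column family are hypothetical.
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), "127.0.0.1");
    ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), "Keyspace1", "Users");

    // Columns each map task receives for the rows that match.
    SlicePredicate predicate = new SlicePredicate()
      .setColumn_names(Arrays.asList(ByteBufferUtil.bytes("int4")));
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

    // Only rows whose indexed column "int4" equals 0 are handed to the mappers;
    // the comparison is evaluated on the Cassandra nodes, not in the Hadoop job.
    IndexExpression expr = new IndexExpression(
      ByteBufferUtil.bytes("int4"), IndexOperator.EQ, ByteBufferUtil.bytes(0));
    ConfigHelper.setInputRange(job.getConfiguration(), Arrays.asList(expr));

    return job;
  }
}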

Wide Row Support

The Cassandra Hadoop interface includes a new parameter to support wide rows:

ConfigHelper.setInputColumnFamily(
  job.getConfiguration(),
  KEYSPACE,
  COLUMN_FAMILY,
  true
);

If the Boolean parameter is true, column family rows are presented to the Hadoop job as individual columns rather than as whole rows. Wide rows are paginated into column slices, similar to the way an application paginates query results. For example, paging can occur in thousand-column chunks instead of chunks of at least one whole row.
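
As a sketch of what consumes this input, a mapper might simply count the columns delivered in each record and leave it to a summing reducer to produce per-row totals. The class name is hypothetical; the key and value types are the standard ones used by ColumnFamilyInputFormat, and the example assumes UTF-8 row keys.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.SortedMap;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ColumnCountMapper
  extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
{
  @Override
  protected void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)
    throws IOException, InterruptedException
  {
    // Emit the number of columns in this record. With wide row support enabled,
    // the columns of one Cassandra row can be split across several records, so a
    // summing reducer is needed to get the total column count per row key.
    context.write(new Text(ByteBufferUtil.string(key)), new IntWritable(columns.size()));
  }
}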

Bulk Loading Hadoop Data

When loading data into Cassandra from a Hadoop job, you can manage the Hadoop output using a new class: BulkOutputFormat. Use this class to set the job's output format:

job.setOutputFormatClass(BulkOutputFormat.class);

Alternatively, you can still use the ColumnFamilyOutputFormat class.

Setting the output format class to BulkOutputFormat instead of ColumnFamilyOutputFormat improves the throughput of big jobs in which you bulk load large amounts of data and MapReduce over it to format it for your Cassandra application. The Hadoop job streams the data directly to the nodes in a binary format that Cassandra handles very efficiently. The downside is that you no longer see the data arriving incrementally in Cassandra, because it is sent to the nodes in bulk chunks.
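
As an illustration, the output half of such a job might be wired up along the following lines; the contact address, partitioner, keyspace, and column family names are hypothetical.

import org.apache.cassandra.hadoop.BulkOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class BulkOutputSetup
{
  public static void configureOutput(Job job)
  {
    // Stream data to the nodes in bulk instead of writing individual mutations through Thrift.
    job.setOutputFormatClass(BulkOutputFormat.class);

    // Where to write; the address, keyspace, and column family are hypothetical.
    Configuration conf = job.getConfiguration();
    ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
    ConfigHelper.setOutputRpcPort(conf, "9160");
    ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
    ConfigHelper.setOutputColumnFamily(conf, "Keyspace1", "Users");
  }
}

Because BulkOutputFormat accepts the same ByteBuffer row keys and List<Mutation> values as ColumnFamilyOutputFormat, switching between the two formats should not normally require changes to the reducer.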

The new options (and their defaults), which can be used with both classes, are:

  • OUTPUT_LOCATION (system default)
  • BUFFER_SIZE_IN_MB (64)
  • STREAM_THROTTLE_MBITS (unlimited)

You should not need to change the defaults for these options.