Cassandra supports running Hadoop MapReduce jobs against the Cassandra cluster. In a properly configured cluster, MapReduce jobs can read data from Cassandra and write their results either back to Cassandra or to a file system.
The rest of this page describes how to set up a cluster for Hadoop integration, how to manipulate input and output for a MapReduce job in Cassandra, and how to get more information on these topics.
To configure a Cassandra cluster for Hadoop integration, overlay a Hadoop cluster over your Cassandra nodes. This involves installing a TaskTracker on each Cassandra node, and setting up a JobTracker and an HDFS DataNode.
When a Hadoop TaskTracker runs on the same server as a Cassandra node, it is sent tasks for the data held by that local node. This improves efficiency and shortens processing time, because each Cassandra node receives only the queries for which it is the primary replica, so reads are served locally instead of being forwarded to other nodes over the network.
Install a Hadoop TaskTracker on each of your Cassandra nodes to allow the Hadoop JobTracker to assign tasks to the Cassandra nodes that contain data for those tasks.
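The data-locality idea described above can be illustrated with a small, self-contained sketch. This is a toy model only, not Hadoop's actual scheduler: the class LocalityDemo, the Split type, and the fallback host are all hypothetical names introduced for illustration. It assumes each input split knows which host stores its data, and it prefers a TaskTracker on that same host.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of locality-aware task assignment (hypothetical, not Hadoop code).
final class LocalityDemo {

    /** An input split and the host that stores its data (illustrative type). */
    static final class Split {
        final String id;
        final String dataHost;

        Split(String id, String dataHost) {
            this.id = id;
            this.dataHost = dataHost;
        }
    }

    /**
     * Assigns each split to the tracker running on its data host when there is
     * one; otherwise to fallbackHost, which would have to read over the network.
     * Returns a map of tracker host to the split ids assigned to it.
     */
    static Map<String, List<String>> assign(List<Split> splits,
                                            Set<String> trackerHosts,
                                            String fallbackHost) {
        Map<String, List<String>> plan = new LinkedHashMap<>();
        for (Split split : splits) {
            String host = trackerHosts.contains(split.dataHost)
                    ? split.dataHost      // local read: tracker and data share a host
                    : fallbackHost;       // remote read: no tracker on the data host
            plan.computeIfAbsent(host, h -> new ArrayList<>()).add(split.id);
        }
        return plan;
    }

    public static void main(String[] args) {
        List<Split> splits = List.of(
                new Split("s1", "node1"),
                new Split("s2", "node2"),
                new Split("s3", "node9")); // node9 runs no TaskTracker
        // prints {node1=[s1, s3], node2=[s2]}
        System.out.println(assign(splits, Set.of("node1", "node2"), "node1"));
    }
}
```

In this model, a split is read remotely only when no TaskTracker runs on the host that holds its data, which is the situation that installing a TaskTracker on every Cassandra node is meant to avoid.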
Running Hadoop TaskTrackers on Cassandra nodes requires you to update the HADOOP_CLASSPATH in <hadoop>/conf/hadoop-env.sh to include the Cassandra libraries. For example, add an entry like the following to hadoop-env.sh on each of the TaskTracker nodes:
export HADOOP_CLASSPATH=/opt/cassandra/lib/*:$HADOOP_CLASSPATH
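One way to confirm that the classpath change took effect is to check whether a Cassandra class can be loaded. The following is a minimal sketch, assuming you run it with the same classpath the TaskTracker uses; the class name ClasspathCheck and the method isAvailable are hypothetical helpers, not part of Hadoop or Cassandra.

```java
// Hypothetical helper: reports whether a class is visible on the current classpath.
final class ClasspathCheck {

    /** Returns true if the named class can be located by this JVM's class loader. */
    static boolean isAvailable(String className) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing class, or a class whose own dependencies are missing
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the input format class named on this page
        String name = args.length > 0
                ? args[0]
                : "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
        System.out.println(name + (isAvailable(name)
                ? " is on the classpath"
                : " was NOT found on the classpath"));
    }
}
```

If the class is reported as not found, the usual suspects are a wrong Cassandra lib path in HADOOP_CLASSPATH or a hadoop-env.sh that was updated on only some of the nodes.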
One server must be reserved for the Hadoop NameNode/JobTracker. In addition, at least one node in the cluster must be configured as a Hadoop DataNode. This instance of HDFS acts as a distributed cache that stores information such as static data and JAR dependencies for the MapReduce job. This is typically a small volume of data, but Hadoop requires it in order to operate.
The class org.apache.cassandra.hadoop.ColumnFamilyInputFormat allows you to read data stored in Cassandra from a Hadoop MapReduce job. Its companion class org.apache.cassandra.hadoop.ColumnFamilyOutputFormat allows you to write the results of a Hadoop MapReduce job back into Cassandra.
To use these formats in a Hadoop MapReduce job, make sure that they are properly set in your code as the format class for input and/or output. For example:
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
In the MapReduce job, Cassandra rows or row fragments (pairs of key + SortedMap of columns) can be input to Map tasks for processing, as specified by a SlicePredicate that describes which columns to fetch from each row. For example:
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
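To make the "key + SortedMap of columns" shape concrete, here is a plain-Java sketch of what selecting columns by name does to a single row. It is a model only and uses none of Cassandra's classes: SliceDemo and sliceByNames are hypothetical names, and column names and values are simplified to strings.

```java
import java.util.Collection;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java model of a row fragment: a sorted map of column name to value.
// Illustrative only; Cassandra's own types are not used here.
final class SliceDemo {

    /** Returns only the requested columns of a row, keeping the row's column ordering. */
    static SortedMap<String, String> sliceByNames(SortedMap<String, String> row,
                                                  Collection<String> columnNames) {
        SortedMap<String, String> fragment = new TreeMap<>(row.comparator());
        for (String name : columnNames) {
            String value = row.get(name);
            if (value != null) {   // columns the row does not have are skipped
                fragment.put(name, value);
            }
        }
        return fragment;
    }

    public static void main(String[] args) {
        SortedMap<String, String> row = new TreeMap<>();
        row.put("author", "alice");
        row.put("text", "hello world");
        row.put("timestamp", "1");

        // Only the "text" column reaches the map task in this model
        System.out.println(sliceByNames(row, List.of("text")));
    }
}
```

In this model the map task sees the row key together with the reduced map, so a predicate that names a single column means each map call handles at most one column per row.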
A helpful resource for understanding Hadoop integration is the “word count” example provided in cassandra/contrib/word_count (note: the contrib folder is available only in the Cassandra source download, not the binary download). To learn more about using Cassandra with Hadoop MapReduce jobs, examine the documentation and code in the word count example, refer to the Cassandra Wiki, and see the DataStax developer blog on this topic.