Bulk loading data in Cassandra has historically been difficult. Although Cassandra has had the BinaryMemtable interface from the very beginning, BinaryMemtable is hard to use and provides a relatively minor throughput improvement over normal client writes.
Cassandra 0.8.1 introduces a new tool to solve this problem: sstableloader.
sstableloader is a tool that, given a set of sstable data files, streams them to a live cluster. It does not simply copy the set of sstables to every node, but only transfers the relevant part of the data to each, conforming to the replication strategy of the cluster.
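To make "only the relevant part of the data" concrete, here is a minimal Java sketch of how a key can be mapped to the nodes that should receive it. It is a simplified model, not sstableloader's code: it assumes one token per node, RandomPartitioner-style tokens (the absolute value of the MD5 of the key), and a strategy that simply walks the ring clockwise. The class name RingSketch and the node names are made up for illustration.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified token ring: maps a row token to the nodes that should hold it. */
final class RingSketch {
    // Node tokens in ring order (hypothetical nodes, one token per node).
    private final TreeMap<BigInteger, String> ring = new TreeMap<>();
    private final int replicationFactor;

    RingSketch(Map<BigInteger, String> nodeTokens, int replicationFactor) {
        this.ring.putAll(nodeTokens);
        this.replicationFactor = replicationFactor;
    }

    /** RandomPartitioner-style token: absolute value of the MD5 of the key. */
    static BigInteger token(String key) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return new BigInteger(md5.digest(key.getBytes(StandardCharsets.UTF_8))).abs();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Walks the ring clockwise from the token and collects the replica nodes. */
    List<String> replicasFor(BigInteger token) {
        List<String> replicas = new ArrayList<>();
        int wanted = Math.min(replicationFactor, ring.size());
        BigInteger current = ring.ceilingKey(token);
        while (replicas.size() < wanted) {
            if (current == null) {
                current = ring.firstKey(); // past the last token: wrap around
            }
            replicas.add(ring.get(current));
            current = ring.higherKey(current);
        }
        return replicas;
    }

    public static void main(String[] args) {
        // Three evenly spaced tokens over the 0..2^127 token space.
        BigInteger step = BigInteger.ONE.shiftLeft(127).divide(BigInteger.valueOf(3));
        Map<BigInteger, String> tokens = new TreeMap<>();
        for (int i = 0; i < 3; i++) {
            tokens.put(step.multiply(BigInteger.valueOf(i)), "node" + (i + 1));
        }
        RingSketch ring = new RingSketch(tokens, 2);
        for (String key : new String[] {"alice", "bob", "carol"}) {
            System.out.println(key + " -> " + ring.replicasFor(token(key)));
        }
    }
}
```

With a replication factor of 2 on a three-node ring, each row goes to only two of the three nodes, which is why streaming per-node ranges is cheaper than copying every sstable everywhere.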
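There are two primary use cases for this new tool: bulk-loading external data into a cluster, and loading existing sstables into another cluster, for example one with a different number of nodes.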
Let us start with the second use case to demonstrate how sstableloader is used. For that, consider the following scenario: you have a one node test cluster populated with data that you want to transfer into another, multi-node cluster.
A brute-force solution would be to copy all the sstables of the source node to every node in the multi-node destination cluster, restart each node, and then run nodetool cleanup on them. This works, but is obviously inefficient, especially if the destination cluster has a lot of nodes.
With sstableloader, you first need the sstables to be in a directory whose name is the name of the keyspace of the sstables. Only the -Data and -Index components are required; the others (-Statistics and -Filter) will be ignored. This layout is how sstables are stored in either the main data directory or a snapshot. Then, assuming sstableloader is configured to talk to your multi-node cluster:
$ ls TestKeyspace/
TestCF-g-1-Data.db TestCF-g-2-Data.db TestCF-g-3-Data.db
TestCF-g-1-Index.db TestCF-g-2-Index.db TestCF-g-3-Index.db
$ sstableloader TestKeyspace
Starting client (and waiting 30 seconds for gossip) ...
Streaming revelant part of TestKeyspace/TestCF-g-1-Data.db TestKeyspace/TestCF-g-2-Data.db TestKeyspace/TestCF-g-3-Data.db to [/127.0.0.1, /127.0.0.2, /127.0.0.3]
progress: [/127.0.0.1 3/3 (100)] [/127.0.0.2 3/3 (100)] [/127.0.0.3 3/3 (100)] [total: 100 - 24MB/s (avg: 18MB/s)]
Waiting for targets to rebuild indexes ...
To learn the topology of the cluster, the number of nodes, which ranges of keys each node is responsible for, the schema, etc., sstableloader uses the Cassandra gossip subsystem. It thus requires a directory containing a cassandra.yaml configuration file in the classpath. (If you use sstableloader from the Cassandra source tree, the cassandra.yaml file in conf will be used.)
In this config file, the listen_address, storage_port, rpc_address and rpc_port should be set correctly to communicate with the cluster, and at least one node of the cluster you want to load data into should be configured as a seed. The rest is ignored for the purposes of sstableloader.
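As a rough illustration, the relevant part of such a cassandra.yaml could look like the fragment below. All addresses are placeholders, the ports are the usual defaults, and the seed_provider syntax is assumed to be the one used by the 0.8 series; check the cassandra.yaml shipped with your version before copying it.

```yaml
# Hypothetical fragment for the machine running sstableloader.
# Addresses are placeholders; adapt them to your network.
listen_address: 192.168.0.50      # interface sstableloader gossips on
storage_port: 7000                # must match the cluster's storage port
rpc_address: 192.168.0.50
rpc_port: 9160
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          # at least one live node of the target cluster
          - seeds: "192.168.0.10"
```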
Because sstableloader uses gossip to communicate with other nodes, if it is launched on the same machine as a Cassandra node, it needs to use a different network interface than that node. But if you want to load data from a Cassandra node, there is a simpler solution: you can use the JMX->StorageService->bulkload() call from said node.
This method simply takes the absolute path to the directory containing the sstables to load, and loads them as sstableloader would. However, since the node performing the load will be both source and destination for the streaming, this puts more load on that particular node, so we advise loading data from machines that are not Cassandra nodes when loading into a live cluster.
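For readers who prefer to script the JMX call rather than click through a JMX console, here is a minimal sketch using the standard javax.management API. It assumes the StorageService MBean is registered as org.apache.cassandra.db:type=StorageService, that the operation is named bulkLoad and takes a single String, and that the node exposes JMX without authentication; the class name JmxBulkLoad and the command-line arguments are hypothetical.

```java
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.net.MalformedURLException;

/** Sketch: ask a Cassandra node to bulk-load a local directory over JMX. */
final class JmxBulkLoad {
    /** Standard RMI-based JMX URL for the given host and JMX port. */
    static JMXServiceURL serviceUrl(String host, int port) {
        try {
            return new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** Assumed name of the StorageService MBean. */
    static ObjectName storageService() {
        try {
            return new ObjectName("org.apache.cassandra.db:type=StorageService");
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Invokes the (assumed) bulkLoad(String) operation with an absolute path. */
    static void bulkLoad(String host, int port, String absoluteDir) throws Exception {
        try (JMXConnector connector = JMXConnectorFactory.connect(serviceUrl(host, port))) {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            mbeans.invoke(storageService(), "bulkLoad",
                    new Object[] {absoluteDir},
                    new String[] {String.class.getName()});
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("usage: JmxBulkLoad <host> <jmx_port> <absolute_sstable_dir>");
            return;
        }
        bulkLoad(args[0], Integer.parseInt(args[1]), args[2]);
    }
}
```

The directory path is resolved on the Cassandra node itself, not on the machine running this client, so the sstables must already be present on that node.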
Note that the schema for the column families to be loaded should be defined beforehand, using your preferred method: CLI, thrift or CQL.
If you want to bulk-load external data that is not in sstable form using sstableloader, you will have to first generate sstables. To do so, the simplest solution is the new Java class SSTableSimpleUnsortedWriter introduced in Cassandra 0.8.2. To demonstrate how it is used, let us consider the example of bulk-loading “user profile” data from a csv file. More precisely, we consider a csv file of the following form:
# uuid, firstname, lastname, password, age, email
5bd8c586-ae44-11e0-97b8-0026b0ea8cd0, Alice, Smith, asmi1975, 32, alice.smith@mail.com
4bd8cb58-ae44-12e0-a2b8-0026b0ed9cd1, Bob, Miller, af3!df8, 28, bob.miller@mail.com
1ce7cb58-ae44-12e0-a2b8-0026b0ad21ab, Carol, White, cw1845?, 49, c.white@mail.com
...
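Reading this format takes very little code. The sketch below shows one possible line parser; the CsvEntry class and its field names are hypothetical (chosen to mirror the header line), and it assumes no field contains a comma.

```java
import java.util.UUID;

/** One parsed line of the user-profile csv; field names mirror the header. */
final class CsvEntry {
    final UUID key;
    final String firstname;
    final String lastname;
    final String password;
    final long age;
    final String email;

    private CsvEntry(UUID key, String firstname, String lastname,
                     String password, long age, String email) {
        this.key = key;
        this.firstname = firstname;
        this.lastname = lastname;
        this.password = password;
        this.age = age;
        this.email = email;
    }

    /** Returns null for blank lines and '#' comments; throws on malformed lines. */
    static CsvEntry parse(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty() || trimmed.startsWith("#")) {
            return null;
        }
        // -1 keeps trailing empty fields so the field count check stays strict.
        String[] f = trimmed.split(",", -1);
        if (f.length != 6) {
            throw new IllegalArgumentException("Expected 6 fields: " + line);
        }
        return new CsvEntry(
                UUID.fromString(f[0].trim()),
                f[1].trim(),
                f[2].trim(),
                f[3].trim(),
                Long.parseLong(f[4].trim()),
                f[5].trim());
    }

    public static void main(String[] args) {
        CsvEntry e = parse(
                "5bd8c586-ae44-11e0-97b8-0026b0ea8cd0, Alice, Smith, asmi1975, 32, alice.smith@mail.com");
        System.out.println(e.key + " " + e.firstname + " " + e.age);
    }
}
```

A real import would likely use a proper csv library to handle quoting; the simple split is enough for the sample data shown here.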
From this csv, we want to populate two column families that could have been created (using the CLI) with:
create keyspace Demo;
use Demo;
create column family Users
with key_validation_class=LexicalUUIDType
and comparator=AsciiType
and column_metadata=[
{ column_name: 'firstname', validation_class: AsciiType },
{ column_name: 'lastname', validation_class: AsciiType },
{ column_name: 'password', validation_class: AsciiType },
{ column_name: 'age', validation_class: LongType, index_type: KEYS },
{ column_name: 'email', validation_class: AsciiType }];
create column family Logins
with key_validation_class=AsciiType
and comparator=AsciiType
and column_metadata=[
{ column_name: 'password', validation_class: AsciiType },
{ column_name: 'uuid', validation_class: LexicalUUIDType }];
In other words, the column family Users will contain user profiles: the key is a uuid identifying the user, the columns are the user properties. We also added a secondary index on the ‘age’ property, mainly to show that this is supported by the bulk-loading process.
The second column family, Logins, associates the user email (note that this example assumes that user emails are unique) with the user's password and identifier. This is the column family that would typically be queried when a user logs in, both to check the credentials and to find the identifier needed to retrieve the profile data. (A possibly simpler and better design would be to use a secondary index on the email column of Users. We don’t do this here in order to show how to load multiple column families together.)
A complete Java example of how to create the relevant sstables from the csv file using the SSTableSimpleUnsortedWriter class can be found here.
To compile this file, the Cassandra jar (>= 0.8.2) needs to be in the classpath (javac -cp <path_to>/apache-cassandra-0.8.2.jar DataImportExample.java). To run it, the Cassandra jar needs to be present, as well as the jars of the libraries used by Cassandra (those in the lib/ directory of the Cassandra source tree). Valid cassandra.yaml and log4j configuration files should also be accessible; typically, this means the conf/ directory of the Cassandra source tree should be in the classpath (see here for a typical launch script that sets all of those). As of 0.8.2, you will need to set the data_file_directories and commitlog_directory directives in said cassandra.yaml to accessible directories, but not ones of an existing Cassandra node. (This will be fixed in 0.8.3, but in the meantime using /tmp for both is a good idea.) The only useful property you need to set up for SSTableSimpleUnsortedWriter is the partitioner you want to use.
Let us run through the important parts of this example:
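SSTableSimpleUnsortedWriter usersWriter = new SSTableSimpleUnsortedWriter(
        directory,           // directory the sstables are written to
        keyspace,            // keyspace name
        "Users",             // column family name
        AsciiType.instance,  // column comparator
        null,                // sub-comparator (null: not a super column family)
        64);                 // buffer size in MB
SSTableSimpleUnsortedWriter loginWriter = new SSTableSimpleUnsortedWriter(
        directory,
        keyspace,
        "Logins",
        AsciiType.instance,
        null,
        64);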
The last parameter is a “buffer” size: sstables need to have rows sorted according to the partitioner. For RandomPartitioner, this means that rows should be ordered by the MD5 of their key. Since there is no chance data will come in that order, SSTableSimpleUnsortedWriter buffers whatever input it gets in memory and flushes everything into one sstable once the buffer is full. The buffer size is in MB (here 64MB) and corresponds to serialized space. That is, the resulting sstables will be approximately 64MB in size. Note that the “live” size on the Java heap can be larger, so setting this parameter too high is not advisable, and in any case there is little performance advantage in using a very high value.
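The buffer-then-flush idea can be illustrated with a small toy model. This is not the Cassandra implementation: the class BufferedSortedWriter is hypothetical, rows are opaque byte arrays, and each "sstable" is represented only by the sorted list of row tokens it would contain.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of a writer that buffers unsorted rows and flushes them sorted. */
final class BufferedSortedWriter {
    private final long maxBufferedBytes;
    // TreeMap keeps buffered rows ordered by partitioner token.
    private final TreeMap<BigInteger, byte[]> buffer = new TreeMap<>();
    private long bufferedBytes = 0;
    /** Each element stands in for one flushed sstable: its row tokens, sorted. */
    final List<List<BigInteger>> flushed = new ArrayList<>();

    BufferedSortedWriter(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /**
     * Buffers one serialized row under its token and flushes when the buffer
     * is full. Simplification: a repeated token replaces the earlier row here,
     * whereas a real writer would merge the columns.
     */
    void addRow(BigInteger token, byte[] serializedRow) {
        byte[] previous = buffer.put(token, serializedRow);
        bufferedBytes += serializedRow.length - (previous == null ? 0 : previous.length);
        if (bufferedBytes >= maxBufferedBytes) {
            flush();
        }
    }

    /** Flushes whatever is still buffered; without this the last rows are lost. */
    void close() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(buffer.keySet()));
        buffer.clear();
        bufferedBytes = 0;
    }

    public static void main(String[] args) {
        BufferedSortedWriter writer = new BufferedSortedWriter(10);
        for (int token : new int[] {5, 2, 9, 1}) {
            writer.addRow(BigInteger.valueOf(token), new byte[4]);
        }
        writer.close();
        System.out.println(writer.flushed);
    }
}
```

The model also shows why a larger buffer yields fewer, larger sstables, and why the rows of one flush are sorted among themselves but not across flushes.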
for (...each csv entry...)
{
    // Row key of Users: the uuid in its serialized byte form
    ByteBuffer uuid = ByteBuffer.wrap(decompose(entry.key));
    usersWriter.newRow(uuid);
    usersWriter.addColumn(bytes("firstname"), bytes(entry.firstname), timestamp);
    usersWriter.addColumn(bytes("lastname"), bytes(entry.lastname), timestamp);
    usersWriter.addColumn(bytes("password"), bytes(entry.password), timestamp);
    usersWriter.addColumn(bytes("age"), bytes(entry.age), timestamp);
    usersWriter.addColumn(bytes("email"), bytes(entry.email), timestamp);
    // Row key of Logins: the user email
    loginWriter.newRow(bytes(entry.email));
    loginWriter.addColumn(bytes("password"), bytes(entry.password), timestamp);
    loginWriter.addColumn(bytes("uuid"), uuid, timestamp);
}
// Closing writes out whatever is still buffered
usersWriter.close();
loginWriter.close();
Note that the order of additions of rows and of columns inside rows does not matter. It is also possible to “restart” a row multiple times or to add the same column multiple times, in which case the usual conflict resolution rules between columns apply.
Finally, each writer should be closed, otherwise the resulting sstables will not be complete.
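As a reminder of what conflict resolution between columns means when the same column is added several times, here is a minimal sketch. It models only the timestamp rule (the highest timestamp wins); the tie-break on equal timestamps (keep the greater value) is an assumption of this sketch, and deletions are ignored. The class and field names are hypothetical.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of last-write-wins reconciliation between versions of a column. */
final class ColumnReconcileSketch {
    static final class Column {
        final String name;
        final String value;
        final long timestamp;

        Column(String name, String value, long timestamp) {
            this.name = name;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Higher timestamp wins; on a tie this sketch keeps the greater value (assumption). */
    static Column reconcile(Column a, Column b) {
        if (a.timestamp != b.timestamp) {
            return a.timestamp > b.timestamp ? a : b;
        }
        return a.value.compareTo(b.value) >= 0 ? a : b;
    }

    /** Folds a sequence of column writes into one row, sorted by column name. */
    static Map<String, Column> merge(Iterable<Column> writes) {
        Map<String, Column> row = new TreeMap<>();
        for (Column c : writes) {
            row.merge(c.name, c, ColumnReconcileSketch::reconcile);
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Column> row = merge(Arrays.asList(
                new Column("password", "old", 1L),
                new Column("password", "new", 2L),
                new Column("email", "alice.smith@mail.com", 1L)));
        for (Column c : row.values()) {
            System.out.println(c.name + " = " + c.value + " @ " + c.timestamp);
        }
    }
}
```

Because the outcome depends only on timestamps and values, the order in which the writes are added does not change the result, which matches the note above that the order of additions does not matter.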
Once compiled and run with a csv file as argument, this example program will create sstables in the Demo directory. Those sstables can then be loaded into a live cluster using sstableloader as described in the previous section: sstableloader Demo/.
Any work on getting this to work with Hadoop? It would be cool to have a Hadoop OutputFormat that could write sstables. Then you could just copy the files from HDFS and use sstableloader to load them.
Is this bulk upload supported from Hadoop M/R?
There is no OutputFormat yet, but it’s easy to use the SSTableSimpleUnsortedWriter described above in a reduce stage.
I’ve created https://issues.apache.org/jira/browse/CASSANDRA-3045 to make this automatic.
cluster_name in the cassandra.yaml that sstableloader uses is required to match the cluster_name of the cluster you are importing to.
Otherwise, the Cassandra logs will show a message of level WARN, and sstableloader will fail with the message: “no live member found in the cluster”.
Hello,
When I try to load the data into my cluster with a replication factor of two, I get the following error:
java.lang.RuntimeException: Got an unknow host from describe_ring()
I have created a separate folder, which is an exact copy of the Cassandra installation folder, for using the sstableloader utility. I have set the listen_address and rpc_address to the same machine address.
Am I missing something? Please help!
I would like to thank the Cassandra community for this SStable Generator and Loader utility as it increased the efficiency of our system almost by 75% (The loading time). It helped a lot.