DataStax Developer Blog

Using the Cassandra Bulk Loader

By Sylvain Lebresne -  August 1, 2011 | 26 Comments

Bulk loading data in Cassandra has historically been difficult. Although Cassandra has had the BinaryMemtable interface from the very beginning, BinaryMemtable is hard to use and provides a relatively minor throughput improvement over normal client writes.

Cassandra 0.8.1 introduces a new tool to solve this problem: sstableloader.

Using sstableloader

Overview

sstableloader is a tool that, given a set of sstable data files, streams them to a live cluster. It does not simply copy the set of sstables to every node, but only transfers the relevant part of the data to each, conforming to the replication strategy of the cluster.

There are two primary use cases for this new tool:

  • Bulk loading external data into a cluster: for this you will have to first generate sstables for the data to load, as we will see later in this post.
  • Loading pre-existing sstables, typically snapshots, into another cluster with a different node count or replication strategy.

Example

Let us start with the second use case to demonstrate how sstableloader is used. For that, consider the following scenario: you have a one node test cluster populated with data that you want to transfer into another, multi-node cluster.

A brute-force solution would be to copy all the sstables of the source node to every node in the multi-node destination cluster, restart each node, and then run nodetool cleanup on them. This works, but is obviously inefficient, especially if the destination cluster has a lot of nodes.

With sstableloader, you first need the sstables (only the -Data and -Index components are required; the others, -Statistics and -Filter, will be ignored) to be in a directory whose name is the name of the keyspace the sstables belong to. This is how they are stored in both the main data directory and in snapshots. Then, assuming sstableloader is configured to talk to your multi-node cluster:

$ ls TestKeyspace/
TestCF-g-1-Data.db TestCF-g-2-Data.db TestCF-g-3-Data.db
TestCF-g-1-Index.db TestCF-g-2-Index.db TestCF-g-3-Index.db
$ sstableloader TestKeyspace
Starting client (and waiting 30 seconds for gossip) ...
Streaming relevant part of TestKeyspace/TestCF-g-1-Data.db TestKeyspace/TestCF-g-2-Data.db TestKeyspace/TestCF-g-3-Data.db to [/127.0.0.1, /127.0.0.2, /127.0.0.3]

progress: [/127.0.0.1 3/3 (100)] [/127.0.0.2 3/3 (100)] [/127.0.0.3 3/3 (100)] [total: 100 - 24MB/s (avg: 18MB/s)]
Waiting for targets to rebuild indexes ...

Configuration

To learn the topology of the cluster, the number of nodes, which ranges of keys each node is responsible for, the schema, etc., sstableloader uses the Cassandra gossip subsystem. It thus requires a directory containing a cassandra.yaml configuration file in the classpath. (If you use sstableloader from the Cassandra source tree, the cassandra.yaml file in conf will be used.)

In this config file, listen_address, storage_port, rpc_address and rpc_port should be set so the loader can communicate with the cluster, and at least one node of the cluster you want to load data into should be configured as a seed. The rest of the file is ignored for the purposes of sstableloader.
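A minimal sketch of such a file might look like the following (the values are illustrative and must be adapted to your environment; note also that cluster_name has to match the target cluster's name, or the loader will find no live members):

```yaml
# Illustrative cassandra.yaml fragment for sstableloader (0.8-era layout)
cluster_name: 'Test Cluster'      # must match the target cluster's name
listen_address: 127.0.0.5         # local interface the loader uses for gossip/streaming
storage_port: 7000
rpc_address: 127.0.0.5
rpc_port: 9160
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "127.0.0.1"    # at least one node of the target cluster
```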

Because sstableloader uses gossip to communicate with other nodes, if it is launched on the same machine as a Cassandra node, it will need to use a different network interface than that node. If you want to load data from a machine that is already running a Cassandra node, however, there is a simpler solution: you can use the JMX->StorageService->bulkload() call on that node.

This method simply takes the absolute path to the directory containing the sstables to load, and loads them just as sstableloader would. However, since the node performing the load is then both the source and a destination of the streaming, this puts extra load on that particular node, so we advise loading data from machines that are not Cassandra nodes when loading into a live cluster.
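For completeness, here is a hedged sketch of triggering that bulkload() operation programmatically rather than through jconsole, using the standard JMX client API. The host, port (7199 is Cassandra's default JMX port) and directory below are placeholders for your environment:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Sketch: invoke StorageService.bulkLoad() over JMX instead of via jconsole.
public class JmxBulkLoad {
    // The StorageService MBean name as exposed by Cassandra
    public static final String MBEAN = "org.apache.cassandra.db:type=StorageService";

    public static void bulkLoad(String host, int port, String sstableDir) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
        try {
            MBeanServerConnection mbs = jmxc.getMBeanServerConnection();
            // bulkLoad takes the absolute path to the directory holding the sstables
            mbs.invoke(new ObjectName(MBEAN), "bulkLoad",
                    new Object[] { sstableDir },
                    new String[] { "java.lang.String" });
        } finally {
            jmxc.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // e.g. JmxBulkLoad localhost 7199 /absolute/path/to/TestKeyspace
        if (args.length == 3) {
            bulkLoad(args[0], Integer.parseInt(args[1]), args[2]);
        }
    }
}
```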

Note that the schema for the column families to be loaded should be defined beforehand, using your preferred method: the CLI, Thrift, or CQL.

Other considerations

  • There is no requirement that the column family into which data is loaded be empty. More generally, it is perfectly reasonable to load data into a live, active cluster.
  • To get the best throughput out of sstable loading, you will want to parallelize the creation of the sstables to stream across multiple machines. There is no hard limit on the number of sstableloader instances that can run at the same time, so you can add loaders until you see no further improvement.
  • At the time of this writing, sstableloader does not handle failure very well. In particular, if a node it is sending to dies, it will get stuck (a progress indicator is displayed, so you will be able to tell when that happens and check whether one of your nodes is indeed dead). Until this is fixed, if that happens you will have to stop the loader and relaunch it. If you know the transfer has already completed on some of the other nodes, you can use the -i flag to skip those nodes during the retry.

Bulk-loading external data: a complete example

The setup

If you want to bulk-load external data that is not in sstable form using sstableloader, you will have to first generate sstables. To do so, the simplest solution is the new Java class SSTableSimpleUnsortedWriter introduced in Cassandra 0.8.2. To demonstrate how it is used, let us consider the example of bulk-loading “user profile” data from a csv file. More precisely, we consider a csv file of the following form:

# uuid, firstname, lastname, password, age, email

5bd8c586-ae44-11e0-97b8-0026b0ea8cd0, Alice, Smith, asmi1975, 32, alice.smith@mail.com
4bd8cb58-ae44-12e0-a2b8-0026b0ed9cd1, Bob, Miller, af3!df8, 28, bob.miller@mail.com
1ce7cb58-ae44-12e0-a2b8-0026b0ad21ab, Carol, White, cw1845?, 49, c.white@mail.com
...
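Each such line has to be parsed into its six fields before anything can be written. A hypothetical CsvEntry helper corresponding to this format (not part of Cassandra; plain JDK, with a decomposedKey() method mirroring the decompose() call used in the writer code below) might look like:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical helper: split one csv line into the six fields and expose the
// uuid as the 16 raw bytes used as the row key (LexicalUUIDType).
public class CsvEntry {
    public final UUID key;
    public final String firstname, lastname, password, email;
    public final long age;

    public CsvEntry(String line) {
        String[] cols = line.split("\\s*,\\s*");
        key = UUID.fromString(cols[0]);
        firstname = cols[1];
        lastname = cols[2];
        password = cols[3];
        age = Long.parseLong(cols[4]);
        email = cols[5];
    }

    // Serialize the uuid into its 16 bytes (most significant bits first)
    public ByteBuffer decomposedKey() {
        ByteBuffer bb = ByteBuffer.allocate(16);
        bb.putLong(key.getMostSignificantBits());
        bb.putLong(key.getLeastSignificantBits());
        bb.flip();
        return bb;
    }
}
```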

From this csv, we want to populate two column families that have been created (using the CLI) with:

create keyspace Demo;
use Demo;
create column family Users
    with key_validation_class=LexicalUUIDType
    and comparator=AsciiType
    and column_metadata=[
        { column_name: 'firstname', validation_class: AsciiType },
        { column_name: 'lastname', validation_class: AsciiType },
        { column_name: 'password', validation_class: AsciiType },
        { column_name: 'age', validation_class: LongType, index_type: KEYS },
        { column_name: 'email', validation_class: AsciiType }];

create column family Logins
    with key_validation_class=AsciiType
    and comparator=AsciiType
    and column_metadata=[
        { column_name: 'password', validation_class: AsciiType },
        { column_name: 'uuid', validation_class: LexicalUUIDType }];

In other words, the column family Users will contain user profiles: the key is a uuid identifying the user, the columns are the user properties. We also added a secondary index on the ‘age’ property, mainly to show that this is supported by the bulk-loading process.

The second column family, Logins, associates the user's email (note that this example assumes user emails are unique) with their password and identifier. It is this column family that would typically be queried when a user logs in, both to check the credentials and to find the identifier used to retrieve the profile data. (A possibly simpler/better design would be a secondary index on an email column in Users; we don't do that here so we can show how to load multiple column families together.)

Creating sstables

A complete Java example of how to create the relevant sstables from the csv file using the SSTableSimpleUnsortedWriter class can be found here.

To compile this file, the Cassandra jar (>= 0.8.2) needs to be in the classpath (javac -cp <path_to>/apache-cassandra-0.8.2.jar DataImportExample.java). To run it, you also need the jars of the libraries used by Cassandra (those in the lib/ directory of the Cassandra source tree), as well as valid cassandra.yaml and log4j configuration files; typically, this means the conf/ directory of the Cassandra source tree should be in the classpath (see here for a typical launch script that sets all of this up). As of 0.8.2, you will need to set the data_file_directories and commitlog_directory directives in that cassandra.yaml to accessible directories, but not to those of an existing Cassandra node. (This will be fixed in 0.8.3; in the meantime, using /tmp for both is a good idea.) The only other property you need to set for SSTableSimpleUnsortedWriter is the partitioner you want to use.

Let us run through the important parts of this example:

  • Creation of the sstable writers:

    SSTableSimpleUnsortedWriter usersWriter = new SSTableSimpleUnsortedWriter(
            directory,
            keyspace,
            "Users",
            AsciiType.instance,
            null,
            64);
    SSTableSimpleUnsortedWriter loginWriter = new SSTableSimpleUnsortedWriter(
            directory,
            keyspace,
            "Logins",
            AsciiType.instance,
            null,
            64);

    The directory and keyspace parameters are the directory in which to put the sstables (a Java File) and the keyspace of the column families (a String), respectively. Next come the column family name and the comparator and sub-columns comparator; here, we don't use super columns, so the sub-columns comparator is null.

    The last parameter is a “buffer” size: sstables need to have their rows sorted according to the partitioner. For RandomPartitioner, this means rows must be ordered by the MD5 of their key. Since there is no chance the input will arrive in that order, SSTableSimpleUnsortedWriter buffers whatever input it gets in memory and “flushes” everything into one sstable once the buffer is full. The buffer size is in MB (here 64MB) and corresponds to serialized space; that is, the resulting sstables will be approximately 64MB in size. Note that the “live” size on the Java heap can be larger, so setting this parameter too high is not advisable, and in any case there is little performance advantage to using a very large value.

  • Populate with each csv entry:

    for (...each csv entry...)
    {
        ByteBuffer uuid = ByteBuffer.wrap(decompose(entry.key));
        usersWriter.newRow(uuid);
        usersWriter.addColumn(bytes("firstname"), bytes(entry.firstname), timestamp);
        usersWriter.addColumn(bytes("lastname"), bytes(entry.lastname), timestamp);
        usersWriter.addColumn(bytes("password"), bytes(entry.password), timestamp);
        usersWriter.addColumn(bytes("age"), bytes(entry.age), timestamp);
        usersWriter.addColumn(bytes("email"), bytes(entry.email), timestamp);


        loginWriter.newRow(bytes(entry.email));
        loginWriter.addColumn(bytes("password"), bytes(entry.password), timestamp);
        loginWriter.addColumn(bytes("uuid"), uuid, timestamp);
    }
    usersWriter.close();
    loginWriter.close();

    In this excerpt, entry is a parsed csv entry. Each call to newRow() starts a new row, which is populated with the columns added by addColumn(). Though not demonstrated here, it is equally simple to add super, expiring or counter columns; the exact API is described here.

    Note that the order of additions of rows and of columns inside rows does not matter. It is also possible to “restart” a row multiple times or to add the same column multiple times, in which case the usual conflict resolution rules between columns apply.

    Finally, each writer should be closed, otherwise the resulting sstables will not be complete.

Once compiled and run with a csv file as argument, this example program will create sstables in the Demo directory. Those sstables can then be loaded into a live cluster using sstableloader as described in the previous section: sstableloader Demo/.
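As an aside, the partitioner ordering that forces SSTableSimpleUnsortedWriter to buffer and sort can be illustrated with the JDK alone. Under RandomPartitioner, on-disk row order follows the MD5 token of each key, not the key's natural order (the token computation below is a slight simplification of the partitioner's actual one):

```java
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.Arrays;

// Illustrates why the writer must buffer: under RandomPartitioner, sstable
// row order is the order of the keys' MD5 tokens, which bears no relation
// to the keys' natural order.
public class TokenOrder {
    static BigInteger token(String key) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5").digest(key.getBytes("UTF-8"));
        return new BigInteger(1, digest); // non-negative, MD5-derived token
    }

    public static void main(String[] args) throws Exception {
        String[] keys = { "alice", "bob", "carol" };
        String[] byToken = keys.clone();
        // sort by MD5 token -- the on-disk order the writer has to produce
        Arrays.sort(byToken, (a, b) -> {
            try { return token(a).compareTo(token(b)); }
            catch (Exception e) { throw new RuntimeException(e); }
        });
        System.out.println(Arrays.toString(byToken));
    }
}
```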

Other considerations

  • SSTableSimpleUnsortedWriter never flushes to disk between two calls of newRow(). As a consequence, all data inserted between two of those calls must fit in memory. If you have a huge row for which this does not hold, you can call newRow() regularly, using the same row key, to avoid buffering everything.
  • The methods of the simple writer expect ByteBuffers for the row key, column name and column value. Converting data to bytes is your responsibility; this is the raison d’être of the bytes() method in the example above.
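For illustration, minimal stand-ins for that bytes() helper might look like the following (hypothetical, plain-JDK versions; Cassandra itself ships similar helpers in its utility classes):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for the bytes() helper used in the example:
// everything handed to the writer must already be a serialized ByteBuffer.
public class Bytes {
    // Ascii text, for column names and AsciiType values
    public static ByteBuffer bytes(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }

    // LongType values (e.g. the 'age' column) are 8 big-endian bytes
    public static ByteBuffer bytes(long v) {
        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.putLong(v);
        bb.flip();
        return bb;
    }
}
```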


Comments

  1. Tom Davidson says:

    Any work on getting this to work with Hadoop? It would be cool to have an Hadoop OutputFormat that could write SStables. Then you could just copy the files from HDFS and use sstableloader to load them.

  2. Thamizhannal says:

    Has this bulk upload supported from Hadoop M/R?

  3. Jonathan Ellis says:

    There is no OutputFormat yet, but it’s easy to use use the SSTableSimpleUnsortedWriter described above in a reduce stage.

    1. sai says:

      Hi,Can anyone let me know how to load data into dynamic column families using sstableloader??

  4. Jonathan Ellis says:

    I’ve created https://issues.apache.org/jira/browse/CASSANDRA-3045 to make this automatic.

  5. Christopher J. Bottaro says:

    cluster_name in the cassandra.yaml that sstableloader uses is required to match the cluster_name of the cluster you are importing to.

    The Cassandra logs will show a message of level WARN, but sstableload will fail with message: “no live member found in the cluster”

  6. Manish says:

    Hello,

    When I try to load the data in my cluster with replication factor of two, I got following error.
    java.lang.RuntimeException: Got an unknow host from describe_ring()

    I have created the separate folder which is exact copy of the cassandra installation folder for using sstableloader utility.I have set the listen_address,rpc_address to same machine adress.
    Am I missing something.Please help!!!

  7. Samarth Gahire says:

    I would like to thank the Cassandra community for this SStable Generator and Loader utility as it increased the efficiency of our system almost by 75% (The loading time). It helped a lot.

  8. xiongxiong says:

    why i got this:

    Exception in thread “main” java.lang.NullPointerException
    at org.apache.cassandra.io.sstable.AbstractSSTableSimpleWriter.newRow(AbstractSSTableSimpleWriter.java:96)
    at DataImportExample.main(DataImportExample.java:71)

    cassandra1.1

  9. Apara says:

    I get the same exception, anyone have some solution??
    Exception in thread “main” java.lang.NullPointerException
    at org.apache.cassandra.io.sstable.AbstractSSTableSimpleWriter.newRow(AbstractSSTableSimpleWriter.java:96)
    at DataImportExample.main(DataImportExample.java:60)

  10. Apara says:

    I found the reason for this error, that is because I had null the IPartitioner argument in creating new SSTableSimpleUnsortedWriter(, , ,). Since I am using cassendra 1.1.1 the constructor with 6 args is not available as in this post.

    1. Joseph says:

      Can you post the solution for Cassandra 1.1.1?

  11. Ameya says:

    I am trying to bulk load using the sstableloader. My files get loaded OK (I think – judging from the progress bar), but then I get this message and it just hangs indefinitely:

    “Waiting for targets to rebuild indexes …”

    Full output of the loader is as follows:
    ———————–
    Starting client (and waiting 30 seconds for gossip) …
    Streaming revelant part of /app/sstable/bin/tuiru/packagetours-hd-4-Data.db /app/sstable/bin/tuiru/packagetours-hd-2-Data.db /app/sstable/bin/tuiru/packagetours-hd-1-Data.db /app/sstable/bin/tuiru/packagetours-hd-3-Data.db to [/10.40.14.93]

    progress: [/10.40.14.93 4/4 (100)] [total: 100 - 3MB/s (avg: 17MB/s)]
    Waiting for targets to rebuild indexes …
    ————

    Any help would be greatly appreciated. Thanks!

  12. Manu Zhang says:

    Is there a way to bulk load data into Memtable first? I really want my data to reside in memory.

  13. Ash1 says:

    Hi, Even though the streaming of SSTables is very fast , I find that generation of SStables is quite slow (for me atleast) for very large files (CSV, 4GB+). I am using a Dual Core computer with 2 GB ram. Could it be because of the system spec or any other factor?

    Thanks

  14. Pradeep Mantha says:

    I am trying to use the DataImportExample, but it seems it doesn’t work with the latest 1.1.6 version. I updated the partitioner in the new constructor of SSTableSimpleUnsortedWriter class to NULL and then not sure, whats the problem with addColumn function…

    I get the following error

    Exception in thread “main” java.lang.NullPointerException
    at org.apache.cassandra.io.sstable.AbstractSSTableSimpleWriter.addColumn(AbstractSSTableSimpleWriter.java:115)
    at org.apache.cassandra.io.sstable.AbstractSSTableSimpleWriter.addColumn(AbstractSSTableSimpleWriter.java:130)

    Could you please help me out.

    thanks
    pradeep

  15. Sachin says:

    I am trying to use the DataImportExample, but it seems it doesn’t work with the latest 1.2 version.

    I have a CSV file with 1 million records in it.

    First it says you have to use murmur3partitioner and not randomPartitioner in 1.2 in
    IPartitioner partitioner = new RandomPartitioner();

    So I changed it to
    IPartitioner partitioner = new murmur3Partitioner();

    Then
    SSTableSimpleUnsortedWriter usersWriter = new SSTableSimpleUnsortedWriter(directory,partitioner,keyspace,ColumnFamily,AsciiType.instance,null,64);

    this says that the method not available with 6 parameters, so I remove the last 64.

    After making the change, I get the following error java.util.NoSuchElementException
    in userWriter.addColumn after adding 129128 records.

    So I reduced the no. of records in CSV file to only 10000, then the same error is shown for (java.util.NoSuchElementException) in
    userWriter.close()

    I am not able to create SSTables in 1.2 at all. Has anybody tried this?

    1. ELDHOSE RAJAN says:

      I created the SSTables. But I used the SSTable loader too. But, not able to do a select * on the column family.
      Its giving a rpc timeout error.
      Did you get your SSLoader buolk upload working..

  16. Josh says:

    Hi Sachin,
    I have the same issue, Datastax, can you please update this article? I’m running in circles trying to figure this out :)

  17. kitty says:

    Hi, I am using 1.1.9.3 cassandra version. In the above example, added partitioner(random partitioner) in constructer argument list, and followed all the steps. On running the script, I keep getting error below:
    Error instantiating snitch class ‘com.datastax.bdp.snitch.DseDelegateSnitch’.
    Fatal configuration error; unable to start server. See log for stacktrace.

    I am executing it from location /usr/lib/datastax/dse-3.0.1/resources/cassandra

    This is my script file

    #!/bin/sh

    # paths to the cassandra source tree, cassandra jar and java
    CASSANDRA_HOME=”.”
    CASSANDRA_JAR=”$CASSANDRA_HOME/../../lib/dse-3.0.1.jar”
    JAVA=`which java`

    # Java classpath. Must include:
    # – directory of DataImportExample
    # – directory with cassandra/log4j config files
    # – cassandra jar
    # – cassandra depencies jar
    CLASSPATH=”.:$CASSANDRA_HOME/conf:$CASSANDRA_JAR”

    for jar in $CASSANDRA_HOME/lib/*.jar; do
    CLASSPATH=$CLASSPATH:$jar
    done

    $JAVA -ea -cp $CLASSPATH -Xmx256M \
    -Dlog4j.configuration=log4j-tools.properties \
    DataImportExample “/home/myuser/cassandra_artifacts/demodata.csv”

  18. ELDHOSE RAJAN says:

    I am using cassandra 1.2 for Bulk upload.
    I run the sstableloader to get the following output

    Streaming revelant part of /home/ubuntu/VIQ-Cloud/software/apache-cassandra-1.2.5/data/vng_sugar_cookie_prod/ja_job_master/vng_sugar_cookie_prod-ja_job_master-ic-1-Data.db to [/127.0.0.1]

    progress: [/127.0.0.1 1/1 (100)] [total: 100 – 0MB/s

    However when I try to query the columnfamily from cqlsh I get the following error :
    Request did not complete within rpc_timeout.
    my rpc address in cassandra port is 0.0.0.0
    I am using the murmur3partitioner unlike in sample code as this is the default partitioner with cassandra 1.2.
    I am stuck at this point for 2 days.
    It’d be great if you could update this page with the sample code and instructions for latest 1.2 release..

    Thanks

    1. Abhijit says:

      HI ,

      Can you provide the command that you have used in CLI.?

      Thanks
      Abhijit

      1. MJafri says:

        ELDHOSE RAJAN , I am getting the exact same problem as you!… Any solution to it yet?

    2. MJafri says:

      I’ve found the solution to this on the web. The problem is that CQL3′s default table is with compaction off..

      http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/Bulk-loader-with-Cassandra-1-2-5-td7588224.html

    3. MJafri says:

      In my case, the solution was as simple as defining:

      CREATE TABLE toy ( id int PRIMARY KEY, x int ) WITH COMPACT STORAGE and compaction = {‘class’ : ‘LeveledCompactionStrategy’ }

      instead of

      CREATE TABLE toy ( id int PRIMARY KEY, x int )

      The rpc_timeout problem doesn’t happen with a table defined this way,.

  19. Naveen says:

    Hi,

    I am able to generate sstables but getting some “Could not retrieve endpoint ranges:” issue while running sstableloader command with CQL 3.0 version. Can someone please help me with this issue?

    Thanks,
    Naveen
