
Using the Cassandra Bulk Loader, Updated

By Yuki Morishita - September 26, 2014 | 29 Comments


We introduced sstableloader back in 0.8.1 to bulk load data into Cassandra.
When it was first introduced, we wrote a blog post about its usage, along with how to generate SSTables to bulk load.

Now that Cassandra 2.1.0 has been released, bulk loading has evolved since that old blog post.
Let's see how the changes make our lives easier than before.

What's changed?

The specific changes are:

  • sstableloader no longer participates in gossip membership to get schema and ring information. Instead, it just contacts one of the nodes in the cluster and asks for them. This allows you to bulk load from the same machine where Cassandra is running, since sstableloader no longer listens on the same port as Cassandra.
  • Internally, the streaming protocol has been redesigned, so you can stream data more efficiently than before.
  • A new CQLSSTableWriter was introduced (CASSANDRA-5894). You can now create SSTables using familiar CQL.

In the old post, we showed two scenarios where sstableloader is used. Let's see how the changes work in those scenarios.
I use Apache Cassandra 2.1.0 throughout these examples, from the cluster itself to running sstableloader.

Example 1 - Loading existing SSTables

Usage of sstableloader has not changed much, but because it has to contact a node to get the schema for the SSTables being loaded, you now have to specify the address(es) of the node(s) with the -d option.

So, for example, bulk loading existing SSTables to a cluster looks like this:

$ bin/sstableloader -d 127.0.0.1 ~/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f
Established connection to initial hosts
Opening sstables and calculating sections to stream
Streaming relevant part of /data/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f/Keyspace1-Standard1-ka-6-Data.db /data/Keyspace1/Standard1-cb5e6f30458811e49349511b628b066f/Keyspace1-Standard1-ka-5-Data.db to [/127.0.0.1, /127.0.0.2, /127.0.0.3]
progress: [/127.0.0.1]0:2/2 100% [/127.0.0.2]0:2/2 100% [/127.0.0.3]0:2/2 100% total: 100% 0  MB/s(avg: 5 MB/s)
Summary statistics:
   Connections per host:         : 1
   Total files transferred:      : 6
   Total bytes transferred:      : 98802914
   Total duration (ms):          : 9455
   Average transfer rate (MB/s): : 5
   Peak transfer rate (MB/s):    : 11

As you can see, some stats are printed out after the bulk load.

Example 2 - Loading external data

In the old post, we showed an example that creates SSTables from a CSV file using UnsortedSimpleSSTableWriter and then loads them into a Cassandra cluster with sstableloader.
The schema there was created with Thrift, and it had a simple, flat table structure.

For this updated post, let's try a more complex scenario with the new CQLSSTableWriter.
We will use real data from Yahoo! Finance to load historical stock prices in a time-series manner.

Schema definition

If we take a look at the CSV file for Yahoo! (YHOO), it has seven fields:

Date,Open,High,Low,Close,Volume,Adj Close
2014-09-25,39.56,39.80,38.82,38.95,35859400,38.95
...

Let's use the ticker symbol as our partition key, and the 'Date' field as the clustering key.
So the schema looks like this:

CREATE TABLE historical_prices (
    ticker ascii,
    date timestamp,
    open decimal,
    high decimal,
    low decimal,
    close decimal,
    volume bigint,
    adj_close decimal,
    PRIMARY KEY (ticker, date)
) WITH CLUSTERING ORDER BY (date DESC);

We use CLUSTERING ORDER BY so that we can query the most recent data easily.
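
Because rows within each partition are stored newest-first, querying the latest prices needs no ORDER BY clause. A minimal sketch of such a query:

SELECT date, close
FROM historical_prices
WHERE ticker = 'YHOO'
LIMIT 10;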

Generating SSTables using CQLSSTableWriter

How do you bulk load data into such a schema? If you choose UnsortedSimpleSSTableWriter as we did in the old post, you have to manually construct each cell of complex types to fit your CQL3 schema. This requires deep knowledge of how CQL3 works internally.
Enter CQLSSTableWriter.

All you need is the DDL for the table you want to bulk load, and an INSERT statement to put data into it.
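
For reference, the SCHEMA and INSERT_STMT constants used below could be plain CQL strings along the following lines. This is a sketch, not the exact code from the post: the keyspace name quotes is an assumption for illustration, and note that CQLSSTableWriter expects a keyspace-qualified table name here.

// Hypothetical constants for this example; the keyspace name "quotes"
// is an assumption, not taken from the original post.
static final String SCHEMA =
        "CREATE TABLE quotes.historical_prices ("
      + "  ticker ascii,"
      + "  date timestamp,"
      + "  open decimal,"
      + "  high decimal,"
      + "  low decimal,"
      + "  close decimal,"
      + "  volume bigint,"
      + "  adj_close decimal,"
      + "  PRIMARY KEY (ticker, date)"
      + ") WITH CLUSTERING ORDER BY (date DESC)";

static final String INSERT_STMT =
        "INSERT INTO quotes.historical_prices "
      + "(ticker, date, open, high, low, close, volume, adj_close) "
      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)";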

// Prepare SSTable writer
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
// Set the output directory
builder.inDirectory(outputDir)
       // Set the target schema
       .forTable(SCHEMA)
       // Set the CQL statement used to put data
       .using(INSERT_STMT)
       // Set the partitioner if needed;
       // the default is Murmur3Partitioner, so set this only if you use a different one
       .withPartitioner(new Murmur3Partitioner());
CQLSSTableWriter writer = builder.build();
 
// ...snip... 
 
while ((line = csvReader.read()) != null)
{
    // We use Java types here based on 
    // https://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/DataType.Name.html#asJavaClass%28%29 
    writer.addRow(ticker,
                  DATE_FORMAT.parse(line.get(0)),
                  new BigDecimal(line.get(1)),
                  new BigDecimal(line.get(2)),
                  new BigDecimal(line.get(3)),
                  new BigDecimal(line.get(4)),
                  Long.parseLong(line.get(5)),
                  new BigDecimal(line.get(6)));
}
writer.close();
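
The parts snipped above are just CSV-reading boilerplate. For completeness, here is a minimal sketch of what they might look like, assuming the super-csv library for parsing and a hard-coded input file name; neither is prescribed by the original example.

// Hypothetical setup; the file name and the CSV library are assumptions.
// Requires: java.io.BufferedReader, java.io.FileReader,
// java.text.SimpleDateFormat, java.util.List,
// org.supercsv.io.CsvListReader, org.supercsv.prefs.CsvPreference.
String ticker = "YHOO";
SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");
CsvListReader csvReader = new CsvListReader(
        new BufferedReader(new FileReader("YHOO.csv")),
        CsvPreference.STANDARD_PREFERENCE);
csvReader.getHeader(true); // skip the CSV header line
List<String> line;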

You can see the complete example on my GitHub.

After generating SSTables, you can just use sstableloader to load them into the target cluster, as described before.
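
For instance, assuming the writer's output directory follows the keyspace/table layout that sstableloader expects (the path below is illustrative):

$ bin/sstableloader -d 127.0.0.1 /path/to/quotes/historical_prices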

There are still some limitations in CQLSSTableWriter: for example, you cannot use it from multiple threads in parallel, and user-defined types are not supported yet.
But we keep improving it, so stay tuned to the Apache JIRA.

Wrap up

Generating SSTables and bulk loading have been improved over the past releases. There are many new features available to make your life easier.
Start experimenting yourself today!














Comments

  1. Gary says:

    I built my own following the example, and it generates the SSTables and we can load them, no problem. But for some reason the fields in the loaded table in C* are in the wrong order. We've triple-checked the order of everything, and it all seems correct.

    1. Yuki Morishita says:

      > But for some reason the fields in the loaded table in C* are in the wrong order.

      Fields in CQL tables are sorted in a certain order internally, so it can differ from the order you defined in CREATE TABLE.

  2. Pierre says:

    It looks a lot simpler than the previous version. The question is, when should we use bulk data loading? Can we use bulk loading in production on an already-filled table, or is it just for testing purposes? If yes: I have to insert 1k to 100k rows at a time; is it worth using sstableloader in this case, or should I just stick with INSERT INTO?

    1. Yuki Morishita says:

      > when should we use bulk data loading?

      It can be used for things like migrating from another data source, restoring SSTables from a backup, etc.

      > Can we use bulk loading in production on an already-filled table

      Yes. Loaded SSTables are visible after the bulk load succeeds.

      > I have to insert 1k to 100k rows at a time; is it worth using sstableloader in this case, or should I just stick with INSERT INTO?

      Bulk loading is much faster than CQL inserts if you want to put in 100k rows.

      1. Pierre says:

        >> Can we use bulk loading in production on an already-filled table

        > Yes. Loaded SSTables are visible after the bulk load succeeds.

        Sorry, maybe I phrased my question badly:

        I have a cluster with an already-filled table with a lot of records.

        I have new data to insert, ~100k rows, which may contain some rows that are going to override previous records (e.g. same primary key).

        Can I use sstableloader to put the new data in this non-empty table, or should I stick to INSERT INTO?

        1. Yuki Morishita says:

          > Can I use sstableloader to put the new data in this non-empty table, or should I stick to INSERT INTO?

          What you have to keep in mind is that each piece of data has a timestamp, and if it is newer than the existing data, it overrides it.

          CQLSSTableWriter sets the timestamp to the time when you execute “writer.addRow”, and INSERT INTO does the same by default.
          You can set any timestamp using “INSERT INTO … USING TIMESTAMP” in both cases.

          Remember, it is the time when you write the SSTable, not when you stream it using sstableloader.
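
          For example, a write with an explicit timestamp (in microseconds since the epoch; the value below is an arbitrary illustration) looks like this:

          INSERT INTO historical_prices (ticker, date, close)
          VALUES ('YHOO', '2014-09-25', 38.95)
          USING TIMESTAMP 1411689600000000;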

          1. Andreas says:

            When migrating historical data, there will be a large difference between the log date (when the row was originally written in another system) and the internal timestamp.

            If rows are never updated or deleted, does it matter at all that the timestamps will be skewed?

            Wouldn't this be a reasonable extension? To create historical timestamps if needed?

  3. Pierre says:

    I gave the API a try, and I have two questions. First, I have a memory leak due to threads created when I use the forTable() function of the builder:

    System.setProperty("cassandra.config","file:///var/tmp/cassandra.properties");
    System.setProperty("cassandra.storagedir","/var/tmp/cassandra-tmp");
    File tmpDirBotLogs = new File("/var/tmp/sst-" + UUID.randomUUID());
    tmpDirBotLogs.mkdirs();
    CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();

    builder
    .inDirectory(tmpDirBotLogs)
    .forTable(SCHEMA_BOT_LOGS)
    ;

    They are created by static code in ClientState.java (DatabaseDescriptor.getAuthenticator().protectedResources(), I think). What are their roles, and how do I close them?

    Second question: after I have added my rows and closed the writer, I have my SSTable files ready to be sent to the cluster. You suggest using sstableloader, but I would like to know how to send them directly from Java code.

  4. Pierre says:

    I looked at BulkLoad.java, and it was easy to set up the few lines of code to upload my created SSTable from my application:


    ExtClient client = new ExtClient(
            new HashSet<>(Arrays.asList(InetAddress.getByName("127.0.0.1"))));
    SSTableLoader loader = new SSTableLoader(
            new File("/var/tmp/keyspace/sst-4b456c09-2d21-4112-be00-babfa6356e62/"),
            client,
            new OutputHandler.LogOutput());
    loader.stream().get();

    I tested on localhost, and it was EXTREMELY fast.

    However :

    Again I face the unstoppable-threads problem: after calling loader.stream() it creates a bunch of threads, but I don't have a handle on them, so they can't be stopped when they are no longer used. Moreover, if I need to call loader.stream() multiple times, it will create a new connection and multiple new threads each time, so we again have a memory leak (nearly the same one).

    Some suggestions: we need a cleaner API, something like a Client or a connection manager/pool we can reuse across the SSTable builder (CQLSSTableWriter.builder()) AND the SSTable loader (because both need a connection to the cluster), with a shutdown method on it so we can stop the created threads. This could also facilitate the implementation of multithreaded/parallelized uploads of SSTables to the cluster.

    That's really promising. I really prefer this way of inserting data; it's so fast. I used to use LOAD DATA INFILE with MySQL instead of INSERT statements when I had a lot of data. Here we have the same performance gain with Cassandra.

  5. Pierre says:

    The threads error was due to the configuration not being in client mode by default. I solved the problem with this:

    static {
        org.apache.cassandra.config.Config.setClientMode(true);
    }

    Now I no longer need to provide a conf file, and the server threads (metrics and co.) aren't started anymore.

    So forget everything about thread pools/memory leaks and whatnot; when it's configured in client mode, it works like a charm.

  6. Rajnish says:

    Hi, thanks for sharing this tutorial; it is very useful and worked for me. I have a doubt about what happens after sstableloader returns the “Summary statistics”.

    I have checked in OpsCenter that it is still doing compaction, and it is taking very long to do so. Can we do something to improve the performance of this?

  7. Emmanuel says:

    Doesn't work on DSE 4.6: writer.addRow throws a NullPointerException. Changing the dependency to cassandra-2.0.10 makes it work.

    1. Yuki Morishita says:

      We had this problem in open source Cassandra, but it has been fixed and will be released in the next version, 2.0.14 (https://issues.apache.org/jira/browse/CASSANDRA-8808).

      1. Emmanuel says:

        Will wait for it, thanks. The stack trace here was, however, a bit different from the one in the 8808 JIRA.


        Exception in thread "main" java.lang.ExceptionInInitializerError
        at org.apache.cassandra.db.ColumnFamilyStore.createColumnFamilyStore(ColumnFamilyStore.java:408)
        at org.apache.cassandra.db.ColumnFamilyStore.createColumnFamilyStore(ColumnFamilyStore.java:393)
        at org.apache.cassandra.db.Keyspace.initCf(Keyspace.java:312)
        at org.apache.cassandra.db.Keyspace.<init>(Keyspace.java:269)
        at org.apache.cassandra.db.Keyspace.open(Keyspace.java:111)
        at org.apache.cassandra.db.Keyspace.open(Keyspace.java:89)
        at org.apache.cassandra.cql3.statements.UpdateStatement.addUpdateForKey(UpdateStatement.java:109)
        at org.apache.cassandra.io.sstable.CQLSSTableWriter.rawAddRow(CQLSSTableWriter.java:218)
        at org.apache.cassandra.io.sstable.CQLSSTableWriter.addRow(CQLSSTableWriter.java:138)
        at org.apache.cassandra.io.sstable.CQLSSTableWriter.addRow(CQLSSTableWriter.java:113)
        at BulkLoad.main(BulkLoad.java:128)
        Caused by: java.lang.NullPointerException
        at org.apache.cassandra.db.Directories.<init>(Directories.java:77)
        ... 11 more

        1. Yuki Morishita says:

          The cause is the same: accessing Keyspace requires cassandra.yaml to be loaded, which is not what we expect when using CQLSSTableWriter, hence the NPE. CASSANDRA-8808 fixes it.

  8. Bing W says:

    Just curious – have you thought about integrating bulk loading into CQL like most other conventional RDBMSs? Or even like Oracle's SQL*Loader, which is a separate command-line tool. Or is that already being planned/worked on? Thanks, Bing

  9. Prasad K says:

    The GitHub code uses org.apache.cassandra.* – is there a Javadoc for the Cassandra packages, so that I can browse the available classes and their functions? Also curious whether the custom-developed libraries integrate with the DataStax drivers (I am looking at calling functions like NOW(), DATEOF(), etc. and passing user-defined types from a Java driver to Cassandra).

  10. Matt says:

    I converted a CSV file to an sstable and when I try the sstableloader I am getting this error.

    java.lang.NumberFormatException: For input string: “TOX.txt”

    I have run the conversion a couple of times, always clearing my data folder, and it seems to complain about different file names each time.

    Any ideas what is wrong?

    1. Matt says:

      Here is my full stack trace:

      [datastax@localhost ~]$ sstableloader -d localhost datawh/line_items
      Established connection to initial hosts
      Opening sstables and calculating sections to stream
      For input string: “TOC.txt”
      java.lang.NumberFormatException: For input string: “TOC.txt”
      at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
      at java.lang.Integer.parseInt(Integer.java:492)
      at java.lang.Integer.parseInt(Integer.java:527)
      at org.apache.cassandra.io.sstable.Descriptor.fromFilename(Descriptor.java:276)
      at org.apache.cassandra.io.sstable.Descriptor.fromFilename(Descriptor.java:235)
      at org.apache.cassandra.io.sstable.Component.fromFilename(Component.java:120)
      at org.apache.cassandra.io.sstable.SSTable.tryComponentFromFilename(SSTable.java:160)
      at org.apache.cassandra.io.sstable.SSTableLoader$1.accept(SSTableLoader.java:84)
      at java.io.File.list(File.java:1155)
      at org.apache.cassandra.io.sstable.SSTableLoader.openSSTables(SSTableLoader.java:78)
      at org.apache.cassandra.io.sstable.SSTableLoader.stream(SSTableLoader.java:162)
      at org.apache.cassandra.tools.BulkLoader.main(BulkLoader.java:106)
      [datastax@localhost ~]$

      1. Matt says:

        Might be an issue with Cassandra 2.2.0. The files it was creating did not have the keyspace and table name as part of the SSTable file names.

        The files looked like this: la-1-big-Data.db
        They should look like this: datawh-line_items-la-1-Data.db

        I ran this on all of the files that were created to rename them appropriately:

        String fileName = file.getName();
        int second = fileName.indexOf("-", 3);
        int third = fileName.indexOf("-", second + 1);
        String newFileName = KEYSPACE + "-" + tableName + "-"
                + fileName.substring(0, second) + fileName.substring(third);
        File newFile = new File(DEFAULT_OUTPUT_COPY_DIR + File.separator + KEYSPACE
                + File.separator + tableName + File.separator + newFileName);

        try {
            Files.copy(file, newFile);
        } catch (IOException e) {
            e.printStackTrace();
        }

        This could also just be a compatibility issue between Cassandra 2.2.0, which I am running locally, and 2.1.8, which I am running on my server.

        1. Yuki Morishita says:

          The file name format changed in 2.2, and it no longer contains the keyspace name and table name. Notice that “la” indicates the SSTable version.

          You cannot stream a 2.2-generated table to 2.1.8.

  11. saihareesh says:

    I have a live production Cassandra cluster.
    Can I use it for concurrent reads and writes while streaming of SSTables is happening on the same cluster?
    Will there be any problem or performance issue in doing so? Please suggest.

  12. Paul Weiss says:

    Is it possible to call sstableloader from Java instead of using the command-line program? I have a process that uses CQLSSTableWriter and generates the SSTable files, but I am looking for an end-to-end process that bulk loads without any manual intervention.

    Ideally would like to avoid forking another process so I can properly check for errors.

    Thanks

  13. Sam T says:

    I had a couple of questions regarding the write directory. The code is writing to ./data – so of course this is a relative path.
    1. How do we find the actual /data directory of a Cassandra host (I couldn’t figure this out).

    2. Can I write to ‘any directory’ and just load to the target Cassandra host/cluster using the SSTableLoader?

    Thanks
    Sam

    1. Sam T says:

      I figured this out: you can write anywhere on the system and simply load to the running Cassandra host.

  14. Sam T says:

    Trying to run the code: it works fine on my local Cassandra (2.2.8).
    But when I run it on DataStax Enterprise 4.8.11 (likely on Cassandra 2.1), I get this error:
    java.lang.IllegalArgumentException: Unknown key space
    at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.getStatement

    It is able to create the /data/keyspace/tablename directories. Based on the documentation, this exception occurs if the directory doesn't exist or is not writable; that does not seem to be the case here.
    Also, again, the same code works fine on local Cassandra.

  15. Aram says:

    Noob question: how and where do I download the correct jar file that I need in order to be able to “import org.apache.cassandra.io.sstable.CQLSSTableWriter;”?

  16. Akshay Moharil says:

    Can we use sstableloader to load snapshots in a partially upgraded cluster? Two of my nodes are on DSE 4.8 and one node is on DSE 4.5.

  17. bruceliang says:

    I use 4 nodes to bulk load SSTables into Cassandra, but on one of the nodes the CPU sometimes sits at almost 100%, held by Cassandra, and the load speed becomes very slow; after a long time the speed recovers to normal. Why?
    I use Cassandra 2.1.14.
