DataStax Developer Blog

Cassandra File System Design

By Jake Luciani - February 10, 2012 | 11 Comments

The Cassandra File System (CFS) is an HDFS-compatible file system built to replace the traditional Hadoop NameNode, Secondary NameNode and DataNode daemons. It is the foundation of our Hadoop support in DataStax Enterprise.

The Cassandra File System had two main design goals: first, to simplify the operation of Hadoop by removing the single point of failure in the Hadoop NameNode; second, to offer easy Hadoop integration for Cassandra users (one distributed system is enough).

To support massive files in Cassandra, we came up with a novel approach that relies heavily on the fundamentals of Cassandra’s architecture, drawing on both its Dynamo and its BigTable heritage.

CFS Data Model

CFS is modeled in Cassandra as a Keyspace with two Column Families. Replication settings live at the Keyspace level, so unlike HDFS, you can’t change replication per file. You can, however, achieve the same effect with multiple CFS Keyspaces. The two Column Families replace the two primary HDFS services. The HDFS NameNode service, which tracks each file’s metadata and block locations, is replaced with the “inode” Column Family. The HDFS DataNode service, which stores file blocks, is replaced with the “sblocks” Column Family. By doing this we have removed three services of the traditional Hadoop stack (NameNode, Secondary NameNode and DataNode) and replaced them with one fault-tolerant, scalable component.

The ‘inode’ Column Family contains meta information about a file and uses a DynamicCompositeType comparator. Meta information includes: filename, parent path, user, group, permissions, filetype and a list of the block ids that make up the file. Block ids are TimeUUIDs, so blocks are naturally ordered by the time they were written. This makes supporting HDFS append() simple, since a newly written block always sorts after the existing ones.
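The following minimal Java sketch illustrates why TimeUUID block ids make append() easy. It is not CFS code: `InodeSketch`, `timeUuid`, `appendBlock` and `blockIdsInOrder` are hypothetical names, and a `TreeSet` stands in for the inode row’s sorted columns. It builds version-1 (time-based) UUIDs from explicit timestamps and orders them by `UUID.timestamp()`, roughly as a TimeUUID comparator would. An explicit comparator is needed because `java.util.UUID.compareTo` does not order version-1 UUIDs by time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical model of an inode's block list; not the actual CFS classes.
class InodeSketch {
    // Order ids by their embedded 60-bit timestamp, like a TimeUUID comparator;
    // fall back to UUID's natural order only to break exact timestamp ties.
    private static final Comparator<UUID> BY_TIME =
            Comparator.comparingLong((UUID u) -> u.timestamp())
                      .thenComparing(Comparator.<UUID>naturalOrder());

    private final TreeSet<UUID> blockIds = new TreeSet<>(BY_TIME);

    // Build a version-1 (time-based) UUID from a 60-bit timestamp and a clock-seq/node value.
    static UUID timeUuid(long timestamp, long clockSeqAndNode) {
        long timeLow = timestamp & 0xFFFFFFFFL;
        long timeMid = (timestamp >>> 32) & 0xFFFFL;
        long timeHi = (timestamp >>> 48) & 0x0FFFL;
        long msb = (timeLow << 32) | (timeMid << 16) | 0x1000L | timeHi; // version nibble = 1
        long lsb = 0x8000000000000000L | (clockSeqAndNode & 0x3FFFFFFFFFFFFFFFL); // IETF variant
        return new UUID(msb, lsb);
    }

    // Appending just adds another id; ordering comes from the embedded timestamp.
    void appendBlock(UUID blockId) {
        blockIds.add(blockId);
    }

    // Block ids in file order, oldest first.
    List<UUID> blockIdsInOrder() {
        return new ArrayList<>(blockIds);
    }
}
```

In this model, a block id generated later carries a later timestamp, so an append only adds an entry at the end and leaves the existing entries untouched.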

Secondary indexes are used to support operations like “ls” and “rmdir”; the corresponding CQL looks something like this:

select filename from inode where parent_path='/tmp';
select filename from inode where filename > '/tmp' and filename < '/tmq' and sentinel = 'x';

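The sentinel is needed because of how Cassandra implements secondary indexes: an indexed query must include at least one equality predicate on an indexed column, so a pure range query on filename is not enough. Cassandra chooses the most selective indexed predicate and filters the remaining results from there.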
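The `'/tmp'` and `'/tmq'` bounds in the second query are a prefix-range trick: every string that starts with `/tmp` sorts after `/tmp` itself and before the prefix with its last character incremented. A minimal Java sketch of that bound computation follows; `PrefixRange`, `upperBound` and `inRange` are hypothetical names, it assumes ASCII paths (Java compares UTF-16 code units, which matches byte order for ASCII), and it rejects the edge case where the last character is already `Character.MAX_VALUE`.

```java
// Hypothetical helper for "starts with prefix" range scans over sorted names.
class PrefixRange {
    // Exclusive upper bound: every string that starts with `prefix` sorts below it.
    // Assumes a non-empty prefix whose last char is not Character.MAX_VALUE.
    static String upperBound(String prefix) {
        if (prefix.isEmpty() || prefix.charAt(prefix.length() - 1) == Character.MAX_VALUE) {
            throw new IllegalArgumentException("unsupported prefix: " + prefix);
        }
        int last = prefix.length() - 1;
        char next = (char) (prefix.charAt(last) + 1);
        return prefix.substring(0, last) + next;
    }

    // Mirrors `filename > prefix and filename < upperBound(prefix)`.
    static boolean inRange(String name, String prefix) {
        return name.compareTo(prefix) > 0 && name.compareTo(upperBound(prefix)) < 0;
    }
}
```

One consequence visible in this sketch: with the bare prefix `/tmp`, a sibling such as `/tmpfiles` also falls inside the range, whereas using `/tmp/` as the prefix (upper bound `/tmp0`) matches only entries beneath `/tmp`.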

Since files are looked up through secondary indexes, CFS can use a UUID as the row key, which gives us a good distribution of data across the cluster. To summarize, all of a file’s meta information is stored in a single wide row.

The second Column Family, ‘sblocks’, stores the actual contents of each file.

Each row represents a block of data associated with an inode record. The row key is a block TimeUUID from an inode record. The columns are time-ordered, compressed sub-blocks that, when decompressed and combined, equal one HDFS block. Let’s walk through the write and read paths to make this clearer.

CFS Write Path

Hadoop uses the “dfs.block.size” parameter to determine how large each block of a file should be when the file is written. When a file comes in, CFS first writes its static attributes to the inode Column Family.

It then allocates a new block object and reads dfs.block.size worth of data. As that data is read, it is split into sub-blocks of size “cfs.local.subblock.size”, and each sub-block is compressed with Google’s Snappy compression. Once a block is finished, the block id is written to the inode row and the sub-blocks are written to Cassandra with the block id as the row key and the sub-block ids as the column names. CFS splits a block into sub-blocks because it relies on Thrift, which does not support streaming, so it must avoid running the node out of memory (OOM) by sending 256 MB or 512 MB of data at once. To Hadoop, though, the block still looks like a single block, so it doesn’t change the job split logic of MapReduce.
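The block and sub-block split described above can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the CFS implementation: `WritePathSketch` and its methods are hypothetical, sub-block column names are reduced to list positions instead of TimeUUIDs, and `java.util.zip.Deflater` stands in for Snappy, which is not part of the JDK. `blockCount` shows how many blocks a file needs for a given `dfs.block.size`; `toCompressedSubBlocks` turns one block into the ordered, compressed columns that would be written under the block’s row key; and `reassemble` is the reverse step a reader performs.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch of the write path; Deflater stands in for Snappy.
class WritePathSketch {
    // Number of blocks needed for a file of fileSize bytes (ceiling division).
    static long blockCount(long fileSize, long blockSize) {
        return (fileSize + blockSize - 1) / blockSize;
    }

    // Split one block into sub-blocks of at most subBlockSize bytes and compress each.
    // The list position stands in for the time-ordered sub-block column name.
    static List<byte[]> toCompressedSubBlocks(byte[] block, int subBlockSize) {
        if (subBlockSize <= 0) {
            throw new IllegalArgumentException("subBlockSize must be positive");
        }
        List<byte[]> columns = new ArrayList<>();
        for (int off = 0; off < block.length; off += subBlockSize) {
            int end = Math.min(off + subBlockSize, block.length);
            columns.add(compress(Arrays.copyOfRange(block, off, end)));
        }
        return columns;
    }

    static byte[] compress(byte[] data) {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Reverse step: decompress the columns in order and concatenate them into one block.
    static byte[] reassemble(List<byte[]> columns) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        for (byte[] column : columns) {
            Inflater inflater = new Inflater();
            inflater.setInput(column);
            try {
                while (!inflater.finished()) {
                    int n = inflater.inflate(buf);
                    if (n == 0 && !inflater.finished()
                            && (inflater.needsInput() || inflater.needsDictionary())) {
                        throw new IllegalStateException("truncated sub-block");
                    }
                    out.write(buf, 0, n);
                }
            } catch (DataFormatException e) {
                throw new IllegalStateException("corrupt sub-block", e);
            } finally {
                inflater.end();
            }
        }
        return out.toByteArray();
    }
}
```

For example, a 10,000-byte block split with a 4,096-byte sub-block size yields three columns (4,096, 4,096 and 1,808 uncompressed bytes), and `reassemble` returns the original bytes.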

CFS Read Path

When a read comes in for a file or part of a file (let’s assume Hadoop has already looked up the file’s UUID through the secondary index), CFS reads the inode info and finds the block and sub-block to read. CFS then executes a custom Thrift call that returns either the specified sub-block data or, if the call was made on a node that holds the data locally, the file and offset information of the Cassandra SSTable file containing the sub-block. It does this because, during a MapReduce job, the JobTracker tries to schedule each computation on the node that holds the data. Using the SSTable information is much faster, since the mapper can access the data directly without having to serialize and deserialize it through Thrift.
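Finding the block and sub-block to read is plain integer arithmetic once the sizes are fixed. The Java sketch below assumes every block except the last is exactly `dfs.block.size` bytes and every sub-block within a block except the last is exactly `cfs.local.subblock.size` bytes before compression; `SubBlockLocator` and `locate` are hypothetical names, and the 64 MB and 2 MB sizes used in the example are illustrative values, not CFS defaults.

```java
// Hypothetical helper: maps a byte offset within a file to the block and
// sub-block that hold it, assuming fixed-size blocks and sub-blocks
// (only the last block, or the last sub-block of a block, may be shorter).
final class SubBlockLocator {
    final long blockIndex;       // position in the inode's ordered block list
    final long subBlockIndex;    // position of the sub-block column in that block's row
    final long offsetInSubBlock; // where to start reading in the decompressed sub-block

    private SubBlockLocator(long blockIndex, long subBlockIndex, long offsetInSubBlock) {
        this.blockIndex = blockIndex;
        this.subBlockIndex = subBlockIndex;
        this.offsetInSubBlock = offsetInSubBlock;
    }

    static SubBlockLocator locate(long fileOffset, long blockSize, long subBlockSize) {
        if (fileOffset < 0 || blockSize <= 0 || subBlockSize <= 0) {
            throw new IllegalArgumentException("offset must be >= 0 and sizes > 0");
        }
        long blockIndex = fileOffset / blockSize;
        long offsetInBlock = fileOffset % blockSize;
        return new SubBlockLocator(blockIndex, offsetInBlock / subBlockSize, offsetInBlock % subBlockSize);
    }

    @Override
    public String toString() {
        return "block " + blockIndex + ", sub-block " + subBlockIndex + ", offset " + offsetInSubBlock;
    }
}
```

With 64 MB blocks and 2 MB sub-blocks, `SubBlockLocator.locate(69L * 1024 * 1024 + 10, 64L * 1024 * 1024, 2L * 1024 * 1024)` points at block 1, sub-block 2, starting 1 MB + 10 bytes into the decompressed sub-block.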

One question that comes up is why CFS compresses sub-blocks itself rather than using the Column Family compression in Cassandra 1.0. The reason is that compressing and decompressing on the client side cuts down the network traffic between nodes.

Integrating with Hadoop

Hadoop makes it very simple to plug in a custom file system implementation, and everything just works. Only the following change to core-site.xml is needed:

<property>
 <name>fs.cfs.impl</name>
 <value>com.datastax.bdp.hadoop.cfs.CassandraFileSystem</value>
</property>

Now it’s possible to execute the following commands:

hadoop fs -copyFromLocal /tmp/giant_log.gz cfs://cassandrahost/tmp

or even

hadoop distcp hdfs:/// cfs:///

Comments

  1. Jeff Darcy says:

    This looks very interesting. Will you be elaborating at some point on your strategies for dealing with partial-block and/or contending writes? That’s where the most difficult problems lie, and so (I would guess) they’re also where the most interesting solutions are.

  2. Jake Luciani says:

    Jeff, HDFS doesn’t have POSIX semantics at all. It should be used primarily with MapReduce jobs, or, as some have done, to deliver tiles for a map UI.

    As for partial blocks, this is fine since we only write complete blocks; if there is an exception during a write, it gives up. If the file is only 1k in reality, then we only store that much. This happens in append().

  3. Jeff Schmidt says:

    Do you see CFS as possibly being the path forward for Solandra? That is, can we use a stock Solr distribution and point it to configuration files and indexes within CFS and get high performance, replication, multiple data center access etc.?

    It would be very useful to have different Solr indexes in separate keyspaces, since the configurations can be different. Last I checked, Solandra puts all indexes in one keyspace.

    This is exciting work. Thanks for sharing.

  4. André Cruz says:

    This is interesting. I’m using Cassandra in a similar way but neither compaction strategy seems suited to deal with large amounts of data in a single CF. What compaction strategy will be used in a system like this?

    In my tests compaction starts lagging too much if the insertion of data is at a high rate.

  5. Craig Perry says:

    Very interesting!

    If an existing stored file is updated in this implementation, am I right in thinking that potentially a read(consistency level 1) can return a corrupt file?

    This would be due to the merge-on-read of the sub-blocks potentially producing a file made from different “versions” of the sub-blocks, owing to the eventual consistency property of Cassandra.

    If I’m thinking correctly, does this mean we could potentially solve this by read(quorum) for all sub-blocks? That wouldn’t be optimal performance-wise, though.

    Would a better solution be to include a “version” metadata column, populated with a UUID. We could then always read(1) for speed, safe in the knowledge we can detect mis-matched blocks for a file and re-request read(1)s until we can get a full file.

    Would there be a performance penalty including this version uuid column as a 3rd column in the composite key?

    Newbie disclaimer / apology: brand new to Cassandra, sorry for all the q’s :o )

  6. Jake Luciani says:

    @Craig

    In HDFS, and by extension in CFS, blocks are immutable: you can append new blocks to a file, but you never edit what’s in a block. So the problem you describe can’t happen.

  7. Foo says:

    Is the source available for this project? Have you done benchmarks?

  8. Ben says:

    Hey guys, is CFS a datastax enterprise feature, or is it available to all?

    -Ben

  9. Tyler Hobbs says:

    @Ben

    CFS is a DataStax Enterprise feature.

  10. CS says:

    Does CFS integrate with WebHDFS, HTTPFs (Hoop)? If so, are there recommended versions for integration with DSE 2.1.1 / DSE 2.2?

  11. Puneet says:

    I had a question.

    How is isolation guaranteed for a directory rename operation?
    A rename could touch multiple rows in the “inode” column family, and I believe Cassandra gives atomicity/isolation guarantees only on operations contained within a single row.

    Thanks,
