DataStax Developer Blog

Support CQL3 tables in Hadoop, Pig and Hive

By Alex Liu -  July 22, 2013 | 26 Comments

The evolution of Cassandra's querying mechanisms


Thrift API

The first generation is the Thrift API: batch_mutate, get, get_slice, insert, multiget_slice, remove, and similar methods. It is a low-level querying mechanism that operates directly on Cassandra's underlying storage structures, such as ColumnOrSuperColumn, key, SlicePredicate, and ColumnParent, which makes it inefficient to develop applications on top of, even though a few Cassandra client libraries simplify the experience of querying Cassandra storage.

CQL

The first step toward addressing the awkwardness of the low-level Thrift querying API was CQL, introduced in the Cassandra 0.8 release. It provides a modern query language with a schema and is fully backward compatible with Thrift column families. Because it uses a higher-level, database-like query language to query Cassandra storage, traditional database developers find it much easier to adopt Cassandra.

CQL3

The third generation is CQL3, which "transposes" wide rows and unpacks them into named columns. Under the hood, CQL3 uses composite columns and composite keys. CQL3 also adds other features, such as collections and the binary CQL protocol.

The first generation of Cassandra Hadoop driver

The first generation is based on the Thrift column families of the first-generation Cassandra querying mechanism. Because CQL is backward compatible with Thrift column families, the Thrift-based Hadoop support works fairly well with CQL. But it doesn't provide any higher-level abstraction for composite keys and composite columns: decomposing them is done in client-side code such as UDFs. To decompose composite keys and composite columns, the developer must use Cassandra's internal data-type Java classes, which is hard for many application developers. The second generation of the Cassandra Hadoop driver addresses this issue by using CQL3 as a high-level abstraction layer over Cassandra storage.

The second generation of Cassandra Hadoop driver

The second generation is based on CQL3. Decomposing composite keys and composite columns is done by CQL3 under the hood. It also uses the CQL3 paging mechanism to page through wide rows in a more natural way.

It introduces a few new classes: CqlPagingInputFormat, CqlPagingRecordReader, CqlOutputFormat and CqlRecordWriter.

Input format

The input format is Map<String, ByteBuffer>, Map<String, ByteBuffer>. The first map is the name-to-value mapping of the partition key columns and clustering columns. The second map is the name-to-value mapping of the remaining columns.
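As an illustration, here is a minimal, self-contained Java sketch of what a map task might do with those two maps. The class name, column names, and decoding helper are hypothetical; real code would decode each ByteBuffer according to the column's actual CQL type (a text column is UTF-8, as assumed here).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class CqlValueDecoding {
    // Decode a CQL text column from its ByteBuffer representation (UTF-8).
    static String asText(ByteBuffer buf) {
        // duplicate() so we don't disturb the buffer's position for other readers
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    public static void main(String[] args) {
        // Simulated "keys" map: partition key column m plus clustering column n.
        Map<String, ByteBuffer> keys = new LinkedHashMap<>();
        keys.put("m", ByteBuffer.wrap("user-1".getBytes(StandardCharsets.UTF_8)));
        keys.put("n", ByteBuffer.wrap("2013-07-22".getBytes(StandardCharsets.UTF_8)));

        // Simulated second map: the remaining columns of the row.
        Map<String, ByteBuffer> columns = new LinkedHashMap<>();
        columns.put("event", ByteBuffer.wrap("login".getBytes(StandardCharsets.UTF_8)));

        System.out.println(asText(keys.get("m")) + " / " + asText(columns.get("event")));
    }
}
```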

CQL3 pagination

Up to the 1.2.6 release, CQL3 doesn't provide automatic pagination, so I use the following algorithm to page through CQL3 tables. The basic idea is to query on the partition columns and clustering columns, store their values when a page ends, and then use those values to compose the CQL3 query for the next page.

Say we have a table with the following primary key:

PRIMARY KEY (m, n, o, p)
where m is the partition column and
n, o, p are the clustering columns.

We want to run the following query
SELECT * FROM test

The first query is

SELECT * FROM test
WHERE token(m) > start_token_of_split
AND token(m) < end_token_of_split
LIMIT 1000

We store the last values of m, n, o, and p as m_end, n_end, o_end, and p_end after the first query completes.

The next query is

SELECT * FROM test
WHERE token(m) = token(m_end)
AND n = n_end
AND o = o_end
AND p > p_end
LIMIT 1000

If it reaches the end of p, the next query is

SELECT * FROM test
WHERE token(m) = token(m_end)
AND n = n_end
AND o > o_end
LIMIT 1000

otherwise, we use the latest value of p as the new bound p_end1 for the next query

SELECT * FROM test
WHERE token(m) = token(m_end)
AND n = n_end
AND o = o_end
AND p > p_end1
LIMIT 1000

Once it reaches the end of n (that is, the end of the partition), the next query moves on to the next partition:

SELECT * FROM test
WHERE token(m) > token(m_end)
AND token(m) < end_token_of_split
LIMIT 1000

then we continue the same loop until it reaches the end of the split.
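The query composition above can be sketched in Java as plain string building. The class and method names below are hypothetical, not the driver's actual API, and a real implementation would bind values through prepared statements rather than concatenate them into query text.

```java
public class PagingQueryBuilder {
    // First query of a split: scan the split's token range.
    static String firstQuery(String startToken, String endToken, int limit) {
        return "SELECT * FROM test WHERE token(m) > " + startToken
             + " AND token(m) < " + endToken + " LIMIT " + limit;
    }

    // Next page inside the same partition: pin m, n, o and advance on p.
    static String nextPageInPartition(String mEnd, String nEnd, String oEnd,
                                      String pEnd, int limit) {
        return "SELECT * FROM test WHERE token(m) = token(" + mEnd + ")"
             + " AND n = " + nEnd + " AND o = " + oEnd
             + " AND p > " + pEnd + " LIMIT " + limit;
    }

    // End of the partition: move on to the next token in the split.
    static String nextPartition(String mEnd, String endToken, int limit) {
        return "SELECT * FROM test WHERE token(m) > token(" + mEnd + ")"
             + " AND token(m) < " + endToken + " LIMIT " + limit;
    }

    public static void main(String[] args) {
        System.out.println(firstQuery("start_token_of_split", "end_token_of_split", 1000));
        System.out.println(nextPartition("m_end", "end_token_of_split", 1000));
    }
}
```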

For tables with more than one column in the partition key

PRIMARY KEY ((m, n), o, p)
where m and n are the partition columns and
o and p are the clustering columns.

We use the following query

SELECT * FROM test
WHERE token(m, n) > token(m_end, n_end)
AND token(m, n) < end_token_of_split
LIMIT 1000

If the clustering columns are defined as descending, the queries above should use the less-than comparison operator instead.

CQL3 auto-paging through the native protocol has since been developed, so this auto-paging mechanism will be plugged into CqlPagingRecordReader soon.

Input parameters


Some input parameters can be configured through CqlConfigHelper:

CqlConfigHelper.setInputColumns -- select specific columns
CqlConfigHelper.setInputCQLPageRowSize -- the number of rows per page
CqlConfigHelper.setInputWhereClauses -- the where clause on indexed columns

Output format

The output format is Map<String, ByteBuffer>, List<ByteBuffer>. The map is the name-to-value mapping of the partition key columns and clustering columns. The list holds the values of the other columns. CqlRecordWriter takes the column values and binds them to the prepared CQL query.
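A small, self-contained Java sketch of shaping one output record may help here. The class name, column names, and the example UPDATE statement are made up for illustration: the map carries the key columns by name, while the list carries ByteBuffers positionally, lining up with the "?" markers of the configured prepared statement.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CqlOutputShaping {
    // Serialize a text value the way a CQL text column expects (UTF-8 bytes).
    static ByteBuffer text(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Key columns (partition + clustering), addressed by name.
        Map<String, ByteBuffer> keys = new LinkedHashMap<>();
        keys.put("m", text("user-1"));
        keys.put("n", text("2013-07-22"));

        // Values bound positionally, e.g. for a hypothetical
        // "UPDATE test SET o = ?, p = ?" output query.
        List<ByteBuffer> bound = Arrays.asList(text("o-value"), text("p-value"));

        System.out.println(keys.size() + " key columns, " + bound.size() + " bound values");
    }
}
```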

Output parameters


The output CQL query can be configured through CqlConfigHelper:

CqlConfigHelper.setOutputCql

The first generation of Pig Cassandra driver

The first-generation Pig Cassandra driver is based on the Thrift column family based first generation of the Hadoop Cassandra driver and uses the CassandraStorage class.

CQL3 table support

The first generation uses the Thrift describe_keyspace call to get the metadata of the column families. Because CQL3 tables do not appear in the results of the describe_keyspace Thrift API call, the first-generation Pig Cassandra driver can't query CQL3 tables. To fix that, we query the system tables system.schema_columnfamilies and system.schema_columns to retrieve the metadata of CQL3 tables. But because CQL3 uses composite columns and composite keys under the hood, the first-generation Pig Cassandra driver is still not efficient for CQL3 tables.

The second generation of Pig Cassandra driver

The second generation is based on the CQL3-based second generation of the Hadoop Cassandra driver and uses the CqlStorage class.

CQL3 table support

Because the second generation of the Hadoop Cassandra driver uses CQL3 under the hood, we can easily decompose composite keys and composite columns using CQL3 queries.

The URL format for CqlStorage is

cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
[&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
[&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]]

where

page_size -- the number of rows per page
columns -- the columns to select in the CQL query
where_clause -- the where clause on indexed columns; it needs URL encoding
split_size -- the number of rows per split
partitioner -- the Cassandra partitioner
use_secondary -- enable Pig partition filter push-down
output_query -- the CQL query for writes, in prepared-statement format
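Since where_clause (and, depending on the release, output_query) must be URL-encoded before being embedded in the CqlStorage URL, a short Java sketch of that encoding step may be useful. The keyspace and table names below are made up; only the use of the standard URLEncoder is the point.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class CqlStorageUrl {
    // URL-encode a query fragment so it can safely ride inside the storage URL.
    static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        // Hypothetical keyspace "test1" and table "test".
        String outputQuery = encode("UPDATE test SET col4 = ?, col5 = ?");
        String url = "cql://test1/test?output_query=" + outputQuery;
        System.out.println(url);
    }
}
```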

Schema


The input schema is

(value, value, value), where each value's schema has the name of the column and the value of the column.

The output schema is

(((name, value), (name, value)), (value ... value), (value ... value))
where the first tuple is the map of partition columns and clustering columns,
and the remaining tuples are the lists of bound values for the output prepared CQL query.

Pig partition filter push down

Set use_secondary to true to enable it.
We generate the where_clause from Pig partition filter queries and pass it to CqlConfigHelper.setInputWhereClauses. The partition filter queries are pushed down to CqlPagingRecordReader, which sends less data back to Pig.

The first generation of Hive Cassandra driver

The first generation is based on the first generation of the Hadoop Cassandra driver, which uses Thrift column families. We need to use the second generation of the Hadoop Cassandra driver to improve querying on the composite columns that CQL3 tables use under the hood.

The second generation of Hive Cassandra driver

The second generation uses the second generation of the Hadoop Cassandra driver to query CQL3 tables. Basically, it sets the input and output CQL queries and maps the input and output values to Hive data types.

All metadata is retrieved from the system tables system.schema_columnfamilies and system.schema_columns.

All CQL3 tables have auto-generated Hive tables created through CqlStorageHandler, which has the following parameters:

cassandra.page.size -- the number of rows per page for CqlPagingRecordReader
cassandra.cql3.type -- the CQL data types for the mapped Hive columns
cassandra.columns.mapping -- the CQL columns for the mapped Hive columns

Push-down conditions will be implemented in a similar way to the Pig partition filter push-down. We will also expand the default mappings to include collections.



Comments

  1. Alex Holmansky says:

    Great writeup! Could you add some information on how CQL3 collections are handled by the new drivers – especially in Pig and Hive?

  2. Alex Liu says:

    Collections are decomposed by using the validator in Pig the same way as CassandraStorage. Collection in Hive is by default mapped to binary, so it has the same issue as CassandraStorageHandler which needs client side UDF to decompose it. I am working to improve the collection support in Hive, basic idea is to map C* collection directly to Hive collection data type. I will update the blog once it’s done.

  3. Sam Johnson says:

    Is it possible to use cql3 map reduce to only get a slice of timeline data as input. Basically I have a CF like this
    CREATE TABLE testcf {
    uid varchar, time timestamp, event varchar,
    PRIMARY KEY (uid, timestamp);

    Now I want the input to the map reduce to use only timelines between 2 given dates. But when I try to set the inputWhereClause it doesn’t work

    CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "time>=1374785223228 AND time<=1375118518985");

    I get the following error – Caused by: InvalidRequestException(why:Invalid restrictions found on event_timestamp)

    Any suggestions on how to do this?

    1. Alex Liu says:

      The where clause only applies to columns with a secondary index; it doesn't apply to primary key columns. That's the reason why it throws InvalidRequestException.

      It needs to read through all columns of the row.

      1. Sam Johnson says:

        Is this a limitation in the cql3 mapreduce framework. You can do this with thrift map reduce using slice predicate to send only a specific timeline set to the mapper. Is there any support to do this in CQL3 or plans to do it in the future?

        1. Alex Liu says:

          The CqlPagingRecordReader depends on CQL query functions, so it's limited to what CQL offers. I will open a ticket for it; hopefully it can be implemented in the future.

  4. James Schappet says:

    Alex, thanks for all the hard work, seems like this is starting to come together.

    I am trying to write data to Cassandra CQL 3 Table using:

    STORE G INTO ‘cql://keyapse/col_family?output_query=not sure what goes here’ USING CqlStorage();

    Can you give me an example of what the output_query would look like:

    ie. &output_query=UPDATE col_family SET col1=$0, col2=$3 WHERE KEY=$2

    1. Alex Liu says:

      output_query=UPDATE col_family SET col1=?, col2=?

      you may need encode it.

      1. James Schappet says:

        I got a good answer to this question on StackOverFlow.

        1. Alex Liu says:

          It will be using url encoding for next release. The post on StackOverFlow is working for now.

  5. Alex Holmansky says:

    So I tried the following experiment:

    CREATE KEYSPACE test1 WITH replication = {
    'class': 'SimpleStrategy',
    'replication_factor': '1'
    };

    USE test1;

    create table test (
    col1 text,
    col2 text,
    col3 text,
    col4 text,
    col5 text,
    primary key(col1,col2,col3)
    );

    Then I tried to load some data from a CSV file and store it into this table using Pig like this:

    recs = load 'cfs://test/test_data.csv' using PigStorage(',') as (col1:chararray, col2:chararray, col3:chararray, col4:chararray, col5:chararray);

    insert_recs = foreach recs generate TOTUPLE(TOTUPLE( TOTUPLE('col1',col1), TOTUPLE('col2',col2), TOTUPLE('col3', col3)), TOTUPLE(col4, col5));

    store insert_recs into 'cql://test1/test' using CqlStorage();

    This doesn’t work for me. What am I missing?
    Thanks!

    1. Alex Liu says:

      The correct script should be

      store insert_recs into ‘cql://test1/test?output_query=update test set col4 @ #, col5 @ #’ using CqlStorage();

      or url encode “update test set col4 = ? , col5 = ?” as output_query depends the release.

  6. James Schappet says:

    Is it or would it be possible to have support for the ‘now()’ function?

    This way we could get a TimeUUID generated field without passing it in.

    1. Alex Liu says:

      yes, We will create some helper UDF for it.

  7. Cyril Scetbon says:

    As it’s not included in Cassandra trunk, what is the link pointing to the 2nd generation of the Hive Cassandra driver ?

    1. Alex Liu says:

      It’s in DSE 3.1 release which is a commercial platform integrating Cassandra with Hive, Solr and other open source projects. It’s free to use for development.

  8. karthigaimuthu says:

    Does pig require CFS to store and process the intermediate data to and from cassandra.With core distribution of apache pig I can’t perform any analytics on top of the hadoop using HDFS with Pig.

    1. Alex Liu says:

      The Pig driver that comes with Cassandra doesn't need CFS. It's a general driver which should work with HDFS.

  9. James Schappet says:

    Alex, With the 1.2.8 token based paging, does pig go through all the tokens for a given cluster or just the tokens for the given column family?

    I have created a small version of my table in the same cluster (<1000 rows) to do some testing with PIG, but it's still taking the same time to load data, regardless of using the big table 5m rows or the small table <1000 rows.

    There is also another table in the Cluster with over 100m rows.

    1. Alex Liu says:

      It goes through all the tokens for a split (range of tokens) of a column family, the default split size is 64k.

      The default page size is 1000 rows, so you need to have more rows to test the performance

      1. Christopher Smith says:

        I’m finding cases where with very large tables it does not appear to be splitting the loads. I’m not specifying any of the tuning parameters in my loads, but I’m wondering… how does the split know what range of tokens will work out to 64K rows, or is supposed to be splitting on a range of 64K tokens?

  10. Student says:

    Hey,

    I want to store a CQL3-Map from Pig. What is the data format I have to choose in Pig? The CQL3-Table is:

    CREATE TABLE test
    (firstname VARCHAR,
    surname VARCHAR,
    averagekm DOUBLE,
    firstyear INT,
    lastyear INT,
    km MAP,
    PRIMARY KEY (firstname, surname)
    );

  11. aaron says:

    Hi I am wondering if cql3 has limit feature.

    For example: lets say I have a table with 1000 rows. So, when I run my mapreduce project, I want the mapper to only fetch “x’ number of rows, lets say I want mapper to read only 100 rows from the table.

    I was thinking “CqlConfigHelper.setInputCQLPageRowSize” will do the trick but pagerowsize reads all the rows from the table but reads 100 rows at a time.

    Any help would be great on how I can achieve this.

    Thank you

  12. Diallous says:

    Hi,
    I would like to know how to use slice_start and slice_end in load comand from pig Load ‘…’ Using cassandraStotage().
    for a simple user table like this :
    CREATE TABLE user (
    id int,
    email text,
    name text,
    PRIMARY KEY (id)
    )
    which have values :
    (1, ‘first’, ‘first@mail.com’)
    (2, ‘second’, ‘second@mail.com’)
    (3, ‘third’, ‘third@mail.com’)
    (4, ‘fourth’, ‘fourth@mail.com’)
    (5, ‘fifth’, ‘fifth@mail.com”)

    If I want to query data (names & emails) of user table using pig load function and where_clause (id in 2, 3 ,4)

    grunt> myrows = LOAD ‘cassandra://myKeyspace/user?slice_start=?&slice_end=?&limit=?&reversed=true’ USING CassandraStorage();
    [&output_query=][&where_clause=]

    Somebody can help me on the exact syntaxe to create this query in pig

    grunt > myRows = grunt> myrows = LOAD ‘cassandra://myKeyspace/user?…………………………………………’ USING CassandraStorage();

    Thanks!
