DataStax Enterprise 3.1 Documentation

Using Pig

This documentation corresponds to an earlier product version. Make sure this document corresponds to your version.


DataStax Enterprise (DSE) includes a CassandraFS-enabled Apache Pig client. Pig is a data flow language and platform for exploring big data sets. The language, Pig Latin, uses relations, which are similar to tables. The tuples in a relation correspond to the rows in a table. However, unlike a relational database table, Pig relations do not require every tuple to contain the same number of fields, or that fields in the same position (column) be of the same type. Using Pig, you can devise logic for data transformations, such as filtering data and grouping relations. The transformations occur during the MapReduce phase. The Apache Pig documentation contains more information about defining and working with Pig relations.

Configure the job tracker node for the node running Pig as you would for any analytics (Hadoop) node. Use the dsetool commands to manage the job tracker. After configuration, Pig clients automatically select the correct job tracker node on startup. Pig programs are compiled into MapReduce jobs, executed in parallel by Hadoop, and run in a distributed fashion on a local or remote cluster.
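
For example, to check which node is currently acting as the job tracker, you can run the dsetool jobtracker command (a quick sketch; dsetool movejt moves the job tracker to another node):

dsetool jobtracker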

Support for CQL collections

DataStax Enterprise 3.1.4 and later support CQL collections. The collections must use Pig-supported types. For example, a list cannot include columns of type decimal because the version of Pig integrated in DataStax Enterprise 3.1.4 does not support this type. A list of strings or ints works fine.
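
For example, a minimal sketch, assuming a hypothetical keyspace ks and a table with a list<int> collection column:

cqlsh> CREATE TABLE ks.collectiontable (id int PRIMARY KEY, mylist list<int>);
cqlsh> INSERT INTO ks.collectiontable (id, mylist) VALUES (1, [1, 2, 3]);

grunt> colldata = LOAD 'cql://ks/collectiontable' USING CqlStorage();
grunt> DUMP colldata;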

CQL 3 pushdown filter

DataStax Enterprise 3.1.4 includes a CqlStorage URL option, use_secondary. Setting the option to true optimizes processing by moving Pig filtering expressions as close to the data source as possible. To use this capability (a combined sketch follows this list):

  • Create a secondary index for the Cassandra table.

    For Pig pushdown filtering, the secondary index must have the same name as the column it is indexing.

  • Include the use_secondary option with a value of true in the url format for CqlStorage. For example:

    newdata = LOAD 'cql://ks/cf_300000_keys_50_cols?use_secondary=true'
                USING CqlStorage();
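
For example, a minimal sketch that combines both steps, assuming a hypothetical text column named col1 in the cf_300000_keys_50_cols table:

    cqlsh:ks> CREATE INDEX col1 ON cf_300000_keys_50_cols (col1);

    grunt> newdata = LOAD 'cql://ks/cf_300000_keys_50_cols?use_secondary=true'
                USING CqlStorage();
    grunt> filtered = FILTER newdata BY col1 == 'somevalue';

The index is named col1 to match the column it indexes, so the FILTER expression can be pushed down to the secondary index.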
    

Formatting Pig data

These examples show how to format Pig data. The syntax differs slightly between DataStax Enterprise 3.1.2-3.1.3 and 3.1.4: the 3.1.4 syntax has been simplified, and .value is no longer needed to specify the values to be formatted:

  • DataStax Enterprise 3.1.2-3.1.3

    grunt> insertformat= FOREACH morevalues GENERATE
             TOTUPLE(TOTUPLE('a',x.value)),TOTUPLE(y.value);
    
  • DataStax Enterprise 3.1.4

    grunt> insertformat= FOREACH morevalues GENERATE
             TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);
    

Running the Pig demo

Three examples demonstrate how to use Pig on DataStax Enterprise 3.1.4 and later to work with CQL 3 tables in Cassandra 1.2.6.x and later.

Start Pig

  1. Start DataStax Enterprise as an analytics (Hadoop) node.

    • Packaged installations

      Set HADOOP_ENABLED=1 in the /etc/default/dse file, and then use this command to start an analytics node:

      sudo service dse start
      
    • Binary installations

      <DSE install directory>/bin/dse cassandra -t
      
  2. Start the Pig shell:

    • Packaged installations

      dse pig

    • Binary installations

      From the installation directory:

      bin/dse pig

    The Pig grunt prompt appears, and you can now enter Pig commands.

Example: Save Pig relations from/to Cassandra

For Pig to access data in Cassandra, the target keyspace and table must already exist. Pig can save data from a Pig relation to a Cassandra table and load data from a Cassandra table into a Pig relation, but it cannot create the table.

  1. Start cqlsh.

  2. Using cqlsh, create and use a keyspace named, for example, cql3ks.

    cqlsh> CREATE KEYSPACE cql3ks WITH replication =
             {'class': 'SimpleStrategy', 'replication_factor': 1 };
    
    cqlsh> USE cql3ks;
    
  3. Create a two-column (a and b) Cassandra table named test and another two-column (x and y) table named moredata. Insert data into the tables.

    cqlsh:cql3ks> CREATE TABLE test (a int PRIMARY KEY, b int);
    cqlsh:cql3ks> CREATE TABLE moredata (x int PRIMARY KEY, y int);
    cqlsh:cql3ks> INSERT INTO test (a,b) VALUES (1,1);
    cqlsh:cql3ks> INSERT INTO test (a,b) VALUES (2,2);
    cqlsh:cql3ks> INSERT INTO test (a,b) VALUES (3,3);
    cqlsh:cql3ks> INSERT INTO moredata (x, y) VALUES (4,4);
    cqlsh:cql3ks> INSERT INTO moredata (x, y) VALUES (5,5);
    cqlsh:cql3ks> INSERT INTO moredata (x, y) VALUES (6,6);
    
  4. Using Pig, add logic to load the data (4, 5, 6) from the Cassandra moredata table into a Pig relation.

    grunt> moretestvalues= LOAD 'cql://cql3ks/moredata/' USING CqlStorage;
    
  5. Format the data for insertion into the test table. The name of the key column, 'a', is a chararray.

    grunt> insertformat= FOREACH moretestvalues GENERATE
             TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);
    
  6. Save the relation to the Cassandra test table.

    grunt> STORE insertformat INTO
           'cql://cql3ks/test?output_query=UPDATE+cql3ks.test+set+b+%3D+%3F'
           USING CqlStorage;
    

    Pig uses a URL-encoded prepared statement to store the relation to Cassandra. The cql:// URL is followed by an output_query, which specifies which key is used in the command. The remaining arguments of the prepared statement, the "?" placeholders, are filled in by the values associated with that key in Pig.

  7. On the cqlsh command line, check that the test table now contains its original values plus the values from the moredata table:

    cqlsh:cql3ks> SELECT * from test;
    
     a | b
    ---+---
    5 | 5
    1 | 1
    2 | 2
    4 | 4
    6 | 6
    3 | 3
    

Example: Handle a compound primary key

  1. Create a four-column (a, b, c, d) Cassandra table named compotable and another five-column (id, x, y, z, data) table named compmore. Insert data into the tables.

    cqlsh:cql3ks> CREATE TABLE compotable (
                    a int,
                    b int,
                    c text,
                    d text,
                    PRIMARY KEY (a,b,c)
                  );
    cqlsh:cql3ks> INSERT INTO compotable (a, b, c, d)
                    VALUES (1,1,'One','match');
    cqlsh:cql3ks> INSERT INTO compotable (a, b, c, d)
                    VALUES (2,2,'Two','match');
    cqlsh:cql3ks> INSERT INTO compotable (a, b, c, d)
                    VALUES (3,3,'Three','match');
    cqlsh:cql3ks> INSERT INTO compotable (a, b, c, d)
                    VALUES (4,4,'Four','match');
    
    cqlsh:cql3ks> CREATE TABLE compmore (
                    id int PRIMARY KEY,
                    x int,
                    y int,
                    z text,
                    data text
                  );
    cqlsh:cql3ks> INSERT INTO compmore (id, x, y, z, data)
                    VALUES (1,5,6,'Fix','nomatch');
    cqlsh:cql3ks> INSERT INTO compmore (id, x, y, z, data)
                    VALUES (2,6,5,'Sive','nomatch');
    cqlsh:cql3ks> INSERT INTO compmore (id, x, y, z, data)
                    VALUES (3,7,7,'Seven','match');
    cqlsh:cql3ks> INSERT INTO compmore (id, x, y, z, data)
                    VALUES (4,8,8,'Eight','match');
    cqlsh:cql3ks> INSERT INTO compmore (id, x, y, z, data)
                    VALUES (5,9,10,'Ninen','nomatch');
    
  2. Using Pig, add logic to load the data from the Cassandra compmore table into a Pig relation.

    grunt> moredata = LOAD 'cql://cql3ks/compmore' USING CqlStorage;
    
  3. Add the fields from the table to a relation.

    grunt> insertformat = FOREACH moredata GENERATE
             TOTUPLE(TOTUPLE('a',x),TOTUPLE('b',y),
             TOTUPLE('c',z)),TOTUPLE(data);
    

    The data processed later during the MapReduce phase is formatted as follows:

    ((PartitionKey_Name,Value),(ClusteringKey_1_name,Value)...)(ArgValue1,ArgValue2,ArgValue3,...)
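
    For example, with this format the compmore row (3, 7, 7, 'Seven', 'match') is presented to MapReduce roughly as:

    (('a',7),('b',7),('c','Seven'))('match')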

  4. Save the Pig relation to the Cassandra compotable table.

    grunt> STORE insertformat INTO 'cql://cql3ks/compotable?output_query=
             UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlStorage;
    

    Enter the STORE statement on a single line with no space after output_query= (the command is wrapped here to fit the page). The cql:// URL includes a prepared statement, described later.

Example: Explore library data

This example uses library data from the Institute of Museum and Library Services, encoded in UTF-8 format. Download the formatted data for this example. Two files are available so that you can copy/paste the code or run a Pig script instead of stepping through this example manually. The files are located in these directories:

  • Packaged installation

    /usr/share/dse-demos/pig

  • Tarball installation

    <install-location>/demos/pig/cql

Using these files, you can:

  • Copy/paste the commands in steps 2-3 from this document or from the library-populate-cql.txt file.
  • Execute steps 7-10 automatically by running the library-cql.pig script.

  1. Unzip libdata.csv.zip and give yourself permission to access the downloaded file. On the Linux command line, for example:

    chmod 777 libdata.csv
    
  2. Create and use a keyspace called libdata.

    cqlsh> CREATE KEYSPACE libdata WITH replication =
             {'class': 'SimpleStrategy', 'replication_factor': 1 };
    
    cqlsh> USE libdata;
    
  3. Create a table for the library data that you downloaded.

    cqlsh:libdata> CREATE TABLE libout ("STABR" TEXT, "FSCSKEY" TEXT, "FSCS_SEQ" TEXT,
                     "LIBID" TEXT, "LIBNAME" TEXT, "ADDRESS" TEXT, "CITY" TEXT,
                     "ZIP" TEXT, "ZIP4" TEXT, "CNTY" TEXT, "PHONE" TEXT, "C_OUT_TY" TEXT,
                     "C_MSA" TEXT, "SQ_FEET" INT, "F_SQ_FT" TEXT, "L_NUM_BM" INT,
                     "F_BKMOB" TEXT, "HOURS" INT, "F_HOURS" TEXT, "WKS_OPEN" INT,
                     "F_WKSOPN" TEXT, "YR_SUB" INT, "STATSTRU" INT, "STATNAME" INT,
                     "STATADDR" INT, "LONGITUD" FLOAT, "LATITUDE" FLOAT, "FIPSST" INT,
                     "FIPSCO" INT, "FIPSPLAC" INT, "CNTYPOP" INT, "LOCALE" TEXT,
                     "CENTRACT" FLOAT, "CENBLOCK" INT, "CDCODE" TEXT, "MAT_CENT" TEXT,
                     "MAT_TYPE" INT, "CBSA" INT, "MICROF" TEXT,
                     PRIMARY KEY ("FSCSKEY", "FSCS_SEQ"));
    
  4. Import data into the libout table from the libdata.csv file that you downloaded.

    cqlsh:libdata> COPY libout ("STABR","FSCSKEY","FSCS_SEQ","LIBID","LIBNAME",
                     "ADDRESS","CITY","ZIP","ZIP4","CNTY","PHONE","C_OUT_TY",
                     "C_MSA","SQ_FEET","F_SQ_FT","L_NUM_BM","F_BKMOB","HOURS",
                     "F_HOURS","WKS_OPEN","F_WKSOPN","YR_SUB","STATSTRU","STATNAME",
                     "STATADDR","LONGITUD","LATITUDE","FIPSST","FIPSCO","FIPSPLAC",
                     "CNTYPOP","LOCALE","CENTRACT","CENBLOCK","CDCODE","MAT_CENT",
                     "MAT_TYPE","CBSA","MICROF") FROM 'libdata.csv' WITH HEADER=TRUE;
    

    In the FROM clause of the COPY command, use the path to libdata.csv in your environment.

  5. Check that the libout table contains the data you copied from the downloaded file.

    cqlsh:libdata> SELECT count(*) FROM libdata.libout LIMIT 20000;
    
      count
     -------
      17598
    
  6. Create a table to hold results of Pig relations.

    cqlsh:libdata> CREATE TABLE libsqft (
                     year INT,
                     state TEXT,
                     sqft BIGINT,
                     PRIMARY KEY (year, state)
                   );
    
  7. Using Pig, add logic to load the data from the Cassandra libout table into a Pig relation.

    grunt> libdata = LOAD 'cql://libdata/libout' USING CqlStorage();
    
  8. Add logic to remove data about outlet types other than books-by-mail (BM). The C_OUT_TY column uses BM and other abbreviations to identify these library outlet types:

    • CE–Central Library
    • BR–Branch Library
    • BS–Bookmobile(s)
    • BM–Books-by-Mail Only

    grunt> book_by_mail = FILTER libdata BY C_OUT_TY == 'BM';
    grunt> DUMP book_by_mail;
    
  9. Add logic to filter the data based on library buildings and sum square footage of the buildings. Group data by state. The STABR column contains the state codes.

    grunt> libdata_buildings = FILTER libdata BY SQ_FEET > 0;
    grunt> state_flat = FOREACH libdata_buildings GENERATE
                          STABR AS State,SQ_FEET AS SquareFeet;
    grunt> state_grouped = GROUP state_flat BY State;
    grunt> state_footage = FOREACH state_grouped GENERATE
                             group as State,SUM(state_flat.SquareFeet)
                             AS TotalFeet:int;
    grunt> DUMP state_footage;
    

    The MapReduce job completes successfully and the output shows the square footage of the buildings.

  10. Add logic to filter the data by year, state, and building size, and save the relation to Cassandra using the cql:// URL. The URL includes a prepared statement, described later.

    grunt> insert_format= FOREACH state_footage GENERATE
             TOTUPLE(TOTUPLE('year',2011),TOTUPLE('state',State)),TOTUPLE(TotalFeet);
    grunt> STORE insert_format INTO 'cql://libdata/libsqft?output_query=UPDATE%20libdata.
             libsqft%20SET%20sqft%20%3D%20%3F' USING CqlStorage;
    

    When the MapReduce job completes, a message appears that the records were written successfully.

  11. In CQL, query the libsqft table to see the Pig results now stored in Cassandra.

    SELECT * FROM libdata.libsqft;
    
     year | state | sqft
    ------+-------+----------
     2011 |    AK |   570178
     2011 |    AL |  2792246
    
     . . .
    
     2011 |    WV |  1075356
     2011 |    WY |   724821
    

Data access using Pig

DataStax Enterprise includes a custom storage handler for Cassandra that you use to execute Pig programs directly on data stored in Cassandra. The DataStax Enterprise Pig driver uses the Cassandra File System (CassandraFS); the Pig driver included with Apache Cassandra uses the Hadoop Distributed File System (HDFS) instead.

In DataStax Enterprise 3.1.2 and later, use one of these storage handlers and URLs to transform Cassandra data using Pig:

Table format       Storage handler        URL
CQL                CqlStorage()           cql://
storage engine     CassandraStorage()     cassandra://

Working with legacy Cassandra tables

To work with Cassandra tables that are in the legacy storage engine (CLI/Thrift) format, use the CassandraStorage() handler and cassandra:// URL instead of CqlStorage() and cql://. Legacy tables are created in CQL using the WITH COMPACT STORAGE directive, or through Thrift or the CLI.
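
For example, a minimal sketch, assuming a hypothetical legacy table in a keyspace named ks:

cqlsh:ks> CREATE TABLE legacytable (id int PRIMARY KEY, value text) WITH COMPACT STORAGE;

grunt> legacy = LOAD 'cassandra://ks/legacytable' USING CassandraStorage();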

Using the CqlStorage handler

To use data in CQL tables created by any version of Cassandra, use the CqlStorage() handler and cql:// URL. To access data in the CassandraFS, the target keyspace and table must already exist. Data in a Pig relation can be stored in a Cassandra table, but Pig will not create the table.

DataStax Enterprise supports these Pig data types:

  • int
  • long
  • float
  • double
  • boolean
  • chararray

The Pig LOAD statement pulls Cassandra data into a Pig relation through the storage handler. The format of the Pig LOAD statement is:

<pig_relation_name> = LOAD 'cql://<keyspace>/<table>'
                        USING CqlStorage();

The Pig demo examples include using the LOAD command.

LOAD schema

The LOAD Schema is:

(colname:colvalue, colname:colvalue, … )

where each colvalue is referenced by the Cassandra column name.
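
For example, after loading the test table from the first demo, each field can be referenced by its Cassandra column name in later statements (a sketch):

grunt> testvalues = LOAD 'cql://cql3ks/test' USING CqlStorage();
grunt> bvalues = FOREACH testvalues GENERATE b;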

Saving a Pig relation to Cassandra

The Pig STORE command pushes data from a Pig relation to Cassandra through the CqlStorage handler:

STORE <relation_name> INTO 'cql://<keyspace>/<column_family>?<prepared statement>'
  USING CqlStorage();

URL format

The URL format for CqlStorage is:

cql://[username:password@]<keyspace>/<columnfamily>[?
  [page_size=<size>]
  [&columns=<col1,col2>]
  [&output_query=<prepared_statement_query>]
  [&where_clause=<clause>]
  [&split_size=<size>]
  [&partitioner=<partitioner>]
  [&use_secondary=true|false]]

Where:

  • page_size -- the number of rows per page
  • columns -- the columns to select from the CQL table
  • where_clause -- the WHERE clause on indexed columns; must be URL-encoded
  • split_size -- the number of rows per split
  • partitioner -- the Cassandra partitioner
  • use_secondary -- set to true to enable pushdown of Pig filters to secondary indexes
  • output_query -- the CQL query for writing, in prepared statement format
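
For example, a hypothetical URL that combines several options; the where_clause value b%3D2 is the URL-encoded form of b=2 and assumes that column b has a secondary index named b:

newdata = LOAD 'cql://cql3ks/test?page_size=100&split_size=1000&where_clause=b%3D2'
            USING CqlStorage();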

Store schema

The input schema for Store is:

(value, value, value)

where the schema name of each value is the Cassandra column name and each value is the column value.

The output schema for Store is:

(((name, value), (name, value)), (value ... value), (value ... value))

where the first tuple pairs the partition key and clustering columns with their values. The remaining tuples are the lists of values bound to the "?" placeholders of the prepared CQL query.

Creating a URL-encoded prepared statement

The Pig demo examples show the steps required for setting up a prepared CQL query:

  1. Format the data for the query.
  2. Construct the prepared output_query portion of the cql:// URL, and execute the query.

First, format the Cassandra data for the prepared query, and then construct and execute the query.

Format the data

The example of saving Pig relations from/to Cassandra shows the output schema: the name of the test table primary key, 'a', represented as a chararray in the relation, is paired with a value from the moredata table. In this case, the key of the test table is only a partition key, so only a single tuple is needed.

The Pig statement to format the moredata fields for the relation is:

grunt> insertformat= FOREACH moretestvalues GENERATE
         TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);

The example of handling a compound primary key works with more complicated data: a partition key and clustering columns:

grunt> insertformat = FOREACH moredata GENERATE
         TOTUPLE(TOTUPLE('a',x),TOTUPLE('b',y),TOTUPLE('c',z)),TOTUPLE(data);

Construct the prepared query

The output_query portion of the cql:// URL is the prepared statement. The prepared statement must be URL-encoded so that the special characters in it are readable by Pig.
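
For example, the prepared statement from the first demo is encoded character by character: each space becomes + (or %20), = becomes %3D, and ? becomes %3F:

UPDATE cql3ks.test set b = ?                (unencoded)
UPDATE+cql3ks.test+set+b+%3D+%3F            (URL-encoded)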

The example of saving Pig relations from/to Cassandra shows how to construct a prepared query:

'cql://cql3ks/test?output_query=UPDATE+cql3ks.test+set+b+%3D+%3F'

The key values of the test table are automatically transformed into the 'WHERE (key) =' clause to form the output_query portion of a prepared statement.

Execute the query

To update the test table using the values in the moredata table (4-6), the prepared statement is executed using these WHERE clauses when the MapReduce job runs:

... WHERE a = 5
... WHERE a = 4
... WHERE a = 6

The output_query in this Pig statement supplies the URL-encoded '...' portion of those statements:

grunt> STORE insertformat INTO
         'cql://cql3ks/test?output_query=UPDATE+cql3ks.test+set+b+%3D+%3F'
         USING CqlStorage;

Unencoded, the UPDATE statement is:

UPDATE cql3ks.test set b = ?

The prepared statement represents these queries:

UPDATE cql3ks.test set b = 5 WHERE a = 5;
UPDATE cql3ks.test set b = 4 WHERE a = 4;
UPDATE cql3ks.test set b = 6 WHERE a = 6;