DataStax Enterprise 2.1 Documentation

Getting Started with Pig in DataStax Enterprise

This document corresponds to an earlier product version. Make sure you are using the version of the documentation that matches your product version.


DataStax Enterprise (DSE) includes a CassandraFS-enabled Apache Pig Client. Pig is a platform for analyzing large data sets that uses a high-level language (called Pig Latin) for expressing data analysis programs. Pig Latin lets developers specify a sequence of data transformations such as merging data sets, filtering them, and applying functions to records or groups of records. Pig comes with many built-in functions, but developers can also create their own user-defined functions for special-purpose processing. Documentation for Pig Latin is available from Apache.

Pig Latin programs run in a distributed fashion on a DSE cluster (programs are compiled into MapReduce jobs and executed using Hadoop). When using Pig with DSE, all jobs can be run in MapReduce mode (even on a single-node cluster). Since all Hadoop nodes are peers in DSE (no Name Node), there is no concept of local mode for Pig. DSE Pig includes a custom storage handler for Cassandra that allows you to run Pig programs directly on data stored in Cassandra. The native Pig storage handler stores data in CassandraFS (the Cassandra-enabled Hadoop distributed file system).

Setting the Job Tracker Node for Pig

Pig Latin programs are compiled into sequences of MapReduce jobs that are run in parallel. Jobs are submitted to the job tracker node for the DataStax Enterprise cluster. Pig clients automatically select the correct job tracker node upon startup. You set the job tracker node for Pig as you would for any analytics node, and use the dsetool commands to manage the job tracker.

Starting Pig

When you install DSE using the packaged distributions, you can start the Pig shell (grunt) as follows:

dse pig

or in a binary distribution:

<install_location>/bin/dse pig

Working in DSE Pig

DSE allows you to use Pig with data stored in CassandraFS just as you would in a regular Hadoop implementation (using the default Pig storage handler). Pig Latin statements work with relations. A relation can be defined as follows:

  • A relation is a bag (more specifically, an outer bag).
  • A bag is a collection of tuples.
  • A tuple is an ordered set of fields.
  • A field is a piece of data.

A Pig relation is a bag of tuples. A Pig relation is similar to a table in a relational database, where the tuples in the bag correspond to the rows in a table. Unlike a relational table, however, a Pig relation does not require that every tuple contain the same number of fields, or that fields in the same position (column) be of the same type. In this way, Pig relations are more similar to Cassandra column families than to relational tables.
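For example, a relation can hold tuples with different numbers of fields. A quick sketch in the grunt shell (the file name and values here are hypothetical):

grunt> flexible = LOAD 'cfs:///some_file.txt';
grunt> DUMP flexible;
(alice,42)
(bob,17,99)

Because no schema was declared, both tuples are accepted even though the second has an extra field.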

The Apache Pig documentation contains more information on defining and working with Pig relations.

Using Pig to Access Data in Cassandra

DSE uses a custom storage handler, CassandraStorage(), to allow direct access to data stored in Cassandra through Pig. In order to access data in Cassandra, the target keyspace and column family must already exist (Pig can read and write data from/to a column family in Cassandra, but it will not create the column family if it does not already exist).

Using the Pig LOAD command, you pull data into a Pig relation from Cassandra via the CassandraStorage handler. When pulling data from Cassandra, you do not need to specify type information, because it is inferred automatically from the column family comparators and validators.

The format of the Pig LOAD command is as follows for a regular column family:

<pig_relation_name> = LOAD 'cassandra://<keyspace>/<column_family>'
USING CassandraStorage();
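For example, assuming the PigDemo keyspace and Scores column family that are created later in this document, a load statement looks like:

grunt> rows = LOAD 'cassandra://PigDemo/Scores' USING CassandraStorage();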

Using the Pig STORE command, you push data from a Pig relation into Cassandra via the CassandraStorage handler, as follows:

STORE <relation_name> INTO 'cassandra://<keyspace>/<column_family>'
  USING CassandraStorage();

Running the Pig Demo

Pig operates on data stored in the Hadoop distributed file system (or CassandraFS in DSE). Your DSE installation contains sample data that you can use to run the Pig examples documented in this section. The sample data file contains tuples of two fields each (name and score). Using Pig, the examples in this section show how to create a Pig relation and perform a simple MapReduce job to calculate the total score for each user. Result output can then be stored back into CFS or into a Cassandra column family.

Loading Pig Sample Data Into CFS

The Pig sample data file is located in /usr/share/dse-demos/pig/files/example.txt for packaged installations or <install_location>/demos/pig/files/example.txt for binary installations.

To load the Pig sample data file into CFS:

dse hadoop fs -put /usr/share/dse-demos/pig/files/example.txt /

or in a binary distribution:

dse hadoop fs -put <install_location>/demos/pig/files/example.txt /
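The example.txt file is read by the default PigStorage() handler as tab-delimited text, one tuple per line. Lines in the file follow this shape (the values shown here are illustrative):

brandon	36
casey	9
brandon	128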

Creating a Pig Relation from a Data File

Here we create a relation called score_data that defines a schema of two fields (or columns), named name and score. Using the LOAD command, we load the relation with data from the example.txt file stored in CFS. The USING PigStorage() clause is optional, since PigStorage() is already the default storage handler for Pig.

grunt> score_data = LOAD 'cfs:///example.txt' USING PigStorage()
         AS (name:chararray, score:long);

To see the tuples stored in the relation:

grunt> DUMP score_data;

Running a MapReduce Job in Pig

In this example, we take the raw data we loaded into the score_data relation, and perform a number of calculations on the data using the Pig built-in relational operators. Intermediate results are also stored in Pig relations.

First we GROUP the tuples in the score_data relation by the name field, and store the results in a relation called name_group. The PARALLEL keyword controls how many reducers are used.

grunt> name_group = GROUP score_data BY name PARALLEL 3;

Next we use the FOREACH operator to calculate the total score for each user grouping in the name_group relation, and store the results in a relation called name_total.

grunt> name_total = FOREACH name_group GENERATE group, COUNT(score_data.name),
   LongSum(score_data.score) AS total_score;

Finally we order the results in descending order by total score and store the results in a relation called ordered_scores.

grunt> ordered_scores = ORDER name_total BY total_score DESC PARALLEL 3;

To output the final results, we can use the DUMP command to send the results to standard output, or the STORE command to write the results to a file in CFS. The USING clause is optional in this case, since PigStorage() is already the default storage handler.

grunt> DUMP ordered_scores;
grunt> STORE ordered_scores INTO 'cfs:///final_scores.txt' USING PigStorage();

Creating the PigDemo Keyspace in Cassandra

In order for Pig to access data in Cassandra, the target keyspace and column family must already exist (Pig can read and write data from/to a column family in Cassandra, but it will not create the column family if it does not already exist).

To create the PigDemo keyspace and Scores column family used in the following examples, run the following commands in the cassandra-cli utility.

  1. Start the cassandra-cli utility:

    cassandra-cli
    

    or in a binary distribution:

    <install_location>/resources/cassandra/bin/cassandra-cli
    
  2. Connect to a node in your DSE cluster on port 9160. For example:

    [default@unknown] connect 110.82.155.4/9160
    

    or if running on a single-node cluster as localhost:

    [default@unknown] connect localhost/9160
    
  3. Create the PigDemo keyspace.

    [default@unknown] create keyspace PigDemo with
        placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy'
        and strategy_options = [{replication_factor:1}];
    
  4. Connect to the PigDemo keyspace you just created.

    [default@unknown] use PigDemo;
    
  5. Create the Scores column family.

    [default@unknown] create column family Scores with comparator = 'LongType';
    
  6. Exit cassandra-cli:

    [default@unknown] exit;
    

Writing Data to a Cassandra Column Family

In this example, we are using the scores example data loaded into CFS (see Loading Pig Sample Data Into CFS). This data has tuples containing two fields (name and score). For a Cassandra column family, however, we need to store three fields: the row key (name), the column name (score), and the column value (an empty value in this case).

We want to calculate the total score for each user in the same manner as we did in the Running a MapReduce Job in Pig example; however, in this example our relations contain an extra empty field for the column value.

To run these commands, start the Pig shell if you do not have it running (see Starting Pig).

  1. If you have not already, create the score_data relation from the example.txt file stored in CFS.

    grunt> score_data = LOAD 'cfs:///example.txt' AS (name:chararray, score:long);
    
  2. Create a relation called cassandra_tuple to define a tuple of three fields for Cassandra (row key, column name, column value). In this case, the column value is an empty string (using null would be equivalent to a delete).

    grunt> cassandra_tuple = FOREACH score_data GENERATE name, score, '' AS value;
    
  3. Group by name and store the results into a relation called group_by_name. The PARALLEL keyword controls how many reducers are used.

    grunt> group_by_name = GROUP cassandra_tuple BY name PARALLEL 3;
    
  4. Create an aggregated row for each user containing tuples of their scores and store the results in a relation called aggregate_scores.

    grunt> aggregate_scores = FOREACH group_by_name GENERATE group,
             cassandra_tuple.(score, value);
    grunt> DUMP aggregate_scores;
    

    Notice how the data was aggregated for input into Cassandra: a tuple was constructed for each Cassandra row. In Pig notation, a tuple is enclosed in parentheses ( ). Within each row tuple is an inner bag of column tuples, each column tuple representing an individual score. In Pig notation, an inner bag is enclosed in curly brackets { }. So a Pig tuple that represents a row in a column family is structured as:

    (<row_key>,{(<column_name1>,<value1>),(<column_name2>,<value2>)})

    Note that in this example, the value is empty (creating a value-less column in Cassandra):

    (brandon,{(36,),(128,)})
  5. Now that the data is in a format that can map to the Cassandra column family, we can store the Pig results into Cassandra using the CassandraStorage handler. The INTO clause specifies where to store the data in Cassandra in the format of: cassandra://<keyspace>/<column_family>

    grunt> STORE aggregate_scores INTO 'cassandra://PigDemo/Scores' USING CassandraStorage();
    

Reading Data From a Cassandra Column Family

The examples in this section assume you have completed Writing Data to a Cassandra Column Family to group the raw score data into rows by user and load it into Cassandra. In this example, we calculate the total scores for each user.

  1. First create a Pig relation called cassandra_data by loading rows from the Cassandra column family:

    grunt> cassandra_data = LOAD 'cassandra://PigDemo/Scores' USING CassandraStorage()
             AS (name, columns: bag {T: tuple(score, value)});
  2. Use the FOREACH operator to calculate the total score for each user, and store the results in a relation called total_scores.

    grunt> total_scores = FOREACH cassandra_data GENERATE name, COUNT(columns.score),
           SUM(columns.score) AS total;
    
  3. Order the results in descending order by total score and store the results in a relation called ordered_scores.

    grunt> ordered_scores = ORDER total_scores BY total DESC PARALLEL 3;
    grunt> DUMP ordered_scores;