DataStax Enterprise 3.0 Documentation

Using Pig

This documentation corresponds to an earlier product version. Make sure this document corresponds to your version.


DataStax Enterprise (DSE) includes a CassandraFS-enabled Apache Pig Client. Pig is a platform for analyzing large data sets that uses a high-level language (called Pig Latin) for expressing data analysis programs. Pig Latin lets developers specify a sequence of data transformations such as merging data sets, filtering them, and applying functions to records or groups of records. Pig comes with many built-in functions and allows you to develop your own functions for special-purpose processing. Documentation for Pig Latin is available from Apache.

Pig Latin programs run in a distributed fashion on a DSE cluster. These programs are compiled into MapReduce jobs and executed using Hadoop. When using Pig with DSE, all jobs run in MapReduce mode, even on a single-node cluster. Because all Hadoop nodes are peers in DSE (there is no Name Node), Pig has no concept of local mode. DSE Pig includes a custom storage handler for Cassandra that allows you to run Pig programs directly on data stored in Cassandra. The native Pig storage handler stores data in CassandraFS, the Cassandra-enabled Hadoop distributed file system.

Setting the Job tracker node for Pig

Pig Latin programs are compiled into sequences of MapReduce jobs that are run in parallel. Jobs are submitted to the job tracker node for the DataStax Enterprise cluster. Pig clients will automatically select the correct job tracker node on startup. You set the job tracker node for Pig as you would for any analytics (Hadoop) node and use the dsetool commands to manage the job tracker.

Starting Pig

DataStax Enterprise must be running as an analytics (Hadoop) node. See Starting and stopping DataStax Enterprise.

Start the Pig shell as follows:

Packaged installs: dse pig

Binary installs: <install_location>/bin/dse pig

After Pig starts, the prompt changes to grunt>. Be sure to end each Grunt shell command with a semicolon (;).
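For example, a minimal session on a packaged install might look like the following; fs and quit are standard Grunt shell commands for running Hadoop filesystem operations and exiting the shell:

```
$ dse pig
grunt> fs -ls /;
grunt> quit;
```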

Working in DSE Pig

DataStax Enterprise allows you to use Pig with data stored in CassandraFS just as you would in a regular Hadoop implementation (using the default Pig storage handler). Pig Latin statements work with relations. A Pig relation is defined as follows:

  • A relation is a bag (more specifically, an outer bag).
  • A bag is a collection of tuples.
  • A tuple is an ordered set of fields.
  • A field is a piece of data.

A Pig relation is similar to a table in a relational database, where the tuples in the bag correspond to the rows in a table. However, unlike a relational table, Pig relations do not require that every tuple contain the same number of fields or that the fields in the same position (column) be of the same type. Pig relations resemble Cassandra tables more than relational tables. The Apache Pig documentation contains more information on defining and working with Pig relations.
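To make these terms concrete, here is a sketch using the demo data described later in this document (the tuple shown is illustrative):

```
-- score_data is a relation: an outer bag of tuples.
-- Each tuple here has two fields, name and score:
score_data = LOAD 'cfs:///example.txt' AS (name:chararray, score:long);
-- One tuple in score_data might look like: (jdoe,36)
-- Unlike relational rows, tuples in a bag are not required
-- to have the same number of fields or the same field types.
```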

Using Pig to access data in Cassandra

DSE uses a custom storage handler, CassandraStorage(), which allows direct access through Pig to data stored in Cassandra. To access data in Cassandra, the target keyspace and table must already exist: Pig can read and write data in a Cassandra table, but it will not create the table.

The Pig LOAD command pulls data into a Pig relation from Cassandra via the CassandraStorage handler. You do not need to specify type information as it is automatically inferred. For a regular table, the format of the Pig LOAD command is as follows:

<pig_relation_name> = LOAD 'cassandra://<keyspace>/<column_family>'
USING CassandraStorage();

The Pig STORE command pushes data from a Pig relation to Cassandra via the CassandraStorage handler:

STORE <relation_name> INTO 'cassandra://<keyspace>/<column_family>'
  USING CassandraStorage();

Running the Pig demo

Pig operates on data stored in the Hadoop distributed file system (or CassandraFS in DSE). Your DSE installation contains sample data for running the Pig demo examples. The sample data file contains tuples of two fields each (name and score). Using Pig, the examples demonstrate creating a Pig relation and performing a simple MapReduce job that calculates the total score for each user. Result output can then be stored back into CassandraFS or into a Cassandra table.

Loading Pig sample data into CassandraFS

The Pig sample data file is located in /usr/share/dse-demos/pig/files/example.txt for packaged installations, or <install_location>/demos/pig/files/example.txt for binary installations.
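The file is plain text in the format the default PigStorage() handler expects: one tuple per line, with fields separated by tabs. The names and scores shown here are illustrative, not the actual file contents:

```
jdoe	36
jsmith	52
jdoe	128
```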

  1. Load the Pig sample data file into CassandraFS:

    • Packaged install: dse hadoop fs -put files/example.txt /
    • Binary installs: <install_location>/bin/dse hadoop fs -put files/example.txt /
  2. Verify that the file is present:

    dse hadoop fs -ls /
    

Creating a Pig relation from a data file

In this section, you create a relation called score_data that defines a schema of two fields (or columns) named name and score. The LOAD command loads the relation with the data from the example.txt file stored in CassandraFS. The USING PigStorage() clause is optional, because PigStorage() is the default storage handler for Pig.

  1. Start Pig.

  2. Load the relation with data from the example.txt file:

    grunt> score_data = LOAD 'cfs:///example.txt' USING PigStorage()
             AS (name:chararray, score:long);
    
  3. View the tuples stored in the relation:

    grunt> DUMP score_data;
    

Running a MapReduce job in Pig

In this example, you take the raw data you loaded into the score_data relation and perform a number of calculations on the data using the Pig built-in relational operators. Intermediate results are also stored in Pig relations.

  1. GROUP the tuples in the score_data relation by the name field, and store the results in a relation called name_group:

    grunt> name_group = GROUP score_data BY name PARALLEL 3;
    

    The PARALLEL keyword controls how many reducers are used.

  2. Use the FOREACH operator to calculate the total score for each user grouping in the name_group relation, and store the results in a relation called name_total:

    grunt> name_total = FOREACH name_group GENERATE group, COUNT(score_data.name),
       LongSum(score_data.score) AS total_score;
    
  3. Order the results in descending order by total score and store the results in a relation called ordered_scores:

    grunt> ordered_scores = ORDER name_total BY total_score DESC PARALLEL 3;
    
  4. You can either output the results to standard output (DUMP) or to a file in CassandraFS (STORE):

    grunt> DUMP ordered_scores;
    grunt> STORE ordered_scores INTO 'cfs:///final_scores.txt' USING PigStorage();
    

    The USING clause is optional in this case, since PigStorage() is already the default storage handler.
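The steps above can be collected into a single Pig Latin script, using the same relation and file names as in the steps:

```
score_data = LOAD 'cfs:///example.txt' AS (name:chararray, score:long);
name_group = GROUP score_data BY name PARALLEL 3;
name_total = FOREACH name_group GENERATE group, COUNT(score_data.name),
             LongSum(score_data.score) AS total_score;
ordered_scores = ORDER name_total BY total_score DESC PARALLEL 3;
STORE ordered_scores INTO 'cfs:///final_scores.txt';
```

Pig can also run a script like this from a file instead of interactively, which is convenient once the steps are debugged in the Grunt shell.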

Creating the PigDemo keyspace in Cassandra

For Pig to access data in Cassandra, the target keyspace and table must already exist. (Pig can read from and write to a Cassandra table, but it will not create the table.)

To create the PigDemo keyspace and Scores table used in the following examples, run the following commands in the cassandra-cli utility.

  1. Start the cassandra-cli utility:

    cassandra-cli
    

    or in a binary distribution:

    <install_location>/resources/cassandra/bin/cassandra-cli
    
  2. Connect to a node in your DSE cluster on port 9160:

    [default@unknown] connect <server_ip_address>/9160
    

    Examples:

    [default@unknown] connect 110.82.155.4/9160
    [default@unknown] connect localhost/9160
    
  3. Create the PigDemo keyspace.

    [default@unknown] CREATE KEYSPACE PigDemo
       WITH placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy'
       AND strategy_options = [{replication_factor:1}];
    

    Note

    The keyspace in this example uses a replication factor of 1, which is suitable for development and testing, not for a production environment. For more information, see Changing replication settings.

  4. Connect to the PigDemo keyspace you just created.

    [default@unknown] use PigDemo;
    
  5. Create the Scores column family.

    [default@unknown] create column family Scores with comparator = 'LongType';
    
  6. Exit cassandra-cli:

    [default@unknown] exit;
    

Writing data to a Cassandra table

In this example, the scores example data is loaded into CassandraFS (see Loading Pig sample data into CassandraFS). This data consists of tuples containing two fields (name and score). For a Cassandra table, you must store three fields: the row key (name), the column name (score), and the column value (empty).

To calculate the total score for each user in the same manner as in the Running a MapReduce job in Pig example, you have to account for the empty field.

Run these commands from the Pig shell.

  1. If you have not already created the score_data relation from the example.txt file stored in CassandraFS:

    grunt> score_data = LOAD 'cfs:///example.txt' AS (name:chararray, score:long);
    
  2. Create a relation called cassandra_tuple:

    grunt> cassandra_tuple = FOREACH score_data GENERATE name, score, '' AS value;
    

    This command defines a tuple of three fields for Cassandra (row key, column name, column value). The column value must be an empty string, because storing null is equivalent to deleting the column.

  3. Group by name and store the results into a relation called group_by_name:

    grunt> group_by_name = GROUP cassandra_tuple BY name PARALLEL 3;
    

    The PARALLEL keyword controls how many reducers are used.

  4. Create an aggregated row for each user containing tuples of their scores and store the results in a relation called aggregate_scores:

    grunt> aggregate_scores = FOREACH group_by_name GENERATE group,
             cassandra_tuple.(score, value);
    grunt> DUMP aggregate_scores;
    

    Notice how the data was aggregated for input into Cassandra. A tuple was constructed for each Cassandra row. In Pig notation, a tuple is enclosed in parentheses ( ). Within each row tuple is a bag of column tuples where each column tuple represents an individual score. An inner bag is enclosed in curly brackets { }, so a Pig tuple that represents a row in a table is structured as:

    (<row_key>,{(<column_name1>,<value1>),(<column_name2>,<value2>)})

    Note that the value is empty to create a value-less column in Cassandra:

    (jdoe,{(36,),(128,)})
  5. Now that the data is in a format that can map to the Cassandra table, store the Pig results into Cassandra using the CassandraStorage handler:

    grunt> STORE aggregate_scores INTO 'cassandra://PigDemo/Scores' USING CassandraStorage();
    

    The INTO clause specifies where to store the data in Cassandra. The format is cassandra://<keyspace>/<column_family>.
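Collected into one script, the write sequence above is:

```
score_data = LOAD 'cfs:///example.txt' AS (name:chararray, score:long);
-- Add an empty column value; null would delete the column.
cassandra_tuple = FOREACH score_data GENERATE name, score, '' AS value;
group_by_name = GROUP cassandra_tuple BY name PARALLEL 3;
-- One row per user: (row_key, {(column_name, value), ...})
aggregate_scores = FOREACH group_by_name GENERATE group,
                   cassandra_tuple.(score, value);
STORE aggregate_scores INTO 'cassandra://PigDemo/Scores' USING CassandraStorage();
```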

Reading data from a Cassandra table

You must have previously completed the examples in Writing data to a Cassandra table to run this example. In this example, you calculate the total scores for each user and group the raw score data into rows by user:

  1. Create a Pig relation called cassandra_data by loading rows from the Cassandra table:

    grunt> cassandra_data = LOAD 'cassandra://PigDemo/Scores' USING CassandraStorage()
             AS (name, columns: bag {T: tuple(score, value)});
  2. Use the FOREACH operator to calculate the total score for each user, and store the results in a relation called total_scores:

    grunt> total_scores = FOREACH cassandra_data GENERATE name, COUNT(columns.score),
           SUM(columns.score) AS total PARALLEL 3;
    
  3. Order the results in descending order by total score, and store the results in a relation called ordered_scores:

    grunt> ordered_scores = ORDER total_scores BY total DESC PARALLEL 3;
    grunt> DUMP ordered_scores;