DataStax Enterprise 3.1 Documentation

Using Hive


DataStax Enterprise includes Hive, a data warehouse system for Hadoop that converts most HiveQL queries into MapReduce jobs for execution on a DataStax Enterprise analytics node. HiveQL is a SQL-like language that organizes data into table structures for storage in the Cassandra File System (CassandraFS). DataStax Enterprise analytics nodes store Hive table structures in the CassandraFS instead of in the Hadoop Distributed File System (HDFS). The table metadata is stored in the metastore database.


[Figure: hive_tables.png]

Unlike open source Hive, DataStax Enterprise does not require you to run the metastore as a standalone database to support multiple users. DataStax Enterprise implements the Hive metastore as a keyspace within Cassandra. The metastore supports multiple users and requires no configuration except increasing the default replication factor of the keyspace.

Setting the Job Tracker node for Hive

Hive clients automatically select the correct job tracker node upon startup. You change the job tracker node for Hive as you would for any analytics node, and you use the dsetool commands to manage the job tracker.
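
For example, assuming the dsetool jobtracker and movejt subcommands are available in your release, you might check and then move the job tracker like this (the node address is hypothetical):

dsetool jobtracker        # report the address of the current job tracker
dsetool movejt 10.10.1.22 # move the job tracker to the analytics node at this address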

Note

The default replication for system keyspaces is 1. This replication factor is suitable for development and testing, not for a production environment. For production, increase the replication factors for the HiveMetaStore and cfs keyspaces to at least 2.
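
For example, a minimal sketch in cqlsh, assuming analytics nodes in the default Analytics data center (adjust the strategy class and data center names to match your cluster); run nodetool repair on each node afterward so that existing data is replicated:

cqlsh> ALTER KEYSPACE "HiveMetaStore" WITH replication =
         {'class': 'NetworkTopologyStrategy', 'Analytics': 2};
cqlsh> ALTER KEYSPACE cfs WITH replication =
         {'class': 'NetworkTopologyStrategy', 'Analytics': 2};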

Hive storage handlers

You use a Hive client and custom storage handlers to access data in Cassandra. DataStax Enterprise maps existing Cassandra tables into Hive tables. To store Hive tables as CQL tables in Cassandra, DataStax Enterprise 3.1 and later provides the CQL 3 storage handler: org.apache.hadoop.hive.cassandra.cql3.CqlStorageHandler.

For information about using Hive with legacy tables, such as those created using Thrift or the CLI, see the DataStax Enterprise 3.0 documentation.

Hive to Cassandra type mapping

This table shows CQL, Cassandra internal storage engine, and Hive data type mapping:

CQL 3          Cassandra Internal   Hive
text/varchar   UTF8Type             string
ascii          AsciiType            string
timestamp      DateType             timestamp
bigint         LongType             bigint
int            Int32Type            int
double         DoubleType           double
float          FloatType            float
boolean        BooleanType          boolean
uuid           UUIDType             binary
timeuuid       TimeUUIDType         binary
other          other                binary

TBLPROPERTIES and SERDEPROPERTIES

The TBLPROPERTIES clause specifies CassandraFS and MapReduce properties for the table. The SERDEPROPERTIES clause specifies the properties used when serializing and deserializing data passed between the Hive table and Cassandra. You can add a WITH SERDEPROPERTIES clause to map meaningful column names in Hive to the Cassandra partition key, column names, and column values.

The following properties can be declared in the TBLPROPERTIES clause, the SERDEPROPERTIES clause, or both. In DataStax Enterprise 2.1 and later, you can change the Hive storage properties listed in the following table on the fly: use the Hive SET command to set a property in the Hive session, and the setting takes effect for the next query.

Property                      Used in Hive Clause   Description
cassandra.cf.name             TBL and SERDE         Cassandra table name
cassandra.columns.mapping     TBL                   Mapping of Hive to legacy Cassandra columns
cassandra.consistency.level   TBL and SERDE         Consistency level - default LOCAL_ONE
cassandra.cql3.type           TBL and SERDE         CQL types
cassandra.host                TBL and SERDE         IP of a Cassandra node to connect to
cassandra.ks.name             TBL and SERDE         Cassandra keyspace name
cassandra.ks.repfactor        TBL and SERDE         Cassandra replication factor - default 1
cassandra.ks.strategy         TBL and SERDE         Replication strategy class
cassandra.ks.stratOptions     TBL and SERDE         Strategy options, as shown in the NetworkTopologyStrategy example
cassandra.page.size           SERDE                 Fetch tables with many columns by page size
cassandra.partitioner         TBL and SERDE         Partitioner - default is your configured partitioner
cassandra.port                TBL and SERDE         Cassandra RPC port - default 9160
cassandra.input.split.size    TBL and SERDE         MapReduce split size: rows processed per mapper - default 64 * 1024 (64K rows per split)
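
For example, a hedged sketch of a table definition that combines both clauses (the host value is illustrative; the keyspace and table names match the cql3ks.test example later in this document):

hive> CREATE EXTERNAL TABLE MyHiveTable
        (m string, n string, o string, p string)
        STORED BY 'org.apache.hadoop.hive.cassandra.cql3.CqlStorageHandler'
        WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1" )
        TBLPROPERTIES ( "cassandra.ks.name" = "cql3ks",
          "cassandra.cf.name" = "test" );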

In DataStax Enterprise 3.1.5, the default consistency level for cassandra.consistency.level is LOCAL_ONE.
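
For example, to raise the consistency level for subsequent queries in a Hive session, a minimal sketch using the SET mechanism described above:

hive> SET cassandra.consistency.level=QUORUM;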

When you create an external table in Hive, you need to specify these properties:

  • cassandra.ks.name
  • cassandra.cf.name
  • cassandra.ks.repfactor (if SimpleStrategy is used)
  • cassandra.ks.strategy
  • cassandra.ks.stratOptions (if NetworkTopologyStrategy is used)

You do not need to specify cassandra.partitioner. Your configured partitioner is used by Hive. For example, Hive uses this property value if you use the Cassandra 1.2 default partitioner:

"cassandra.partitioner" = "org.apache.cassandra.dht.Murmur3Partitioner"

Mapping a Hive database to a Cassandra keyspace and MapReduce performance tuning show examples of using some of these properties.

Running Hive

You can run Hive as a server or as a client. To perform the demo examples, run Hive as a client.

Starting a Hive client

Use a Hive client on a node in the cluster under these conditions:

  • To connect to the Hive server running on another node
  • To use Hive in a single-node cluster

You can start a Hive client on any analytics node and run MapReduce queries directly on data already stored in Cassandra.

To start a Hive client:

For packaged or AMI distributions, you can start the Hive client as follows:

dse hive

For a binary (tarball) distribution, start Hive as follows:

<install_location>/bin/dse hive

A warning about not being able to access history indicates a permissions problem. Change the file system permissions or use sudo to start Hive.

Creating a table

If the table already exists in Cassandra, you can create a corresponding table in Hive to access that data: use the CREATE EXTERNAL TABLE command. If the table does not already exist in Cassandra, create one using CQL 3, and then use the CREATE EXTERNAL TABLE command.

The HiveQL Manual provides information about the HiveQL syntax.

  1. Start cqlsh. For example, on Linux:

    ./cqlsh
    
  2. Create a keyspace and a table and insert some data using CQL 3.

    cqlsh> CREATE KEYSPACE cql3ks WITH replication =
             { 'class': 'NetworkTopologyStrategy',
             'Analytics': '1' };
    
    cqlsh> CREATE TABLE cql3ks.test
             (m text, n text, o text, p text, PRIMARY KEY (m));
    
    cqlsh> INSERT INTO cql3ks.test (m, n, o, p)
             VALUES ('abc', 'def', 'hij', 'klm');
    

    Alternatively, you can create legacy tables because the Hive implementation in DataStax Enterprise 3.1 is backward compatible.

  3. Start the Hive client. For example, from the DataStax Enterprise <install_home>/bin directory on Linux:

    ./dse hive
    
  4. Create a table in Hive that corresponds to the test table you created in CQL 3.

    hive>  CREATE EXTERNAL TABLE MyHiveTable
             (m string, n string, o string, p string)
             STORED BY 'org.apache.hadoop.hive.cassandra.cql3.CqlStorageHandler'
             TBLPROPERTIES ( "cassandra.ks.name" = "cql3ks",
             "cassandra.cf.name" = "test",
             "cassandra.cql3.type" = "text, text, text, text");
    
  5. Issue a Hive query to get information about the keyspace/Hive database.

    hive>  describe database cql3ks;
    

    Output is:

    OK
    cql3ks    cfs://127.0.0.1/user/hive/warehouse/cql3ks.db
    
  6. Retrieve the data in the table using a Hive query.

    hive> select * from MyHiveTable;
    

    Output is:

    OK
    abc def hij klm
    

Automatic recognition of Cassandra keyspaces

If a keyspace exists in Cassandra, you can use the keyspace in Hive. For example, create a keyspace in Cassandra using cqlsh. Add some data to the table using cqlsh, and then access the data by simply issuing a USE command in Hive.

cqlsh> CREATE KEYSPACE cassandra_keyspace WITH replication =
         {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> USE cassandra_keyspace;
cqlsh:cassandra_keyspace> CREATE TABLE exampleTable
                            ( key int PRIMARY KEY , data text );
cqlsh:cassandra_keyspace> INSERT INTO exampletable (key, data )
                            VALUES ( 1, 'This data can be read
                              automatically in hive');
cqlsh:cassandra_keyspace> quit;

At this point, you can use the keyspace in Hive without manually specifying it.

hive> USE cassandra_keyspace;
hive> SHOW TABLES;
      OK
      exampletable
hive> SELECT * FROM exampletable;
      OK
      1 This data can be read automatically in hive

Mapping a Hive database to a Cassandra keyspace

You can map a Hive database to an existing Cassandra keyspace by naming them the same in the CREATE EXTERNAL TABLE definition. Optionally, if your Hive database and Cassandra keyspace use different names, you can declare keyspace properties in your external table definition using the TBLPROPERTIES clause.

SimpleStrategy keyspace example

hive> CREATE EXTERNAL TABLE MyHiveTable
        (m string, n string, o string, p string)
        STORED BY 'org.apache.hadoop.hive.cassandra.cql3.CqlStorageHandler'
        TBLPROPERTIES ( "cassandra.ks.name" = "MyCassandraKS",
          "cassandra.cf.name" = "mycasstable",
          "cassandra.ks.repfactor" = "2",
          "cassandra.ks.strategy" =
            "org.apache.cassandra.locator.SimpleStrategy");

NetworkTopologyStrategy keyspace example

To create a keyspace that uses the NetworkTopologyStrategy replication strategy, use the cassandra.ks.stratOptions property to define the replication factors for data centers:

hive> CREATE EXTERNAL TABLE MyHiveTable
        (m string, n string, o string, p string)
        STORED BY 'org.apache.hadoop.hive.cassandra.cql3.CqlStorageHandler'
        TBLPROPERTIES ( "cassandra.ks.name" = "MyCassandraKS",
          "cassandra.cf.name" = "mycasstable",
          "cassandra.ks.stratOptions" = "DC1:1, DC2:2, DC3:1",
          "cassandra.ks.strategy" =
            "org.apache.cassandra.locator.NetworkTopologyStrategy");

Accessing tables with many columns

In the TBLPROPERTIES clause, set cassandra.page.size to fetch the data from a table that has many columns in pages of the specified size.
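
For example, a minimal sketch (the keyspace, table, and column names are hypothetical, and 1000 is an illustrative page size; because the properties table above lists cassandra.page.size under SERDE, it can also go in a WITH SERDEPROPERTIES clause):

hive> CREATE EXTERNAL TABLE widetable (key string, value string)
        STORED BY 'org.apache.hadoop.hive.cassandra.cql3.CqlStorageHandler'
        TBLPROPERTIES ( "cassandra.ks.name" = "myks",
          "cassandra.cf.name" = "widetable",
          "cassandra.page.size" = "1000" );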

Handling schema changes

A runtime exception can occur if you create an external table in Hive, change the Cassandra table that is mapped to the Hive table, and then query the Hive table. Changes to the Cassandra table put it out of sync with the Hive table, which naturally causes problems. The workaround is:

  1. In Hive, drop any external table after changing the corresponding Cassandra table.

  2. Run SHOW TABLES:

    hive> DROP TABLE mytable;
    
    hive> SHOW TABLES;
    

    Now, the table in Hive contains the updated data.

Creating a Hive managed table

To use Hive with the CassandraFS as you would in an HDFS-based Hadoop implementation, follow the examples in this section; the only difference is that the tables are saved to the CassandraFS instead of HDFS. Create a Hive managed table using the CREATE TABLE command.

For example:

hive>  CREATE TABLE invites (foo INT, bar STRING)
       PARTITIONED BY (ds STRING);

Load data into a table using the LOAD DATA command. The HiveQL Manual provides more information about the HiveQL syntax. The loaded data resides in the cfs keyspace. Your Hive metadata store also resides in Cassandra in its own keyspace.

For example, on Mac OS X:

hive>  LOAD DATA LOCAL
       INPATH '<install_location>/resources/hive/examples/files/kv2.txt'
       OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');

hive>  LOAD DATA LOCAL
       INPATH '<install_location>/resources/hive/examples/files/kv3.txt'
       OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-08');

hive>  SELECT count(*), ds FROM invites GROUP BY ds;

Note

The paths to the Hive example files shown in the example LOAD commands above are for the binary distribution.

Using the count function

Using cqlsh, set the consistency level to ALL before issuing a Hive SELECT expression that contains the count function. Using ALL ensures that when you ping one node for a scan of all keys, the node is fully consistent with the rest of the cluster. Using a consistency level other than ALL can return result sets having fewer rows than expected because replication has not finished propagating the rows to all nodes. A count that is higher than expected can occur because tombstones have not yet been propagated to all nodes.
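
Because the properties table above lists cassandra.consistency.level as settable in a Hive session, an alternative sketch runs the fully consistent count entirely in Hive (the table name mytable is hypothetical):

hive> SET cassandra.consistency.level=ALL;
hive> SELECT COUNT(*) FROM mytable;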

To get accurate results from the count function using a consistency level other than ALL:

  • Repair all nodes.
  • Prevent new data from being added or deleted.

Using an external file system in Hive

In DataStax Enterprise 3.1.5, you can map a file in an external file system, such as the S3 native file system, to a table in Hive. The DSE Analytics/Hadoop cluster continues to use the CassandraFS file system; the data source is external to Hive, located in S3 for example. You create a Hive external table for querying the data in the external file system. When you drop the external table, only the table metadata, stored in the HiveMetaStore keyspace in Cassandra, is removed. The data persists in the external file system.


[Figure: hive_tableS3.png]

First, you set up the hive-site.xml and core-site.xml files, and then create an external table as described in this procedure.

  1. Open the hive-site.xml for editing. This file is located in:

    • Packaged installations: /etc/dse/hadoop
    • Binary installations: /<install_location>/resources/hive/conf
  2. Add a property to hive-site.xml to set the default file system to the S3 native file system. Use fs.default.name as the property name and the location of the bucket as the value. For example, if the S3 bucket name is dsp2377:

    <property>
      <name>fs.default.name</name>
      <value>s3n://dsp2377</value>
    </property>
    
  3. Save the file.

  4. Open the core-site.xml file for editing. This file is located in:

    • Packaged installations: /etc/dse/hadoop
    • Binary installations: /<install_location>/resources/hadoop/conf
  5. Add these properties to core-site.xml to specify the access key ID and the secret access key credentials for accessing the S3 native file system:

    <property>
      <name>fs.s3n.awsAccessKeyId</name>
      <value>ID</value>
    </property>
    
    <property>
      <name>fs.s3n.awsSecretAccessKey</name>
      <value>Secret</value>
    </property>
    
  6. Save the file and restart Cassandra. DSE creates the new CassandraFS.

  7. Start Hive, and on the Hive command line, create an external table for the data on S3. Specify the S3 file name as shown in this example:


    hive> CREATE EXTERNAL TABLE mytable (key STRING, value INT)
            ROW FORMAT DELIMITED FIELDS TERMINATED BY '='
            STORED AS TEXTFILE LOCATION 's3n://dsp2377/2377/data';
    

    Now that the S3 data is mapped in Hive, you can query the data using Hive.

Starting the Hive server

A node in the analytics cluster can act as the Hive server. Other nodes connect to Hive through the JDBC driver. To start the Hive server, choose a node in the Hadoop cluster and run this command:

dse hive --service hiveserver

or in a binary distribution:

<install_location>/bin/dse hive --service hiveserver
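
Clients connect to this node through the Hive JDBC driver. Assuming the default HiveServer port, 10000, the connection URL takes this general form (the host name is hypothetical):

jdbc:hive://hive-server-host:10000/default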

MapReduce performance tuning

You can change performance settings in the following ways:

  • In your external table definitions, using the TBLPROPERTIES or SERDEPROPERTIES clauses.

  • Using the Hive SET command. For example: SET mapred.reduce.tasks=32;

  • In the mapred-site.xml file.

    Packaged installations: /etc/dse/hadoop/mapred-site.xml

    Binary installations: <install_location>/resources/hadoop/conf/mapred-site.xml

    Note

    This is a system setting, so if you change it, you must restart the analytics nodes.

Speeding up MapReduce jobs

Increase your mappers to one per CPU core by setting mapred.tasktracker.map.tasks.maximum in mapred-site.xml.
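
For example, on nodes with eight CPU cores (the value 8 is illustrative; match it to your hardware):

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>8</value>
</property>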

Increasing the number of map tasks to maximize performance

  • Turn off map output compression in mapred-site.xml to lower memory usage.
  • The cassandra.input.split.size property specifies the rows to be processed per mapper. The default size is 64k rows per split. You can decrease the split size to create more mappers, as shown in the sketch after this list.
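
For example, a sketch that halves the default split size for the current Hive session (32768 is an illustrative value):

hive> SET cassandra.input.split.size=32768;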

Out of Memory Errors

When your map or reduce tasks fail with Out of Memory (OOM) errors, set the mapred.child.java.opts property in Hive:

SET mapred.child.java.opts="-server -Xmx512M"

You can also lower memory usage by turning off map output compression in mapred-site.xml.