DataStax Enterprise 3.0 Documentation

Using Hive


DataStax Enterprise includes a Cassandra-enabled Hive MapReduce client. Hive is a data warehouse system for Hadoop that projects a relational structure onto data stored in Hadoop-compatible file systems. You query the data using a SQL-like language called HiveQL. You can start the Hive client on any analytics node and run MapReduce queries directly on data already stored in Cassandra. DataStax Enterprise automatically maps existing column families into Hive tables. You do not need to run a stand-alone Hive metastore. To define new Hive tables and load data into the Cassandra File System (CassandraFS), use HiveQL just as you would in an HDFS-based Hadoop implementation.

HiveQL is extensible through user-defined functions (UDFs). You can upload custom UDFs to manipulate the data in your queries.

About the Hive metastore

Metadata about the objects you define in Hive is stored in a database called the metastore. In HDFS-based Hive, when you run Hive on your local machine, your DDL commands create objects in a local metastore that is not available to other Hive clients. DataStax Enterprise implements the Hive metastore as a keyspace within Cassandra. Therefore, the metastore is shared and requires no configuration except increasing the default replication factor.

Note

The default replication for system keyspaces is 1. This replication factor is suitable for development and testing, not for a production environment. For production increase the replication factors for the HiveMetaStore and cfs keyspaces to at least 2; see Changing replication settings.

Using Hive

This section provides information about:

  • Setting the Job Tracker node for Hive
  • Starting the Hive server
  • Starting a Hive client
  • Creating Hive CassandraFS tables
  • Using the count function
  • Changing Hive storage properties on the fly

Setting the Job Tracker node for Hive

Hive generates MapReduce jobs for most of its queries. Hive MapReduce jobs are submitted to the job tracker node for the DataStax Enterprise cluster. Hive clients automatically select the correct job tracker node upon startup. You set the job tracker node for Hive as you would for any analytics node. Use the dsetool commands to manage the job tracker.
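For example, a hedged sketch of managing the job tracker with dsetool from the command line (the IP address is illustrative; check the dsetool subcommands available in your DSE release):

dsetool jobtracker              # display the current job tracker node
dsetool movejt 110.82.155.4     # designate another analytics node as the job tracker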

Starting the Hive server

To connect to Hive via the JDBC driver, start Hive on one of the Hadoop nodes as follows:

dse hive --service hiveserver

or in a binary distribution:

<install_location>/bin/dse hive  --service hiveserver
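Once the Hive server is running, JDBC clients connect to it using the Hive JDBC driver. A hedged sketch of a connection URL, assuming the default HiveServer port 10000 and a host named node1:

jdbc:hive://node1:10000/default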

Starting a Hive client

When you install DataStax Enterprise using the packaged or AMI distributions, you can start Hive as follows:

dse hive

or in a binary distribution:

<install_location>/bin/dse hive

Creating Hive CassandraFS tables

Use Hive with CassandraFS as you would use it in an HDFS-based Hadoop implementation. Create Hive tables using the CREATE TABLE command.

For example:

hive>  CREATE TABLE invites (foo INT, bar STRING)
       PARTITIONED BY (ds STRING);

Load data into a table using the LOAD DATA command. The HiveQL Manual provides more information about the HiveQL syntax. The loaded data resides in the cfs keyspace. Your Hive metadata store also resides in Cassandra in its own keyspace.

For example:

hive>  LOAD DATA LOCAL
       INPATH '<install_location>/resources/hive/examples/files/kv2.txt'
       OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');

hive>  LOAD DATA LOCAL
       INPATH '<install_location>/resources/hive/examples/files/kv3.txt'
       OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-08');

hive>  SELECT count(*), ds FROM invites GROUP BY ds;

Note

The paths to the Hive example files shown in the example LOAD commands above are for the binary distribution.

Using the count function

Using cqlsh, set the consistency level to ALL before issuing a Hive SELECT expression that contains the count function. Using ALL ensures that the node queried for a scan of all keys is fully consistent with the rest of the cluster. A consistency level other than ALL can return result sets with fewer rows than expected because replication has not finished propagating the rows to all nodes; a count that is higher than expected can occur because tombstones have not yet been propagated to all nodes.
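For example, assuming a cqlsh version that supports the CONSISTENCY shell command (older CQL versions instead attach a USING CONSISTENCY clause to the statement):

cqlsh> CONSISTENCY ALL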

To get accurate results from the count function using a consistency level other than ALL:

  • Repair all nodes.
  • Prevent new data from being added or deleted.

Changing Hive storage properties on the fly

In DataStax Enterprise 2.1 and later, you can change the Hive storage properties listed in SERDEPROPERTIES and TBLPROPERTIES on the fly. Use the Hive SET command to set properties in the Hive session; the settings take effect for the next query. In DataStax Enterprise 2.0 and earlier, you had to use ALTER TABLE to change storage properties.
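For example, a minimal sketch that changes one of the table properties described later in this document for the current session (the value is illustrative):

hive> SET cassandra.range.size=500;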

Running the Hive demo

The Hive demo shows you how to access data in Cassandra. DataStax Enterprise uses a custom storage handler to allow direct access to data stored in Cassandra through Hive.

Mapping a Hive database to a Cassandra keyspace

To access data stored in Cassandra, first define a database in Hive that maps to a keyspace in Cassandra. One way to map them is to use the same name in both Hive and Cassandra. For example:

hive> CREATE DATABASE PortfolioDemo;

Optionally, if your Hive database and Cassandra keyspace use different names (or the Cassandra keyspace does not exist), you can declare keyspace properties in your external table definition using the TBLPROPERTIES clause. If the keyspace does not yet exist in Cassandra, Hive will create it.

For example, in the case where the keyspace exists in Cassandra but under a different name:

hive> CREATE DATABASE MyHiveDB;

hive> CREATE EXTERNAL TABLE MyHiveTable(row_key string, col1 string, col2 string)
      STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
      TBLPROPERTIES ( "cassandra.ks.name" = "MyCassandraKS" )

Or if the keyspace does not exist in Cassandra yet and you want to create it:

hive> CREATE EXTERNAL TABLE MyHiveTable(row_key string, col1 string, col2 string)
      STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
      TBLPROPERTIES ( "cassandra.ks.name" = "MyCassandraKS",
                      "cassandra.ks.repfactor" = "2",
                      "cassandra.ks.strategy" = "org.apache.cassandra.locator.SimpleStrategy");

You use the cassandra.ks.repfactor property to define the replication factor for a keyspace that uses the SimpleStrategy replication strategy.

To create a keyspace that uses the NetworkTopologyStrategy replication strategy, use the cassandra.ks.stratOptions property to define the replication factors for data centers:

hive> CREATE EXTERNAL TABLE MyHiveTable(row_key string, col1 string, col2 string)
      STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
      TBLPROPERTIES ( "cassandra.ks.name" = "MyCassandraKS",
      "cassandra.ks.stratOptions" = "DC1:1, DC2:2, DC3:1",
      "cassandra.ks.strategy" = "org.apache.cassandra.locator.NetworkTopologyStrategy");

Hive to Cassandra table mapping

An external table in Hive maps to a column family in Cassandra. In DataStax Enterprise 2.0 and earlier, all automatically created Hive tables relied on the SERDE property to map typed data in the Cassandra column family to strings. Hive did not store Cassandra data in a typed manner.

In DataStax Enterprise 2.1 and later, automatically created Hive tables use the following mapping of Cassandra types to Hive types:

Cassandra Type     Hive Type
UTF8Type           string
AsciiType          string
DateType           timestamp
LongType           bigint
Int32Type          int
DoubleType         double
FloatType          float
BooleanType        boolean
UUIDType           binary
TimeUUIDType       binary
all other types    binary

Validating types

The STORED BY clause specifies the storage handler to use, which for Cassandra is org.apache.hadoop.hive.cassandra.CassandraStorageHandler. The WITH SERDEPROPERTIES clause specifies the properties used when serializing/deserializing data passed between the Hive table and Cassandra.

Validate types using the cassandra.cf.validatorType property. Set its value to a comma-separated list of Cassandra types, one per column, corresponding to the Hive column types.

hive> DROP TABLE IF EXISTS StockHist;
      CREATE EXTERNAL TABLE StockHist(row_key string, column_name string, value double)
        STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
        WITH SERDEPROPERTIES ("cassandra.ks.name" = "PortfolioDemo",
        "cassandra.cf.validatorType" = "UTF8Type,UTF8Type,DoubleType"
        );

This forces the column values to be deserialized from the specified Cassandra types into the corresponding Hive types.

Specifying CassandraFS and MapReduce properties

The TBLPROPERTIES clause specifies CassandraFS and MapReduce properties for the table. For example:

hive> CREATE EXTERNAL TABLE Users(userid string, name string,
      email string, phone string)
      STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
      WITH SERDEPROPERTIES ( "cassandra.columns.mapping" = ":key,user_name,primary_email,home_phone")
      TBLPROPERTIES ( "cassandra.range.size" = "100",
                      "cassandra.slice.predicate.size" = "100" );

For static Cassandra column families that model objects (such as users), mapping them to a relational structure is straightforward. In the example above, the first column of the Hive table (userid) maps to the row key in Cassandra. The row key in Cassandra is similar to a PRIMARY KEY in a relational table and should be the first column in your Hive table. If you know what the column names are in Cassandra, you can map the Hive column names to the Cassandra column names as shown above.

However, for dynamic column families (such as time series data), all rows likely have a different set of columns, and in most cases you do not know what the column names are. To convert this type of column family to a Hive table, you would convert a wide row in Cassandra to a collection of short rows in Hive using a special set of column names (row_key, column_name, value). For example:

hive> CREATE EXTERNAL TABLE PortfolioDemo.Stocks
      (row_key string, column_name string,  value double)
      STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler';

Mapping column names to Cassandra row and column names

Optionally, you can add a WITH SERDEPROPERTIES clause to map meaningful column names in Hive to the Cassandra row key, column names and column values. For example:

hive> CREATE EXTERNAL TABLE PortfolioDemo.PortfolioStocks
      (portfolio string, ticker string, number_shares int)
      STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
      WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,:column,:value" );

Using cassandra.columns.mapping, you can use a mapping of meaningful column names you assign in the Hive table to Cassandra row key, column/subcolumn names and column/subcolumn values. In the mapping, :key is a special name reserved for the column family row key, :column for column names, :subcolumn for subcolumn names (in super column families), and :value for column (or subcolumn) values. If you do not provide a mapping, then the first column of the Hive table is assumed to be the row key of the corresponding Cassandra column family.

Once you have defined your external tables in Hive, you can query the database to select from the Hive table. For example:

hive> SELECT * FROM PortfolioDemo.Stocks;

Any other query besides a SELECT * in Hive runs as a MapReduce job.
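For example, an aggregating query such as the following against the Stocks table defined above compiles into a MapReduce job:

hive> SELECT column_name, MAX(value) FROM PortfolioDemo.Stocks GROUP BY column_name;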

Inserting data into Cassandra via Hive

Once you have defined an external table object in Hive that maps to a Cassandra column family, you can move the results of MapReduce queries back into Cassandra using the INSERT OVERWRITE TABLE command. For example:

hive> CREATE EXTERNAL TABLE PortfolioDemo.HistLoss
      (row_key string, worst_date string, loss string)
      STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler';
hive> INSERT OVERWRITE TABLE PortfolioDemo.HistLoss
      SELECT a.portfolio, rdate, cast(minp AS string)
      FROM (
        SELECT portfolio, MIN(preturn) AS minp
        FROM portfolio_returns
        GROUP BY portfolio
      ) a
      JOIN portfolio_returns b
        ON (a.portfolio = b.portfolio AND a.minp = b.preturn);

SERDEPROPERTIES reference

The SERDEPROPERTIES clause specifies the properties used when serializing/deserializing data passed between the Hive table and Cassandra. You can add a WITH SERDEPROPERTIES clause to map meaningful column names in Hive to the Cassandra row key, column names and column values.

The following properties can be declared in a WITH SERDEPROPERTIES clause:

  • cassandra.columns.mapping - Mapping of Hive to Cassandra columns
  • cassandra.cf.name - Column family name in Cassandra
  • cassandra.host - IP of a Cassandra node to connect to
  • cassandra.port - Cassandra RPC port - default 9160
  • cassandra.partitioner - Partitioner - default RandomPartitioner

TBLPROPERTIES reference

The TBLPROPERTIES clause specifies CassandraFS and MapReduce properties for the table. The following properties can be declared in a TBLPROPERTIES clause:

  • cassandra.ks.name - Cassandra keyspace name.
  • cassandra.ks.repfactor - Cassandra replication factor - default 1.
  • cassandra.ks.strategy - Replication strategy - default SimpleStrategy.
  • cassandra.input.split.size - MapReduce split size - default 64 * 1024. This property dictates how many rows are processed per mapper (that is, 64k rows per split).
  • cassandra.range.size - MapReduce key range size - default 1000. This property specifies the number of rows fetched at a time over the split. For example, if a mapper is processing a total of 64k rows, it pulls 1000 rows at a time 64 times.
  • cassandra.slice.predicate.size - MapReduce slice predicate size - default 1000. This property describes which columns to fetch from each row and how many columns per row are fetched. For example, for a wide row in Hive, this is the paging size for columns across a row. This means that a row with 10,000 columns is fetched 1000 columns at a time.

MapReduce performance tuning

You can change performance settings in the following ways:

  • In your external table definitions, using the TBLPROPERTIES or SERDEPROPERTIES clauses.

  • Using the set Hive command. For example: set mapred.reduce.tasks=32;

  • In the mapred-site.xml file.

    Packaged installations: /etc/dse/hadoop/mapred-site.xml

    Binary installations: <install_location>/resources/hadoop/conf/mapred-site.xml

    Note

    This is a system setting so if you change it you must restart the analytics nodes.

Speeding up MapReduce jobs:

Increase your mappers to one per CPU core by setting mapred.tasktracker.map.tasks.maximum in mapred-site.xml.
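A hedged sketch of the corresponding mapred-site.xml entry, assuming a node with 8 CPU cores (this is a system setting, so restart the analytics nodes after changing it):

<!-- one map slot per CPU core on an 8-core node -->
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>8</value>
</property>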

Accessing rows with 100,000 columns or more:

In the TBLPROPERTIES clause, set cassandra.range.size and cassandra.slice.predicate.size so that a single row with 100,000 columns is fetched at once. Although this requires more disk usage and scan runs, fetching one row with 100,000 columns at once is better than fetching 1,000 such rows at a time.
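A minimal sketch, assuming a dynamic column family mapped with the (row_key, column_name, value) pattern shown earlier (the table name and the exact property values are illustrative):

hive> CREATE EXTERNAL TABLE PortfolioDemo.WideRows
      (row_key string, column_name string, value string)
      STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
      TBLPROPERTIES ( "cassandra.range.size" = "1",
                      "cassandra.slice.predicate.size" = "100000" );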

Increasing the number of map tasks to maximize performance:

  • Turn off map output compression in mapred-site.xml to lower memory usage.
  • The cassandra.input.split.size property (in TBLPROPERTIES) sets how many rows are processed per mapper. The default size is 64k rows per split. You can decrease the split size to create more mappers, as shown in the sketch below.
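A hedged sketch of decreasing the split size for the current Hive session (the value is illustrative):

hive> SET cassandra.input.split.size=16384;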

Improving Counter Performance:

For example, when performing select count(1) from <column family>;, you can improve the speed of the count by setting cassandra.enable.widerow.iterator=false. This setting causes all columns after the 1000th column of each row to be ignored, which speeds up the counting.
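For example, against the Stocks table from the demo (a minimal sketch):

hive> SET cassandra.enable.widerow.iterator=false;
hive> SELECT count(1) FROM PortfolioDemo.Stocks;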

Out of Memory Errors:

When your map or reduce tasks fail with Out of Memory (OOM) errors, set the mapred.child.java.opts property in Hive:

SET mapred.child.java.opts="-server -Xmx512M"

You can also lower memory usage by turning off map output compression in mapred-site.xml.
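A hedged sketch of the corresponding mapred-site.xml entry (mapred.compress.map.output is the property name in the Hadoop versions of this era; verify it for your Hadoop version):

<!-- disable map output compression to lower memory usage -->
<property>
  <name>mapred.compress.map.output</name>
  <value>false</value>
</property>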

ODBC driver for Hive

DataStax provides an ODBC driver for Hive on Windows.