DataStax Enterprise (DSE) includes a Cassandra-enabled Hive MapReduce client. Hive is a data warehouse system for Hadoop that allows you to project a relational structure onto data stored in Hadoop-compatible file systems, and to query the data using a SQL-like language called HiveQL. The HiveQL language also allows traditional MapReduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL. In DataStax Enterprise, you can start the Hive client on any analytics node, define Hive data structures, and issue MapReduce queries. DSE Hive includes a custom storage handler for Cassandra that allows you to run Hive queries directly on data stored in Cassandra.
DataStax Enterprise 2.0 supports Hive 0.8.1. This version includes a JDBC-compliant interface for connecting to and working with Hive from inside the server, as well as support for binary data and for wide rows (up to 2 billion columns).
Metadata about the objects you define in Hive is stored in a database called the metastore. In regular HDFS-based Hive, when you run Hive on your local machine, your DDL commands create objects in a local metastore that is not available to other Hive clients. In DataStax Enterprise, the Hive metastore is implemented as a keyspace within Cassandra, which automatically makes it a shared metastore with no additional configuration required.
Hive generates MapReduce jobs for most of its queries. Hive MapReduce jobs are submitted to the job tracker node for the DataStax Enterprise cluster. In DataStax Enterprise, the job tracker node information is stored in a column family in CassandraFS, and is initially populated on cluster startup by selecting the first Analytics node from the Cassandra seeds list. Assuming you have properly configured the Cassandra seeds list for DataStax Enterprise in cassandra.yaml, there is no additional configuration required. Hive clients will automatically select the correct job tracker node upon startup.
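The selection rule described above can be modeled in a few lines. This is a hypothetical sketch of the documented behavior, not DataStax Enterprise code: the node addresses and the workload dictionary are assumptions for illustration, and the real selection happens inside DSE at cluster startup. The model also shows why the seeds list matters: if no seed is an Analytics node, there is nothing to select.

```python
# Hypothetical model of the documented rule: on cluster startup, the job
# tracker is the first Analytics node found in the Cassandra seeds list.
# Node addresses and the workload map are illustrative assumptions.
from typing import Dict, List, Optional


def pick_job_tracker(seeds: List[str], workloads: Dict[str, str]) -> Optional[str]:
    """Return the first seed whose workload is 'Analytics', or None."""
    for seed in seeds:
        if workloads.get(seed) == "Analytics":
            return seed
    return None  # no Analytics node among the seeds


if __name__ == "__main__":
    seeds = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]
    workloads = {
        "10.0.0.1": "Cassandra",
        "10.0.0.2": "Analytics",
        "10.0.0.3": "Analytics",
    }
    print(pick_job_tracker(seeds, workloads))  # prints 10.0.0.2
```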
The default job tracker client port is 8012. If you are not sure which node in your cluster is the job tracker, run the following command:
or in a binary distribution:
If your primary job tracker node fails, DataStax Enterprise provides a utility (dsetool movejt) that allows you to move the job tracker to another Analytics node in the cluster.
Log in to a DataStax Enterprise Analytics node.
Run the dsetool movejt command and specify the IP address of the new job tracker node in your DataStax Enterprise cluster. For example:
dsetool movejt 22.214.171.124
or in a binary distribution:
<install_location>/bin/dsetool movejt 126.96.36.199
Allow 20 seconds for all of the Analytics nodes to detect the change and restart their task tracker processes.
In a browser, connect to the new job tracker and confirm that it is up and running. For example (change the IP to reflect your job tracker node IP):
If you are running Hive or Pig MapReduce clients, you must restart them to pick up the new job tracker node information.
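If you want to script a check before opening a browser, one option is to test whether the new node accepts TCP connections on the default job tracker client port, 8012, retrying for up to the 20 seconds suggested above. This is a minimal sketch: the function names and retry interval are hypothetical choices, not part of DSE, and a successful connection only shows that something is listening on the port, so the browser check remains the real confirmation.

```python
# Hypothetical helpers: check whether host:port accepts TCP connections.
# 8012 is the default job tracker client port; the 20-second window mirrors
# the wait suggested for the move. Neither function is part of DSE.
import socket
import time


def port_is_open(host: str, port: int = 8012, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


def wait_for_port(host: str, port: int = 8012,
                  deadline_s: float = 20.0, interval_s: float = 1.0) -> bool:
    """Poll port_is_open until it succeeds or deadline_s seconds have passed."""
    end = time.monotonic() + deadline_s
    while True:
        if port_is_open(host, port, timeout=interval_s):
            return True
        if time.monotonic() >= end:
            return False
        time.sleep(interval_s)
```

For example, wait_for_port("10.0.0.2") returns True once a (hypothetical) new job tracker at 10.0.0.2 accepts connections, or False after roughly 20 seconds.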
When you install DataStax Enterprise using the packaged or AMI distributions, you can start Hive as follows:
or in a binary distribution:
To connect to Hive via the JDBC driver, start Hive on one of the Hadoop nodes as follows:
dse hive --service hiveserver
or in a binary distribution:
<install_location>/bin/dse hive --service hiveserver
DataStax Enterprise allows you to use Hive with CassandraFS just as you would in a regular Hadoop implementation. You can define Hive tables and load them with data using the regular HiveQL SQL-like syntax. In this type of usage, you create your Hive tables using the CREATE TABLE command.
hive> CREATE TABLE invites (foo INT, bar STRING) PARTITIONED BY (ds STRING);
You can then load a table using the LOAD DATA command. See the HiveQL Manual for more information about the HiveQL syntax. In this usage, your loaded data resides in the cfs keyspace. Your Hive metadata store also resides in Cassandra in its own keyspace.
hive> LOAD DATA LOCAL INPATH '<install_location>/resources/hive/examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');
hive> LOAD DATA LOCAL INPATH '<install_location>/resources/hive/examples/files/kv3.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-08');
hive> SELECT count(*), ds FROM invites GROUP BY ds;
The paths to the Hive example files shown in the example LOAD commands above are for the binary distribution.
DataStax Enterprise uses a custom storage handler to allow direct access to data stored in Cassandra through Hive.
To access data stored in Cassandra, first define a database in Hive that maps to a keyspace in Cassandra. One way to map them is to use the same name in both Hive and Cassandra. For example:
hive> CREATE DATABASE PortfolioDemo;
Optionally, if your Hive database and Cassandra keyspace use different names (or the Cassandra keyspace does not exist), you can declare keyspace properties in your external table definition using the TBLPROPERTIES clause. If the keyspace does not yet exist in Cassandra, Hive will create it.
For example, in the case where the keyspace exists in Cassandra but under a different name:
hive> CREATE DATABASE MyHiveDB;
hive> CREATE EXTERNAL TABLE MyHiveTable(row_key string, col1 string, col2 string) STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' TBLPROPERTIES ( "cassandra.ks.name" = "MyCassandraKS" );
Or if the keyspace does not exist in Cassandra yet and you want to create it:
hive> CREATE EXTERNAL TABLE MyHiveTable(row_key string, col1 string, col2 string) STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' TBLPROPERTIES ( "cassandra.ks.name" = "MyCassandraKS", "cassandra.ks.repfactor" = "2", "cassandra.ks.strategy" = "org.apache.cassandra.locator.NetworkTopologyStrategy");
The default Cassandra host is localhost.
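The naming rule above can be summarized as: an explicit cassandra.ks.name in TBLPROPERTIES names the keyspace; otherwise the Hive database and the Cassandra keyspace share a name. The sketch below models that rule and renders a TBLPROPERTIES clause in the same style as the examples. Both helpers are hypothetical and are not part of DSE or the storage handler; case handling of database and keyspace names is not modeled.

```python
# Hypothetical helpers modeling the keyspace naming rule described above.
# They are not part of DSE or the Hive storage handler.
from typing import Mapping


def resolve_keyspace(hive_database: str, tblproperties: Mapping[str, str]) -> str:
    """Keyspace a Hive external table maps to under the rule described above."""
    # An explicit cassandra.ks.name wins; otherwise the names must match.
    return tblproperties.get("cassandra.ks.name", hive_database)


def tblproperties_clause(props: Mapping[str, str]) -> str:
    """Render a TBLPROPERTIES clause in the style of the examples above."""
    body = ", ".join(f'"{key}" = "{value}"' for key, value in props.items())
    return f"TBLPROPERTIES ( {body} )"
```

For instance, tblproperties_clause({"cassandra.ks.name": "MyCassandraKS"}) returns TBLPROPERTIES ( "cassandra.ks.name" = "MyCassandraKS" ), which matches the clause used in the first CREATE EXTERNAL TABLE example.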
An external table in Hive maps to a column family in Cassandra. The STORED BY clause specifies the storage handler to use, which for Cassandra is org.apache.hadoop.hive.cassandra.CassandraStorageHandler. The WITH SERDEPROPERTIES clause specifies the properties used when serializing/deserializing data passed between the Hive table and Cassandra. The TBLPROPERTIES clause specifies CassandraFS and MapReduce properties for the table. For example:
hive> CREATE EXTERNAL TABLE Users(userid string, name string, email string, phone string) STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' WITH SERDEPROPERTIES ( "cassandra.columns.mapping" = ":key,user_name,primary_email,home_phone") TBLPROPERTIES ( "cassandra.range.size" = "100", "cassandra.slice.predicate.size" = "100" );
For static Cassandra column families that model objects (such as users), mapping them to a relational structure is straightforward. In the example above, the first column of the Hive table (userid) maps to the row key in Cassandra. The row key in Cassandra is similar to a PRIMARY KEY in a relational table and should be the first column in your Hive table. If you know what the column names are in Cassandra, you can map the Hive column names to the Cassandra column names as shown above.
However, for dynamic column families (such as time series data), each row likely has a different set of columns, and in most cases you do not know what the column names are. To convert this type of column family to a Hive table, you convert each wide row in Cassandra to a collection of short rows in Hive, using a special set of column names (row_key, column_name, value). For example:
hive> CREATE EXTERNAL TABLE PortfolioDemo.Stocks (row_key string, column_name string, value string) STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler';
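To make the conversion concrete, the sketch below flattens wide rows into (row_key, column_name, value) tuples, one tuple per Cassandra column. A Python dictionary stands in for the column family, and the row keys, column names, and values are invented for illustration; in Cassandra, columns within a row come back sorted by the comparator, whereas this dictionary simply preserves insertion order.

```python
# Sketch of the wide-row to short-row conversion: every (row key, column)
# cell of a dynamic column family becomes one Hive-style row. The data is
# invented; a dict stands in for the Cassandra column family.
from typing import Dict, Iterator, Tuple

WideRows = Dict[str, Dict[str, str]]


def to_narrow_rows(wide: WideRows) -> Iterator[Tuple[str, str, str]]:
    """Yield (row_key, column_name, value) for every column of every row."""
    for row_key, columns in wide.items():
        for column_name, value in columns.items():
            yield (row_key, column_name, value)


if __name__ == "__main__":
    stocks = {
        "AAA": {"2012-03-01": "10.00", "2012-03-02": "10.50"},
        "BBB": {"2012-03-01": "20.00"},
    }
    for row in to_narrow_rows(stocks):
        print(row)
```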
Optionally, you can add a WITH SERDEPROPERTIES clause to map meaningful column names in Hive to the Cassandra row key, column names and column values. For example:
hive> CREATE EXTERNAL TABLE PortfolioDemo.PortfolioStocks (portfolio string, ticker string, number_shares string) STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,:column,:value" );
With cassandra.columns.mapping, you can map meaningful column names that you assign in the Hive table to the Cassandra row key, column/subcolumn names, and column/subcolumn values. In the mapping, :key is a special name reserved for the column family row key, :column for column names, :subcolumn for subcolumn names (in super column families), and :value for column (or subcolumn) values. If you do not provide a mapping, the first column of the Hive table is assumed to be the row key of the corresponding Cassandra column family.
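The rules above can be read as a positional pairing between Hive columns and mapping entries. The sketch below is a hypothetical interpretation, not the storage handler's parser: it assumes the mapping lists exactly one entry per Hive column, in order, treats entries starting with ":" as the reserved names, and treats any other entry as a literal Cassandra column name, as in the Users example.

```python
# Hypothetical reading of cassandra.columns.mapping based on the rules above.
# Assumption: one mapping entry per Hive column, in the same order.
from typing import Dict, List, Optional

RESERVED_ROLES = {
    ":key": "row key",
    ":column": "column name",
    ":subcolumn": "subcolumn name",
    ":value": "column (or subcolumn) value",
}


def describe_mapping(hive_columns: List[str], mapping: Optional[str]) -> Dict[str, str]:
    """Describe what each Hive column maps to in Cassandra."""
    if mapping is None:
        # Without a mapping, the documented rule only fixes the first
        # Hive column as the row key.
        return {hive_columns[0]: "row key"}
    entries = [entry.strip() for entry in mapping.split(",")]
    if len(entries) != len(hive_columns):
        raise ValueError("expected one mapping entry per Hive column")
    described = {}
    for column, entry in zip(hive_columns, entries):
        if entry.startswith(":"):
            if entry not in RESERVED_ROLES:
                raise ValueError(f"unknown reserved name: {entry}")
            described[column] = RESERVED_ROLES[entry]
        else:
            # A plain name refers to a named (static) Cassandra column.
            described[column] = f"Cassandra column {entry}"
    return described
```

For the PortfolioStocks example, describe_mapping(["portfolio", "ticker", "number_shares"], ":key,:column,:value") pairs portfolio with the row key, ticker with the column name, and number_shares with the column value.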
Once you have defined your external tables in Hive, you can run a SELECT query to see the data stored in them. For example:
hive> SELECT * FROM PortfolioDemo.Stocks;
In Hive, any query other than a plain SELECT * runs as a MapReduce job.
Once you have defined an external table object in Hive that maps to a Cassandra column family, you can move the results of MapReduce queries back into Cassandra using the INSERT OVERWRITE TABLE command. For example:
hive> CREATE EXTERNAL TABLE PortfolioDemo.HistLoss (row_key string, worst_date string, loss string) STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler';
hive> INSERT OVERWRITE TABLE PortfolioDemo.HistLoss SELECT a.portfolio, rdate, cast(minp as string) FROM ( SELECT portfolio, MIN(preturn) as minp FROM portfolio_returns GROUP BY portfolio ) a JOIN portfolio_returns b ON (a.portfolio = b.portfolio and a.minp = b.preturn);
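Read step by step, the INSERT OVERWRITE query above finds each portfolio's minimum return (minp) in the inner SELECT, then joins back to portfolio_returns to recover the date (rdate) on which that return occurred. The sketch below reproduces that logic on in-memory tuples. The (portfolio, rdate, preturn) layout is inferred from the column names the query uses, the sample values are invented, and str() only stands in for cast(minp as string); Hive's formatting of doubles may differ.

```python
# Plain-Python equivalent of the HistLoss query's logic on in-memory data.
# Row layout (portfolio, rdate, preturn) is inferred from the query.
from typing import Dict, List, Tuple

Row = Tuple[str, str, float]


def worst_losses(returns: List[Row]) -> List[Tuple[str, str, str]]:
    """Return (portfolio, worst_date, loss) rows, like the INSERT ... SELECT."""
    # Inner query: SELECT portfolio, MIN(preturn) ... GROUP BY portfolio
    minp: Dict[str, float] = {}
    for portfolio, _, preturn in returns:
        if portfolio not in minp or preturn < minp[portfolio]:
            minp[portfolio] = preturn
    # JOIN ON (a.portfolio = b.portfolio AND a.minp = b.preturn)
    return [(p, d, str(r)) for p, d, r in returns if r == minp[p]]
```

Because portfolio becomes the row key of HistLoss, two dates tied for the same minimum would both produce rows for the same Cassandra row key, so one write would replace the other's worst_date and loss columns.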
The SERDEPROPERTIES clause specifies the properties used when serializing/deserializing data passed between the Hive table and Cassandra. You can add a WITH SERDEPROPERTIES clause to map meaningful column names in Hive to the Cassandra row key, column names and column values.
The following properties can be declared in a WITH SERDEPROPERTIES clause:
The TBLPROPERTIES clause specifies CassandraFS and MapReduce properties for the table. The following properties can be declared in a TBLPROPERTIES clause:
You can change performance settings in the following ways:
Using the set Hive command. For example: set mapred.reduce.tasks=32;
In the mapred-site.xml file.
Packaged installations: /etc/dse/hadoop/mapred-site.xml
Binary installations: <install_location>/resources/hadoop/conf/mapred-site.xml
Because this is a system setting, you must restart the Analytics nodes after you change it.
Speeding up MapReduce jobs:
Increase your mappers to one per CPU core by setting mapred.tasktracker.map.tasks.maximum in mapred-site.xml.
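Because mapred.tasktracker.map.tasks.maximum is set in mapred-site.xml, the change can also be scripted. The sketch below uses Python's standard xml.etree.ElementTree module to add or update one property in a Hadoop-style configuration file. The helper name is hypothetical; note that os.cpu_count() reports logical CPUs (which may include hyperthreads), and that ElementTree drops XML comments when it rewrites the file, so keep a backup.

```python
# Hypothetical helper: add or update one <property> in a Hadoop
# configuration file such as mapred-site.xml, using only the stdlib.
import xml.etree.ElementTree as ET


def set_hadoop_property(path: str, name: str, value: str) -> None:
    """Set <property><name>name</name><value>value</value></property>."""
    tree = ET.parse(path)
    root = tree.getroot()  # the <configuration> element
    for prop in root.findall("property"):
        if prop.findtext("name") == name:
            value_el = prop.find("value")
            if value_el is None:
                value_el = ET.SubElement(prop, "value")
            value_el.text = value
            break
    else:
        # Property not present yet: append a new one.
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    tree.write(path, encoding="utf-8", xml_declaration=True)
```

For example, set_hadoop_property("/etc/dse/hadoop/mapred-site.xml", "mapred.tasktracker.map.tasks.maximum", str(os.cpu_count() or 1)) would update a packaged installation (after import os); as with any mapred-site.xml change, restart the Analytics nodes afterward.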
Accessing rows with 100,000 columns or more:
In the TBLPROPERTIES clause, set cassandra.range.size and cassandra.slice.predicate.size so that one row with 100,000 columns is fetched at once. Although this requires more disk usage and scan runs, fetching one row with 100,000 columns at once is better than fetching 1000 rows with 100,000 columns at a time.
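A rough way to see the difference is to count how many cells a single read request can return. The sketch assumes that each request returns up to cassandra.range.size rows, each trimmed to at most cassandra.slice.predicate.size columns; under that reading, "one row with 100,000 columns" corresponds to a range size of 1 and a slice predicate size of 100000. Real memory use also depends on the size of each column.

```python
# Back-of-the-envelope model, assuming each read request returns up to
# cassandra.range.size rows with at most cassandra.slice.predicate.size
# columns per row.


def max_cells_per_request(range_size: int, slice_predicate_size: int) -> int:
    """Upper bound on (row, column) cells a single request can return."""
    return range_size * slice_predicate_size


one_wide_row = max_cells_per_request(1, 100_000)
thousand_wide_rows = max_cells_per_request(1_000, 100_000)
print(f"{one_wide_row:,} vs {thousand_wide_rows:,} cells per request")
# prints: 100,000 vs 100,000,000 cells per request
```

Under this assumption, the second pattern can hold a thousand times more data per request, which is why fetching one wide row at a time is the safer choice.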
Increasing the number of map tasks to maximize performance:
Improving count performance:
For example, to speed up a query such as select count(1) from <column family>, set cassandra.enable.widerow.iterator=false. This setting causes all columns after the 1000th column in each row to be ignored, which makes the count faster.
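The saving comes from reading fewer columns per row. The toy model below counts the columns read with and without the wide-row iterator, using the 1000-column cutoff stated above; the row widths are invented. Because the setting discards columns, it only helps when those columns do not affect the result: for a table mapped as (row_key, column_name, value), every column is a Hive row, so a count over such a table would be capped as well.

```python
# Toy model of cassandra.enable.widerow.iterator=false as described above:
# only the first 1000 columns of each row are read; the rest are ignored.
from typing import List

COLUMN_LIMIT = 1000  # cutoff stated in this section


def columns_read(row_widths: List[int], widerow_iterator: bool) -> int:
    """Total columns read for rows of the given widths."""
    if widerow_iterator:
        return sum(row_widths)
    return sum(min(width, COLUMN_LIMIT) for width in row_widths)


if __name__ == "__main__":
    widths = [100_000] * 10  # ten invented rows of 100,000 columns each
    print(columns_read(widths, True), columns_read(widths, False))
    # prints: 1000000 10000
```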
Out of Memory Errors:
When your map or reduce tasks fail with out-of-memory (OOM) errors, set mapred.child.java.opts in Hive as follows:
SET mapred.child.java.opts="-server -Xmx512M";
You can also lower memory usage by turning off map output compression (setting mapred.compress.map.output to false) in mapred-site.xml.