I am having trouble using Pig + Cassandra (I deployed a staging node using the DataStax Auto-Clustering
AMI 2.0, ami-edc30384, on an EC2 m1.large instance).
Here is a description of what I'm doing, with snippets from the Pig script I'm using:
register '/home/ubuntu/src/pig/contrib/piggybank/java/piggybank.jar';
register '/home/ubuntu/src/pygmalion/udf/target/pygmalion-1.1.0-SNAPSHOT.jar';
define FromCassandraBag org.pygmalion.udf.FromCassandraBag();
define ToCassandraBag org.pygmalion.udf.ToCassandraBag();
data = LOAD 'cassandra://RawLogTest/RawLog2' using CassandraStorage()
AS (key, columns: {T: tuple( name, value ) });
rows = FOREACH data GENERATE key, FLATTEN(FromCassandraBag('..., some_id, ...', columns)) AS (
...
some_id:int,
...
);
rows_stripped = FOREACH rows {
t = REGEX_EXTRACT( key, '([0-9]+)\\.([0-9]+)\\.([0-9]+)', 1 );
GENERATE (long)t AS timestamp, ..., some_id, ...;
}
Now I select a slice of rows_stripped, using parameters coming from the user.
Then I GROUP and generate a report from the raw data.
interval = FILTER rows_stripped BY (timestamp >= $FROM and timestamp <= $TO);
grouped = GROUP rows_stripped BY (...);
report = FOREACH grouped GENERATE FLATTEN( group ), COUNT( rows_stripped );
So far so good. I can DUMP all the relations defined, getting back the
data I expect.
Now I gather some other data from CSV files I previously put on CFS (they actually
come from a db table, which I imported via Sqoop). My goal is to join
the two relations to augment the report.
other_info = LOAD 'lat.csv' using PigStorage(',') as (... , some_id:int, ...);
This relation can also be DUMPed without problems.
And now I join this data with the report:
augmented_report = JOIN other_info BY some_id, report BY some_id;
When I try to STORE or DUMP augmented_report, the MapReduce job fails with the following
error:
java.io.IOException: Type mismatch in key from map: expected org.apache.pig.impl.io.NullableIntWritable, recieved org.apache.pig.impl.io.NullableBytesWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1013)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Map.collect(PigMapReduce.java:116)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:239)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:232)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)
I can work around this problem by storing report in CFS and then JOINing other_info against
a new relation loaded from it, but I'd like to be able to build augmented_report using data
from the original source.
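One check I can think of, sketched here under an assumption I have not verified: the error suggests that one side of the JOIN emits some_id as an int and the other as a bytearray, so some_id coming out of FromCassandraBag may still be a bytearray at runtime despite the AS clause. The relation name rows_cast is hypothetical, and the elided fields are the same ones elided in the snippets above.

```pig
-- Print the schema Pig has inferred for each side of the join.
DESCRIBE report;
DESCRIBE other_info;

-- Assumption: some_id is still a bytearray here, so cast it explicitly
-- before it is used downstream as a grouping or join key.
-- rows_cast is a hypothetical name; the other fields are elided as above.
rows_cast = FOREACH rows GENERATE key, ..., (int)some_id AS some_id, ...;
```

If DESCRIBE already reports int on both sides, the mismatch would be between the declared schema and the actual runtime type, which is what the explicit cast is meant to rule out.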
What strategy do you recommend to debug and solve the problem?
Thank you,
s.
