Accessing Cassandra from Spark in Java

By Jacek Lewandowski - August 26, 2014

A few weeks ago we decided to move our Spark Cassandra Connector to the open source area (GitHub: datastax/spark-cassandra-connector). The connector is intended to be used primarily from Scala, but customers and the community have expressed a desire to use it from Java as well. To address this need we have created an additional module which provides a Java API for the connector. In this post, we describe how to use this new API to access Apache Cassandra™ via Apache Spark™ from Java applications.

Prerequisites

At the time of writing, version 1.0.0-rc4 of the connector was available in the central Maven repository. This version works with Cassandra 2.0 and Spark 0.9.

Note: This blog post was written targeting DSE 4.5 which included Apache Spark™ 0.9. Please refer to the DataStax documentation for your specific version of DSE if different.

The easiest way to run this application is to follow these steps:

  1. Download and run Cassandra server
  2. Create a blank Maven project
  3. Add the Spark core and Spark Cassandra Connector (Java module) artifacts to the dependencies section (the exact coordinates are listed in the sample pom.xml linked at the end of this post)
  4. Implement the application (see the tutorial below)
  5. Compile and run the application with parameters: local[4] 127.0.0.1 (the first argument tells Spark to run locally with 4 worker threads instead of connecting to a real Spark cluster, which is best for learning and testing; the second argument is the address of a Cassandra database node)

Tutorial

Let's create a simple application that reads sales information and computes roll-up summaries for a product hierarchy.

The product hierarchy may look like the one depicted below:

Sample products hierarchy

We will use the following stub of the application:

package com.datastax.spark.demo;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class JavaDemo implements Serializable {
    private transient SparkConf conf;
 
    private JavaDemo(SparkConf conf) {
        this.conf = conf;
    }
 
    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }
 
    private void generateData(JavaSparkContext sc) {
    }
 
    private void compute(JavaSparkContext sc) {
    }
 
    private void showResults(JavaSparkContext sc) {
    }
 
    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }
 
        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);
 
        JavaDemo app = new JavaDemo(conf);
        app.run();
    } 
}

In the subsequent sections we will implement those three empty methods which generate the data, compute summaries and display the results.

Generate data

This application creates the schema and generates random data each time it is run. The data generation procedure has been broken into three simple steps, which are presented below.

Create schema

First, we connect to Cassandra using CassandraConnector, drop the keyspace if it exists, and then create all the tables from scratch. Although CassandraConnector is implemented in Scala, we can easily use it from Java with the try-with-resources syntax:

        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
 
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS java_api");
            session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
            session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
            session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
        }

The schema consists of three entities. We assume that the corresponding bean classes for them (Product, Sale, Summary) are already implemented. When creating beans to be used with the connector, remember to add appropriate getters, setters and a no-args constructor (see the connector documentation for more details).
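For reference, here is a minimal sketch of what such a bean might look like (the actual classes are not shown in this post; Sale and Summary follow the same pattern):

import java.io.Serializable;
import java.util.List;

public class Product implements Serializable {
    private Integer id;
    private String name;
    private List<Integer> parents;

    // A no-args constructor is required so the connector can instantiate the bean.
    public Product() { }

    public Product(Integer id, String name, List<Integer> parents) {
        this.id = id;
        this.name = name;
        this.parents = parents;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public List<Integer> getParents() { return parents; }
    public void setParents(List<Integer> parents) { this.parents = parents; }
}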

Generate products hierarchy

Now we generate the product hierarchy depicted above. We could do this by sending plain insert statements, but doing it the Spark way introduces the methods used to save RDDs to Cassandra. First we create a list of Product instances, then we convert that list to an RDD, and finally we save it to Cassandra:

        List<Product> products = Arrays.asList(
                new Product(0, "All products", Collections.<Integer>emptyList()),
                new Product(1, "Product A", Arrays.asList(0)),
                new Product(4, "Product A1", Arrays.asList(0, 1)),
                new Product(5, "Product A2", Arrays.asList(0, 1)),
                new Product(2, "Product B", Arrays.asList(0)),
                new Product(6, "Product B1", Arrays.asList(0, 2)),
                new Product(7, "Product B2", Arrays.asList(0, 2)),
                new Product(3, "Product C", Arrays.asList(0)),
                new Product(8, "Product C1", Arrays.asList(0, 3)),
                new Product(9, "Product C2", Arrays.asList(0, 3))
        );
 
        JavaRDD<Product> productsRDD = sc.parallelize(products);
        javaFunctions(productsRDD, Product.class).saveToCassandra("java_api", "products");

javaFunctions is a static method defined in the CassandraJavaUtil class. We strongly encourage users to import this class statically, to make the code clearer:

import static com.datastax.spark.connector.CassandraJavaUtil.*;

The methods defined in CassandraJavaUtil are the main entry points to the functionality the connector offers in Java. They create special wrappers around objects such as RDD, DStream, SparkContext and StreamingContext, as well as their Java counterparts: JavaRDD, JavaDStream, JavaSparkContext and JavaStreamingContext.
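For example, the two wrappers used throughout this post boil down to the following condensed sketch (both calls appear in context later in the tutorial):

        // Wrap the JavaSparkContext to read a Cassandra table into an RDD of beans.
        JavaRDD<Product> productsFromCassandra = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class);

        // Wrap a JavaRDD of beans to write it back to a Cassandra table.
        javaFunctions(productsFromCassandra, Product.class)
                .saveToCassandra("java_api", "products");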

Generate random sales

Finally, we want to generate random sales data. Again, we could use plain insert statements, but instead we will use Spark to filter the products which are leaves of the hierarchy (a naive approach: just check the number of parents) and then create 1000 random sales for each such product.

        JavaRDD<Sale> salesRDD = productsRDD.filter(new Function<Product, Boolean>() {
            @Override
            public Boolean call(Product product) throws Exception {
                return product.getParents().size() == 2;
            }
        }).flatMap(new FlatMapFunction<Product, Sale>() {
            @Override
            public Iterable<Sale> call(Product product) throws Exception {
                Random random = new Random();
                List<Sale> sales = new ArrayList<>(1000);
                for (int i = 0; i < 1000; i++) {
                    sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble())));
                }
                return sales;
            }
        });
 
        javaFunctions(salesRDD, Sale.class).saveToCassandra("java_api", "sales");

In the last statement, we created a wrapper around JavaRDD. That wrapper provides a number of overloaded saveToCassandra methods. In its simplest form, it writes Java bean-like classes to Cassandra, mapping the attributes of the bean class to the corresponding columns by name. The other overloads of saveToCassandra allow specific mappings to be defined.
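For example, assuming the version you use exposes a saveToCassandra overload that takes an explicit column list (the exact signatures vary between connector releases, so check the Java API docs for your version), a hedged sketch might look like this:

        // Assumed overload: write only the listed bean properties to the matching columns.
        // Verify the exact signature against the connector release you are using.
        javaFunctions(salesRDD, Sale.class)
                .saveToCassandra("java_api", "sales", new String[] {"id", "product", "price"});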

Compute summaries

In this subsection we are going to compute roll-up summaries for sales of each product.

        JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });
 
        JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc)
                .cassandraTable("java_api", "sales", Sale.class)
                .keyBy(new Function<Sale, Integer>() {
                    @Override
                    public Integer call(Sale sale) throws Exception {
                        return sale.getProduct();
                    }
                });

First, the productsRDD and salesRDD pair RDDs are created. They contain (product id, product) and (product id, sale) pairs respectively. We converted them into pair RDDs because the next step joins them by key.

        JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD);

At this point, joinedRDD contains tuples in which each sale is paired with its corresponding product. One way to compute roll-ups (not necessarily the best one) is to emit each sale once for the product itself and once for every ancestor in the branch it belongs to. To do this, we use the flatMap method:

        JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMap(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
            @Override
            public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception {
                Tuple2<Sale, Product> saleWithProduct = input._2();
                List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1);
                allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice()));
                for (Integer parentProduct : saleWithProduct._2().getParents()) {
                    allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice()));
                }
                return allSales;
            }
        });

Once we have an RDD with the product id and price for each sale and each hierarchy node that sale belongs to, we can simply reduce by key to obtain the sum of all sales for each hierarchy node. Then we convert the tuples to Summary objects:

        JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() {
            @Override
            public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception {
                return v1.add(v2);
            }
        }).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
            @Override
            public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
                return new Summary(input._1(), input._2());
            }
        });

Finally, we use the saveToCassandra method to save the summaries to the database, as we did before:

        javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries");

Show the results

We want to display each product name together with its summary. To do this, we need to join an RDD of summaries and an RDD of products by product id:

        JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
                .cassandraTable("java_api", "summaries", Summary.class)
                .keyBy(new Function<Summary, Integer>() {
                    @Override
                    public Integer call(Summary summary) throws Exception {
                        return summary.getProduct();
                    }
                });
 
        JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });

Displaying the results is pretty straightforward now:

        List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray();
 
        for (Tuple2<Product, Optional<Summary>> result : results) {
            System.out.println(result);
        }

Summary

This tutorial shows the two main features of the Spark Cassandra Connector: reading data from Cassandra into an RDD and writing an RDD to Cassandra. It is intended to be a quick and simple introduction to using Cassandra in Spark applications. More information about the Java API can be found in the documentation.

The full source code of the JavaDemo class and a sample pom.xml file can be found here.



Comments

  1. sia says:

    Thank you for the complete description.
    When I ran the program I got an IllegalArgumentException in:

    JavaRDD<Summary> summariesRDD = rdd.map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
    @Override
    public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
    return new Summary(input._1(), input._2());
    }

    1. Jacek Lewandowski says:

      Do you use the components in the same versions as provided in this post or other?

      1. sia says:

        I am using newer versions. Do you think that causes a problem?

        1. Jacek Lewandowski says:

          Hi, I’ve tried once again with the newest connector – 1.0.3, and it works. What versions do you use exactly?

  2. Jacek Lewandowski says:

    Might be. I’ll check it and update the blog post to the final release of the connector 1.0 if needed.

  3. Shishir Prasad says:

    Hi,

    I am facing an issue where I am not able to fetch dynamic columns from a Cassandra column family using the spark-cassandra connector. I have posted this issue here: http://stackoverflow.com/questions/26683942/spark-cassandra-connector-not-able-to-fetch-dynamic-columns .

    Any thoughts on that ?

  4. Mitur says:

    I was able to execute this in my cluster. I then decided to add

    Map countByKey = productsRDD.countByKey();

    in the compute method. Now I am consistently getting:

    Exception in thread “main” java.lang.ClassCastException: it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap cannot be cast to it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap
    at org.apache.spark.rdd.RDD.countByValue(RDD.scala:780)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:203)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:195)
    at com.maarcus.mitur.sparktutorial.JavaDemo.compute(JavaDemo.java:98)
    at com.maarcus.mitur.sparktutorial.JavaDemo.run(JavaDemo.java:34)
    at com.maarcus.mitur.sparktutorial.JavaDemo.main(JavaDemo.java:181)

    Any ideas of what went wrong?

    Thanks.

    1. Mitur says:

      BTW, the Map is defined as a Map of Integer to Object. Copy and paste took that part out.

  5. Liquid says:

    How can I use an existing keyspace as the source of a JavaRDD, so that I can process the data in that keyspace using the Spark Java API?

  6. Steinar says:

    Hi, I am trying to run this example. I am using Cassandra 2.0.11 with the exact Maven dependencies mentioned in Prerequisites above. It gets pretty far, but I get the following exception:

    Exception in thread “main” java.lang.IllegalArgumentException
    at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)
    at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)
    at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$getClassReader(ClosureCleaner.scala:42)
    at org.apache.spark.util.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:85)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:982)
    at org.apache.spark.rdd.RDD.map(RDD.scala:249)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:69)
    at org.apache.spark.api.java.JavaPairRDD.map(JavaPairRDD.scala:47)
    at com.globalspec.rnd.sparksample.JavaDemo.compute(JavaDemo.java:126)
    at com.globalspec.rnd.sparksample.JavaDemo.run(JavaDemo.java:33)
    at com.globalspec.rnd.sparksample.JavaDemo.main(JavaDemo.java:174)

    The exception is thrown on this line of the compute function: }).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {

    Any ideas?

    1. Jonathan Neufeld says:

      My guess is a versioning problem. The following snippet in ClassReader is implicated:

      if(this.readShort(6) > 51) {
      throw new IllegalArgumentException();

      I’m willing to bet it’s trying to validate the class version, which would suggest that the library is not compatible with Java 8.

      So in summary my guess is that there’s some sort of compiled class file version conflict at play here.

  7. aldo says:

    I got the same error mentioned above. I'm using Cassandra 2.1.2, Spark Cassandra Connector 1.0.0 and Spark 0.9.2.

  8. aldo says:

    I have updated Spark to 1.2.0 and the Cassandra connector to 1.1.0 and now it's working.

  9. Sreeharsha says:

    Can we access data in Hive from Spark in Java?

  10. james says:

    Can I cache the CassandraRDD / any other RDD to Cassandra instead of just to disk?

  11. praveen Kumar says:

    I have tried the above example. Insert is working fine, but while retrieving I'm getting a timeout exception:

    Exception in thread “main” org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection timed out: connect
    at org.apache.thrift.transport.TSocket.open(TSocket.java:185)
    at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
    at org.apache.cassandra.thrift.TFramedTransportFactory.openTransport(TFramedTransportFactory.java:41)
    at com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:127)
    at com.datastax.spark.connector.cql.CassandraConnector.createThriftClient(CassandraConnector.scala:134)
    at com.datastax.spark.connector.cql.CassandraConnector.withCassandraClientDo(CassandraConnector.scala:140)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner.partitions(CassandraRDDPartitioner.scala:115)
    at com.datastax.spark.connector.rdd.CassandraRDD.getPartitions(CassandraRDD.scala:273)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:51)
    at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:51)
    at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
    at java.util.TimSort.countRunAndMakeAscending(Unknown Source)
    at java.util.TimSort.sort(Unknown Source)
    at java.util.TimSort.sort(Unknown Source)
    at java.util.Arrays.sort(Unknown Source)
    at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
    at scala.collection.AbstractSeq.sorted(Seq.scala:40)
    at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
    at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:51)
    at org.apache.spark.rdd.PairRDDFunctions.leftOuterJoin(PairRDDFunctions.scala:392)
    at org.apache.spark.api.java.JavaPairRDD.leftOuterJoin(JavaPairRDD.scala:368)
    at com.datastax.spark.demo.JavaDemo.showResults(JavaDemo.java:189)
    at com.datastax.spark.demo.JavaDemo.run(JavaDemo.java:42)
    at com.datastax.spark.demo.JavaDemo.main(JavaDemo.java:203)

    1. Santhosh says:

      Hi Praveen,
      I got the same error. Did you find any solution for that? Thanks in advance.

  12. Pallabi Chakraborty says:

    In the same code, I added the first part of the code for generating tables

    private void generateData(JavaSparkContext sc) {

    CassandraConnector connector = CassandraConnector.apply(sc.getConf());

    try (Session session = connector.openSession()) {
    session.execute("DROP KEYSPACE IF EXISTS java_api");
    session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
    session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
    session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
    session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
    }
    }

    I get the following error:
    Exception in thread “main” java.io.IOException: Failed to open native connection to Cassandra at {XX.XX.XX.XX}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:176)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)
    at JavaDemo.generateData(JavaDemo.java:28)
    at JavaDemo.run(JavaDemo.java:18)
    at JavaDemo.main(JavaDemo.java:52)
    Caused by: java.lang.IllegalArgumentException: Contact points contain multiple data centers:
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.init(LocalNodeFirstLoadBalancingPolicy.scala:47)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1024)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:270)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:169)

    Is there anything I can do about it?

  13. Hi, I am trying to run this code on my local machine. I have a VMware CentOS 6 image running a pseudo-distributed Spark node (Cloudera 1.3.0) and a localhost Cassandra node running on OS X Yosemite.

    The code compiles fine, but I cannot run it. I am using these two parameters:

    spark://192.168.30.154:7077 my-cassandra-node-001:9160

    I can see that I can connect to Spark, but Cassandra is not reachable. I just modified the file /Users/aironman/dsc-cassandra-2.1.9/conf/cassandra-env.sh, uncommenting exactly this line:

    JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=my-cassandra-node-001"

    When I run the code, I get this exception:

    15/09/28 12:10:17 INFO slf4j.Slf4jLogger: Slf4jLogger started
    15/09/28 12:10:17 INFO Remoting: Starting remoting
    15/09/28 12:10:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@192.168.30.1:56178]
    15/09/28 12:10:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@192.168.30.1:56178]
    15/09/28 12:10:17 INFO spark.SparkEnv: Registering BlockManagerMaster
    15/09/28 12:10:17 INFO storage.DiskBlockManager: Created local directory at /var/folders/gn/pzkybyfd2g5bpyh47q0pp5nc0000gn/T/spark-local-20150928121017-d6e6
    15/09/28 12:10:17 INFO storage.MemoryStore: MemoryStore started with capacity 2.1 GB.
    15/09/28 12:10:17 INFO network.ConnectionManager: Bound socket to port 56179 with id = ConnectionManagerId(192.168.30.1,56179)
    15/09/28 12:10:17 INFO storage.BlockManagerMaster: Trying to register BlockManager
    15/09/28 12:10:17 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager 192.168.30.1:56179 with 2.1 GB RAM
    15/09/28 12:10:17 INFO storage.BlockManagerMaster: Registered BlockManager
    15/09/28 12:10:17 INFO spark.HttpServer: Starting HTTP Server
    15/09/28 12:10:17 INFO server.Server: jetty-7.6.8.v20121106
    15/09/28 12:10:17 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:56180
    15/09/28 12:10:17 INFO broadcast.HttpBroadcast: Broadcast server started at http://192.168.30.1:56180
    15/09/28 12:10:17 INFO spark.SparkEnv: Registering MapOutputTracker
    15/09/28 12:10:17 INFO spark.HttpFileServer: HTTP File server directory is /var/folders/gn/pzkybyfd2g5bpyh47q0pp5nc0000gn/T/spark-7d4fe302-462b-410d-95b4-62696b7bdf10
    15/09/28 12:10:17 INFO spark.HttpServer: Starting HTTP Server
    15/09/28 12:10:17 INFO server.Server: jetty-7.6.8.v20121106
    15/09/28 12:10:17 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:56181
    15/09/28 12:10:17 INFO server.Server: jetty-7.6.8.v20121106
    15/09/28 12:10:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
    15/09/28 12:10:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
    15/09/28 12:10:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
    15/09/28 12:10:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
    15/09/28 12:10:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
    15/09/28 12:10:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
    15/09/28 12:10:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
    15/09/28 12:10:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
    15/09/28 12:10:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
    15/09/28 12:10:17 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
    15/09/28 12:10:17 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    15/09/28 12:10:17 INFO ui.SparkUI: Started Spark Web UI at http://192.168.30.1:4040
    15/09/28 12:10:17 INFO client.AppClient$ClientActor: Connecting to master spark://192.168.30.154:7077…
    2015-09-28 12:10:18.013 java[17072:536877] Unable to load realm info from SCDynamicStore
    Exception in thread “main” java.net.UnknownHostException: my-cassandra-node-001:9160: nodename nor servname provided, or not known
    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293)
    at java.net.InetAddress.getAllByName0(InetAddress.java:1246)
    at java.net.InetAddress.getAllByName(InetAddress.java:1162)
    at java.net.InetAddress.getAllByName(InetAddress.java:1098)
    at java.net.InetAddress.getByName(InetAddress.java:1048)
    at com.datastax.spark.connector.cql.CassandraConnectorConf$.apply(CassandraConnectorConf.scala:37)
    at com.datastax.spark.connector.cql.CassandraConnector$.apply(CassandraConnector.scala:208)
    at com.datastax.spark.connector.cql.CassandraConnector.apply(CassandraConnector.scala)
    at com.aironman.cassandra.JavaDemo.generateData(JavaDemo.java:44)
    at com.aironman.cassandra.JavaDemo.run(JavaDemo.java:37)
    at com.aironman.cassandra.JavaDemo.main(JavaDemo.java:179)

    Can anybody help me?

    thank you

  14. Abhishek K says:

    This example doesn't work with Cassandra 2.1.

  15. Jason says:

    What are the parameters to connect to a remote spark cluster instead of a “local” one?

    1. valchkou says:

      SparkConf conf = new SparkConf()
      .setMaster("spark://host:7077")
      .setAppName("Name")
      .set("spark.cassandra.connection.host", "host")
      .set("spark.cassandra.auth.username", "usr") // optional
      .set("spark.cassandra.auth.password", "pwd"); // optional

  16. Joe says:

    I also had some problems with the connection to a remote Spark cluster and was struggling for a few days. Then I found out that the Spark master URL must be set to "local" even though I connect to a remote cluster.

    So here is my working JavaSparkContext:

    final SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName(this.getClass().getSimpleName() + "#" + UUID.randomUUID());
    sparkConf.set("spark.cassandra.connection.host", "192.168.242.12");
    sparkConf.setMaster("local");
    final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

  17. Ashis says:

    Hi, thanks for the nice post. However, I'm unable to understand which connector to use when. What is the latest, most stable connector?

    Can you please explain this properly (Spark version + cassandra version + scala version + pom.xml file).

    I’m not finding any article on the latest connector. Stepwise guide will definitely help.

    Thanks,
    Ashis

    1. Chris says:

      Hi, trying to run this really cool example with Spark 2.0 and Cassandra 3.7, but I’m running into problems with my maven dependencies:

      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.0.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.0</version>
      </dependency>
      <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.0-M2</version>
      </dependency>

      Can I run this example with Spark 2.0 and C* 3.7 at all? What would my POM file have to look like?

  18. Sam T says:

    The code needs to be updated with new methods for Spark 2 and connector 2. Some parts were easy to fix, but one I was unable to fix:
    JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMap(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {

    The method flatMap(FlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, U>) in the type
    AbstractJavaRDDLike<Tuple2<Integer, Tuple2<Sale, Product>>, JavaPairRDD<Integer, Tuple2<Sale, Product>>> is not applicable for the arguments (new
    PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>(){})

  19. Subhash says:

    Good tutorial for starters. I found the latest working code added by baghelamit here https://gist.github.com/baghelamit/f2963d9e37acc55474559104f5f16cf1
