Kindling Part 2: An Introduction to Spark with Cassandra
Erich Ess, Chief Technology Officer at SimpleRelevance
Erich Ess is the son of a Swiss sailor and an American librarian. He studied Computer Science and Pure Mathematics at Purdue University, where he wrote papers on scientific visualization. He then went to work for Northrop Grumman, Verizon, and GreatCall as a software engineer. At Verizon and GreatCall he spent his time building highly scalable, reliable enterprise systems. He is currently the CTO of SimpleRelevance, leading a team that is building advanced machine learning algorithms.
Welcome to the second half of my blog post about using Spark with Cassandra. The previous post focused on getting Spark set up, the basics of programming with Spark, a small demonstration of Spark’s in-memory processing, and finally how to interact with Cassandra from Spark. This post will use those basics to accomplish a simple but common task: Extract, Transform, and Load. I will use Spark to parse and load the MovieLens dataset into Cassandra and then perform some simple analytics.
The data in this example comes from the MovieLens datasets (http://grouplens.org/datasets/movielens/); specifically, the “MovieLens 1M” dataset. This dataset contains 1 million ratings for 4 thousand movies submitted by around 6 thousand users.
I will show the Cassandra schema I am using to store the raw data from the text files. However, I will skip over the code that I used to take the data from the raw text files and load it into Cassandra, since that step isn’t relevant to using Spark.
(filename text, line_number int, line_text text, Primary Key(filename, line_number))
This data will be transformed into the following schemas:
(id int, name text, age int)
(id int, name text, year int)
(id int, user_id int, movie_id int, rating float)
A link to a Gist with the CQL commands: https://gist.github.com/erichgess/a02aefddd6231c91babb#file-cassandra-schemas
A link to the small Python script I used to load the raw source files into the “raw_files” table: https://gist.github.com/erichgess/a02aefddd6231c91babb#file-data_loader-py
In the Spark Shell:
Processing the Raw Data:
First we need to set up an RDD for the raw data table. One of the nice things about Spark is that we can automatically map the data being read from Cassandra to a data type that is easy to work with. Here I am going to use a case class:
case class RawFileData(Filename: String, Line: Int, Contents: String )
val raw_files = sc.cassandraTable[RawFileData]("spark_demo", "raw_files")
This gives me an RDD of type RawFileData which I can then iterate over and process.
I’ll start by filtering to just the raw data from the “users.dat” file:
val raw_users = raw_files.filter( r => r.Filename == "users.dat" )
One thing to note about Spark is that the transformations are lazily evaluated, which means that nothing is done until an action is performed on an RDD. And when that action is performed, only the bare minimum amount of work required for the result is done.
For example, I can run the first action to get the first entry in the raw_users RDD:
raw_users.first
This will pull just that first line from Cassandra. I find that first is a good way to validate that the correct data is being used in the RDD.
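The lazy behaviour described above can be imitated without a cluster. The following is only a plain-Scala analogy, not Spark code: a standard Iterator stands in for the RDD, the object name LazyFirstSketch and the helper firstFrom are made up for this illustration, and the counter exists purely to show how many rows the filter actually looked at.

```scala
// Plain-Scala analogy (no Spark needed): like a chain of RDD transformations,
// an Iterator does no work until a result is actually requested.
object LazyFirstSketch {
  // Mirrors the shape of the post's RawFileData case class.
  case class RawFileData(Filename: String, Line: Int, Contents: String)

  // Returns the first row for `file` plus the number of rows the filter inspected.
  def firstFrom(rows: Seq[RawFileData], file: String): (Option[RawFileData], Int) = {
    var inspected = 0
    val matching = rows.iterator.filter { r =>
      inspected += 1 // counts every row the predicate is applied to
      r.Filename == file
    }
    // Up to this point nothing has been inspected; asking for one element
    // triggers just enough work to find the first match.
    val first = if (matching.hasNext) Some(matching.next()) else None
    (first, inspected)
  }
}
```

With a handful of made-up rows, the returned counter stops at the first match rather than at the size of the input, which is the same "bare minimum amount of work" idea that first relies on.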
Now I want to transform that raw user data into something that I can easily use in code. A case class for the user will be perfect. Simply use a chain of transformations to take the raw user data and transform it into an RDD of data type User.
case class User(Id: Int, Age: Int, Gender: String, Occupation: Int, Zip: String )
val users = raw_users.map( l => l.Contents.trim.split("::") ).map( v => User(Id = v(0).toInt, Age = v(2).toInt, Gender=v(1), Occupation=v(3).toInt, Zip=v(4)))
Now I will save that back into Cassandra. Note that no action is performed on the raw user data until I save it to Cassandra:
users.saveToCassandra("spark_demo", "users" )
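The parsing step in that chain can be tried on its own, without Spark or Cassandra. This is a minimal sketch: the object name UserParseSketch, the helper names parseUser and parseUserOption, and the sample line in the usage note are assumptions made for illustration. The field order is taken from the indices used in the mapping above.

```scala
import scala.util.Try

object UserParseSketch {
  // Same fields as the post's User case class.
  case class User(Id: Int, Age: Int, Gender: String, Occupation: Int, Zip: String)

  // The indices in the post's mapping imply the field order
  // id::gender::age::occupation::zip, so gender is v(1) and age is v(2).
  def parseUser(line: String): User = {
    val v = line.trim.split("::")
    User(Id = v(0).toInt, Age = v(2).toInt, Gender = v(1), Occupation = v(3).toInt, Zip = v(4))
  }

  // Hypothetical hardening, not part of the original post: return None for
  // a line that does not parse instead of throwing.
  def parseUserOption(line: String): Option[User] =
    Try(parseUser(line)).toOption
}
```

For example, UserParseSketch.parseUser("42::M::25::7::60601") (a made-up line in the same layout) yields a User with Id 42 and Age 25. In the shell, the same function could stand in for the inline lambdas, as in raw_users.map(r => parseUser(r.Contents)).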
Repeating this for the Movies and Ratings:
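// Case classes inferred from the named arguments used in the mappings below
case class Movie(Id: Int, Title: String, Genres: String)
case class Rating(UserId: Int, MovieId: Int, Rating: Float)
val raw_movies = raw_files.filter( r => r.Filename == "movies.dat" )
// Parse from raw_movies (not raw_users) so that movie lines are the ones being mapped
val movies = raw_movies.map( l => l.Contents.trim.split("::") ).map( v => Movie(Id = v(0).toInt, Title = v(1), Genres=v(2)))
movies.saveToCassandra("spark_demo", "movies")
val raw_ratings = raw_files.filter( r => r.Filename == "ratings.dat" )
val ratings = raw_ratings.map( l => l.Contents.trim.split("::") ).map( v => Rating(UserId = v(0).toInt, MovieId = v(1).toInt, Rating=v(2).toFloat))
ratings.saveToCassandra("spark_demo", "ratings")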
Here’s a Gist with the above code: https://gist.github.com/erichgess/a02aefddd6231c91babb#file-etl-sc
In Memory Processing
Something that may have been apparent in the above example is that all the ETL processes came from the same source of data in Cassandra: the table raw_files. Doing multiple independent manipulations on the same source of data is a great opportunity to demonstrate Spark’s in-memory features.
To use in-memory processing, simply tell Spark to cache a specific RDD. Then, when Spark pulls data from Cassandra, it will keep the data in memory across the cluster. Each future computation on that RDD will be done from the in-memory data set. If you do not use this feature, Spark will pull the data from Cassandra again for each computation.
To cache the raw RDD, call the cache function when creating the RDD:
val raw = sc.cassandraTable[RawFileData]("spark_demo", "raw_files").cache
To demonstrate the performance improvement provided by caching, I will run the ETL for users, movies, and ratings twice and compare the timings: once with the raw data cached before the ETL and once without.
|Data||Time without caching||Time with caching|
|Users||29.78 seconds||0.267 seconds|
|Movies||27.65 seconds||0.43 seconds|
|Ratings||58 seconds||17.9 seconds|
Just a note: these tests were run using a 15” MacBook Pro with 16GB of RAM, a single Cassandra server running locally on a Vagrant VM and a single Spark Shell running locally.
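One way timings like these could be collected in the shell is with a small wall-clock helper. This is only a sketch, and not necessarily how the numbers above were measured; the object name TimingSketch and the function timed are hypothetical.

```scala
object TimingSketch {
  // Runs `body` once and returns its result together with the elapsed
  // wall-clock time in seconds.
  def timed[A](body: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = body
    val seconds = (System.nanoTime() - start) / 1e9
    (result, seconds)
  }
}
```

Because transformations are lazy, the block being timed has to end in an action, for example timed { users.saveToCassandra("spark_demo", "users") }. Timing only the map calls would measure almost nothing.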
Some Simple Analytics
Now that we have the data, let’s do some simple analytics.
- Find the top 10 most prolific reviewers:
val reviews = sc.cassandraTable[Review]("spark_demo", "reviews")
val top_10 = reviews.groupBy( x => x.user_id ).map( x => (x._2.size, x._1)).top(10)
- top will take the first 10 elements sorted in descending order by the implicit ordering of tuples in Scala, which compares the first element of the tuple and then, on a tie, the second. This is why the mapping puts the number of ratings first and the user id second.
- Find the movie with the most reviews:
val top_movie = reviews.map(r => (r.movie_id,1)).reduceByKey((a,b) => a+b).map(p => (p._2, p._1)).top(1).head._2
- Find the movie with the highest average rating:
val top_average = ratings.map( r => (r.MovieId, (r.Rating, 1))).reduceByKey( (n,c) => (n._1 + c._1, n._2 + c._2)).map( p => (p._2._1 / p._2._2, p._1)).top(1).head._2
The reason this uses reduceByKey rather than groupBy is to cut down on shuffling. groupBy causes every record to be shuffled through the cluster, which means the data in the RDD has to be redistributed across the Spark worker nodes. reduceByKey first combines the values for each key within each partition, so only the partial results are shuffled, but it is less flexible than groupBy.
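The two ideas used in these queries, merging per-key partial results and relying on tuple ordering, can be checked on a local collection. This is a plain-Scala sketch under stated assumptions, not Spark code: the object RatingStatsSketch and its helper names are made up, a foldLeft over a Map stands in for reduceByKey, and the sample ratings used to try it are invented.

```scala
object RatingStatsSketch {
  // Same fields as the Rating case class used in the ETL step.
  case class Rating(UserId: Int, MovieId: Int, Rating: Float)

  // The kind of function handed to reduceByKey: it merges two partial
  // (sum, count) pairs, so both counts are added together.
  def merge(a: (Float, Int), b: (Float, Int)): (Float, Int) =
    (a._1 + b._1, a._2 + b._2)

  // Local stand-in for reduceByKey: fold every rating into its movie's pair.
  def sumsAndCounts(ratings: Seq[Rating]): Map[Int, (Float, Int)] =
    ratings.foldLeft(Map.empty[Int, (Float, Int)]) { (acc, r) =>
      val pair = (r.Rating, 1)
      acc.updated(r.MovieId, acc.get(r.MovieId).map(merge(_, pair)).getOrElse(pair))
    }

  // Average rating per movie, derived from the merged pairs.
  def averages(ratings: Seq[Rating]): Map[Int, Float] =
    sumsAndCounts(ratings).map { case (movie, (sum, count)) => movie -> sum / count }

  // Tuples compare on the first element and then the second, so putting the
  // count first sorts movies by how often they were rated.
  def topByCount(ratings: Seq[Rating], n: Int): Seq[(Int, Int)] =
    sumsAndCounts(ratings).toSeq
      .map { case (movie, (_, count)) => (count, movie) }
      .sorted(Ordering[(Int, Int)].reverse)
      .take(n)
}
```

The merge function has to add both counts because it may be handed two partial results that each already cover several ratings, not just one running total and one new rating.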
This blog post originally appeared on Planet Cassandra.