Introducing DSE Graph Frames

By Artem Aliev -  May 2, 2017 | 16 Comments

The DseGraphFrame package provides a Spark-based API for bulk operations and analytics on DSE Graph. It is inspired by Databricks’ GraphFrames library and supports a subset of the Apache TinkerPop™ Gremlin graph traversal language. It can read DSE Graph data into a GraphFrame and write GraphFrames, loaded from any format supported by Spark, into DSE Graph.

The package ties the DSE Analytics and DSE Graph components together even more tightly than before! Spark users get direct access to the graph, and graph users can perform bulk deletes and updates and use an advanced API for faster scan operations.

In this blog post I will cover the main DseGraphFrame advantages and operations:

  1. Importing DSE Graph data into Spark DataFrames and GraphFrames
  2. Updating or deleting Vertices and Edges in DSE Graph
  3. Inserting Edges
  4. Inserting Vertices with custom ids
  5. Combining Graph and non-graph data
    1. Update graph element properties
    2. Spark Streaming
    3. Join with any Spark supported sources
  6. API support for graph manipulation and graph algorithms:
    1. Spark GraphFrames http://graphframes.github.io
    2. A Subset of TinkerPop3 Gremlin
  7. Data Loading from various sources including Apache Cassandra®, JDBC, DSEFS, S3

DseGraphFrame currently provides both Java and Scala Spark APIs.


 

DSE Graph

DSE Graph is built with Apache TinkerPop and fully supports the Apache TinkerPop Gremlin language. A Graph is a set of Vertices and the Edges that connect them. Vertices have a mandatory unique id. Edges are identified by their two ends, inV() and outV(). Both Edges and Vertices have a label, which defines the type of the element, and a set of properties.
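To make this model concrete, here is a minimal plain-Scala sketch of the property-graph structure just described, populated with the toy "friends" graph created in the next section. It is only an illustration: the `Vertex`, `Edge` and `ToyGraph` names are hypothetical and are not DSE Graph or TinkerPop types.

```scala
// Hypothetical in-memory model of a property graph (not a DSE Graph or TinkerPop type).
// A vertex has a mandatory unique id, a label that gives its type, and properties.
final case class Vertex(id: String, label: String, props: Map[String, Any])
// An edge connects its two ends (outV -> inV) and also carries a label.
final case class Edge(outV: String, inV: String, label: String)

object ToyGraph {
  // The "friends" graph from the example below, keyed by vertex id.
  val vertices: Map[String, Vertex] = Seq(
    Vertex("1", "person", Map("name" -> "marko", "age" -> 29)),
    Vertex("2", "person", Map("name" -> "vadas", "age" -> 27)),
    Vertex("3", "person", Map("name" -> "josh", "age" -> 32))
  ).map(v => v.id -> v).toMap

  val edges: Seq[Edge] = Seq(
    Edge("1", "2", "friend"), // marko -friend-> vadas
    Edge("1", "3", "friend"), // marko -friend-> josh
    Edge("3", "1", "follow"), // josh -follow-> marko
    Edge("3", "1", "friend")  // josh -friend-> marko
  )

  // Both ends of every edge must be existing vertex ids.
  def isConsistent: Boolean =
    edges.forall(e => vertices.contains(e.outV) && vertices.contains(e.inV))
}
```

Note that josh and marko are connected by two edges that differ only in their label, so the label is part of what tells edges apart in this sketch.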

Create Example

Let’s create a toy "friends" graph in gremlin-console that will be used in the following examples:

Start DSE server with graph and spark enabled.

#> dse cassandra -g -k


Run gremlin console

#> dse gremlin-console


Create empty graph

system.graph("test").create()


Create a short alias ‘g’ for it and define schema

:remote config alias g test.g
//define properties
schema.propertyKey("age").Int().create()
schema.propertyKey("name").Text().create()
schema.propertyKey("id").Text().single().create()
//define vertex with id property as a custom ID
schema.vertexLabel("person").partitionKey("id").properties("name", "age").create()
// two types of edges without properties
schema.edgeLabel("friend").connection("person", "person").create()
schema.edgeLabel("follow").connection("person", "person").create()

Add some vertices and edges

Vertex marko = graph.addVertex(T.label, "person", "id", "1", "name", "marko", "age", 29)
Vertex vadas = graph.addVertex(T.label, "person", "id", "2", "name", "vadas", "age", 27)
Vertex josh = graph.addVertex(T.label, "person", "id", "3", "name", "josh", "age", 32)
marko.addEdge("friend", vadas)
marko.addEdge("friend", josh)
josh.addEdge("follow", marko)
josh.addEdge("friend", marko)


You can use DataStax Studio for visualizations and to run commands instead of gremlin-console.

http://docs.datastax.com/en/dse/5.1/dse-dev/datastax_enterprise/studio/stdToc.html

DSE Graph OLAP

DSE Graph utilises the power of the Spark engine for deep analytical queries with Graph OLAP. It is easily accessed in gremlin-console or Studio with the ‘.a’ alias:

:remote config alias g test.a

DSE Graph OLAP has broader support for TinkerPop than the DseGraphFrame API. While Graph OLAP is best for deep queries (those requiring several edge traversals), simple filtering and counts are much faster in the DseGraphFrame API.

gremlin> g.V().has("name", "josh").out("friend").out("friend").dedup()
==>v[{~label=person, id=2}]
==>v[{~label=person, id=3}]
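Why does josh show up among his own friends of friends? A plain-Scala sketch of what `has("name", "josh").out("friend").out("friend").dedup()` computes over the toy graph makes it clear: josh -> marko -> josh is a valid two-hop path. The `FriendsOfFriends` object and its `out` helper are hypothetical and only mimic the Gremlin step semantics; this is not how DSE Graph OLAP executes the query.

```scala
object FriendsOfFriends {
  // Toy graph from the example above: vertex id -> name, and (outV, inV, label) edges.
  val names: Map[String, String] = Map("1" -> "marko", "2" -> "vadas", "3" -> "josh")
  val edges: Seq[(String, String, String)] = Seq(
    ("1", "2", "friend"), ("1", "3", "friend"),
    ("3", "1", "follow"), ("3", "1", "friend")
  )

  // out(label): follow outgoing edges with the given label from every current vertex.
  // Duplicates are kept, as in Gremlin, until dedup() is applied.
  def out(current: Seq[String], label: String): Seq[String] =
    current.flatMap(v => edges.collect { case (src, dst, l) if src == v && l == label => dst })

  val start: Seq[String] = names.collect { case (id, "josh") => id }.toSeq // has("name", "josh")
  val result: Seq[String] = out(out(start, "friend"), "friend").distinct  // .dedup()

  def main(args: Array[String]): Unit =
    result.foreach(id => println(s"v[{~label=person, id=$id}] ${names(id)}"))
}
```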

 

DseGraphFrame

DseGraphFrame represents a Graph as two virtual tables: a Vertex DataFrame and an Edge DataFrame.

Let’s see how the graph looks in Spark:

#> dse spark
scala> val g = spark.dseGraph("test")
scala> g.V.show

+---------------+------+---+-----+---+
|             id|~label|_id| name|age|
+---------------+------+---+-----+---+
|person:AAAAAjEx|person|  1|marko| 29|
|person:AAAAAjEz|person|  3| josh| 32|
|person:AAAAAjEy|person|  2|vadas| 27|
+---------------+------+---+-----+---+

 

scala> g.E.show

 

+---------------+---------------+------+--------------------+
|            src|            dst|~label|                  id|
+---------------+---------------+------+--------------------+
|person:AAAAAjEx|person:AAAAAjEy|friend|29ea1ef0-139a-11e...|
|person:AAAAAjEx|person:AAAAAjEz|friend|2d65b121-139a-11e...|
|person:AAAAAjEz|person:AAAAAjEx|friend|33de7dc0-139a-11e...|
|person:AAAAAjEz|person:AAAAAjEx|follow|33702b91-139a-11e...|
+---------------+---------------+------+--------------------+

 

DseGraphFrame uses a GraphFrame-compatible format.  This format requires the Vertex DataFrame to have only one ‘id’ column and the Edge DataFrame to have hardcoded ‘src’ and ‘dst’ columns. Since DSE Graph allows users to define any arbitrary set of columns as the Vertex id and since there is no concept of `labels` in GraphFrame, DseGraphFrame will serialize the entire DSE Graph id into one ‘id’ column. The label is represented as part of the id and also as ‘~label’ property column.

DseGraphFrame methods list:

gf(): returns a GraphFrame object for GraphFrame API usage
V(): returns a DseGraphTraversal[Vertex] object to start a TinkerPop vertex traversal
E(): returns a DseGraphTraversal[Edge] object to start a TinkerPop edge traversal
cache(), persist(): cache the graph data with Spark
deleteVertices(), deleteEdges(): delete vertices or edges
deleteVertexProperties(), deleteEdgeProperties(): delete properties from the DB; the schema is not changed
updateVertices(), updateEdges(): change properties or insert new vertices and edges

 

DseGraphFrameBuilder

DseGraphFrameBuilder is a factory for DseGraphFrame.

Java API

The Java API is the same as the Scala one, except for graph initialization, which goes through the DseGraphFrameBuilder factory:

//load a graph
DseGraphFrame graph = DseGraphFrameBuilder.dseGraph("test", spark);
//check some vertices
graph.V().df().show();

Scala API

A Scala implicit adds the factory dseGraph() method to a Spark session, so the Scala version is shorter:

// load a graph
val graph = spark.dseGraph("test")
//check some vertices
graph.V.show

Spark GraphFrame

Java API

Use DseGraphFrame.gf() to get a GraphFrame.

Use the DseGraphFrameBuilder.dseGraph(String graphName, GraphFrame gf) method to get back to a DseGraphFrame.

Scala API

Scala provides implicit conversions from GraphFrame to DseGraphFrame and back. It also converts GraphTraversals to DataFrames, so GraphFrame filtering and TinkerPop traversal methods can be mixed.

TinkerPop3 Gremlin Support

The DseGraphFrame API supports a limited subset of the Gremlin language that covers basic traversal and update queries. TinkerPop traversals are generally clearer and more intuitive than GraphFrame motif search queries, so we recommend using Gremlin where possible.

See this example of finding all of Josh’s friends of friends:

//TinkerPop Gremlin in spark shell
scala>g.V.has("name", "josh").out("friend").out("friend").show
//GraphFrame motif finding is less readable
scala>g.find("(a)-[e]->(b); (b)-[e2]->(c)").filter(" a.name = 'josh' and e.`~label` = 'friend' and e2.`~label` = 'friend'").select("c.*").show

Both outputs are the same, but Gremlin looks much shorter and more readable.

+---------------+------+---+-----+---+
|             id|~label|_id| name|age|
+---------------+------+---+-----+---+
|person:AAAAAjEz|person|  3| josh| 32|
|person:AAAAAjEy|person|  2|vadas| 27|
+---------------+------+---+-----+---+
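The motif `(a)-[e]->(b); (b)-[e2]->(c)` is essentially a self-join of the edge table on `e.dst = e2.src`, followed by the filter. A minimal plain-Scala sketch of that join over the toy graph (hypothetical names, no GraphFrames dependency) shows why both queries return the same two rows: GraphFrames does not require `a` and `c` to be different vertices, so josh is matched as his own friend of a friend.

```scala
object MotifSketch {
  final case class E(src: String, dst: String, label: String)

  val names: Map[String, String] = Map("1" -> "marko", "2" -> "vadas", "3" -> "josh")
  val edges: Seq[E] =
    Seq(E("1", "2", "friend"), E("1", "3", "friend"), E("3", "1", "follow"), E("3", "1", "friend"))

  // (a)-[e]->(b); (b)-[e2]->(c): pair up edges where e.dst == e2.src, then apply
  // a.name = 'josh' and e.`~label` = 'friend' and e2.`~label` = 'friend'; select c.
  val c: Seq[String] = for {
    e  <- edges
    e2 <- edges
    if e.dst == e2.src
    if names(e.src) == "josh" && e.label == "friend" && e2.label == "friend"
  } yield names(e2.dst)
}
```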

List of Gremlin query methods supported by DseGraphFrame:

Step                       Method
CountGlobalStep            count()
GroupCountStep             groupCount()
IdStep                     id()
PropertyValuesStep         values()
PropertyMapStep            propertyMap()
HasStep                    has(), hasLabel()
IsStep                     is()
VertexStep                 to(), out(), in(), both(), toE(), outE(), inE(), bothE()
EdgeVertexStep             toV(), inV(), outV(), bothV()
NotStep                    not()
WhereStep                  where()
AndStep                    and(A, B)
PageRankVertexProgramStep  pageRank()

 

Bulk Drop and Property Updates

DseGraphFrame is currently the only way to drop millions of vertices or edges at once. It is also much faster for bulk property updates than other methods. For example, to drop all ‘person’ vertices and their associated edges:

scala>g.V().hasLabel("person").drop().iterate()
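Dropping a vertex also removes every edge attached to it. The following plain-Scala sketch models that rule on the toy graph; `dropVertices` is a hypothetical helper for illustration only and says nothing about how DseGraphFrame performs the deletes internally.

```scala
object DropSketch {
  type V = (String, String, String) // (id, label, name)
  type E = (String, String, String) // (outV, inV, label)

  val vertices: Seq[V] = Seq(("1", "person", "marko"), ("2", "person", "vadas"), ("3", "person", "josh"))
  val edges: Seq[E] = Seq(("1", "2", "friend"), ("1", "3", "friend"), ("3", "1", "follow"), ("3", "1", "friend"))

  // Hypothetical helper: remove matching vertices and every edge touching one of them.
  def dropVertices(p: V => Boolean): (Seq[V], Seq[E]) = {
    val dropped: Set[String] = vertices.filter(p).map(_._1).toSet
    (vertices.filterNot(p), edges.filterNot { case (src, dst, _) => dropped(src) || dropped(dst) })
  }

  // Like g.V().hasLabel("person").drop(): every vertex and edge is gone.
  val (vAll, eAll) = dropVertices(_._2 == "person")
  // Dropping only josh leaves marko -friend-> vadas.
  val (vJosh, eJosh) = dropVertices(_._3 == "josh")
}
```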

List of Gremlin update methods supported by DseGraphFrame:

Step             Method
DropStep         V().drop(), E().drop(), properties().drop()
AddPropertyStep  property(name, value, ...)

The Traverser concept and side effects are not supported.

Java API

The DseGraphFrame V() and E() methods return a GraphTraversal. This is a Java interface, so all of its methods exist, but some of them throw UnsupportedOperationException. A GraphTraversal is a Java Iterator and also has toSet() and toList() methods to get query results:

//load a graph
DseGraphFrame graph = DseGraphFrameBuilder.dseGraph("test", spark);
//print names: a GraphTraversal is an Iterator, not an Iterable, so collect it with toList() first
for (Object name : graph.V().values("name").toList()) System.out.println(name);

To finish a traversal and return to the DataFrame API instead of a list or iterator, use the .df() method:

graph.V().df();

Scala API

DseGraphFrame supports implicit conversion of a GraphTraversal to a DataFrame in Scala. The following example traverses vertices with TinkerPop and then shows the result as a DataFrame:

scala>g.V().out().show

In some cases the Java API is required to get proper TinkerPop objects.

For example, to extract DSE Graph Id objects, convert the Java iterator of the traversal to a Scala iterator. This gives direct access to the TinkerPop representation of the Id, whereas the DataFrame methods return the String representation of the Id used in the DataFrame. You can also use the toList() and toSet() methods to get the corresponding collection of ids:

scala> import scala.collection.JavaConverters._
scala> for (i <- g.V().id().asScala) println(i)
{~label=person, id=1}
{~label=person, id=3}
{~label=person, id=2}
scala> g.V.id.toSet
res12: java.util.Set[Object] = [{~label=person, id=2}, {~label=person, id=3}, {~label=person, id=1}]

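Logical operations are supported with the TinkerPop P predicates class:

import org.apache.tinkerpop.gremlin.process.traversal.P
g.V().has("age", P.gt(30)).show

The T.label constant can be used to refer to the element label:

import org.apache.tinkerpop.gremlin.structure.T
g.E().groupCount().by(T.label).show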
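For readers more at home with collections than with Gremlin, here is what the two queries above compute, written as a plain-Scala sketch over the toy graph (hypothetical data structures, not the DseGraphFrame API): `has("age", P.gt(30))` is a filter on a property value, and `groupCount().by(T.label)` counts elements per label.

```scala
object PredicateSketch {
  // (name, age) of the toy vertices and the labels of the four toy edges.
  val people: Seq[(String, Int)] = Seq(("marko", 29), ("vadas", 27), ("josh", 32))
  val edgeLabels: Seq[String] = Seq("friend", "friend", "follow", "friend")

  // g.V().has("age", P.gt(30))
  val olderThan30: Seq[String] = people.collect { case (name, age) if age > 30 => name }

  // g.E().groupCount().by(T.label)
  val countByLabel: Map[String, Int] =
    edgeLabels.groupBy(identity).map { case (label, xs) => label -> xs.size }
}
```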

Note: Scala is not always able to infer return types, especially in the spark-shell. Thus to get property values, the type should be provided explicitly:

g.V().values[Any]("name").next()
//or
val n: String = g.V().values("name").next()

The same approach is needed to drop a property from the spark-shell: pass the property type ‘[Any]’ to properties() before calling drop():

g.V().properties[Any]("age", "name").drop().iterate()

The DataFrame method looks more user-friendly in this case:

scala> g.V().properties("age", "name").drop().show()
++
||
++
++
scala>  g.V().values("age").show()
+---+
|age|
+---+
| 29|

DseGraphFrame updates

Spark has various data sources. As long as you can get the data as a DataFrame that has an id column, a ‘~label’ column and one or more property columns, you can load it into the graph. Transform the DataFrame into the proper format and call one of the update methods:

import org.apache.spark.sql.functions._ // lit()
// new_data is the GraphFrames example graph loaded in "Import custom graph" below
val v = new_data.vertices.select ($"id" as "_id", lit("person") as "~label", $"age")
g.updateVertices (v)
val e = new_data.edges.select (g.idColumn(lit("person"), $"src") as "src", g.idColumn(lit("person"), $"dst") as "dst",  $"relationship" as "~label")
g.updateEdges (e)
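A rough plain-Scala model of the update semantics may help: a row whose id already exists updates only the property columns it supplies, and a row with a new id inserts a new vertex. This sketch rests on an assumption, namely that updateVertices() behaves like a Cassandra upsert and leaves properties absent from the DataFrame untouched; the `upsert` helper and the "peter" row are hypothetical, and labels are omitted for brevity.

```scala
object UpsertSketch {
  type Props = Map[String, Any]

  // Existing vertices keyed by their custom id.
  val existing: Map[String, Props] = Map(
    "1" -> Map("name" -> "marko", "age" -> 29),
    "2" -> Map("name" -> "vadas", "age" -> 27)
  )

  // Hypothetical model of an update: merge supplied columns into existing rows, insert new ids.
  def upsert(graph: Map[String, Props], updates: Seq[(String, Props)]): Map[String, Props] =
    updates.foldLeft(graph) { case (acc, (id, props)) =>
      acc.updated(id, acc.getOrElse(id, Map.empty[String, Any]) ++ props)
    }

  val after: Map[String, Props] = upsert(existing, Seq(
    "1" -> Map("age" -> 30),                   // update: name is left unchanged
    "4" -> Map("name" -> "peter", "age" -> 35) // insert: a new vertex
  ))
}
```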

Spark Streaming Example

A DataFrame can come from any source, even from Spark Streaming. You just need to get the DataFrame into the appropriate format and call updateVertices or updateEdges:

import org.apache.spark.sql.functions.lit
import spark.implicits._ // enables rdd.toDF
// dstream is assumed to carry (id, messages) pairs
dstream.foreachRDD(rdd => {
  val updateDF = rdd.toDF("_id", "messages").withColumn("~label", lit("person"))
  graph.updateVertices(updateDF)
})

Full source code of the streaming application could be found here

GraphX, GraphFrame and DataFrame

DseGraphFrame can return a GraphFrame representation of the graph with the DseGraphFrame.gf() method. That gives you access to all the advanced GraphFrame and GraphX algorithms. It also allows you to build sophisticated queries that are not yet supported by the DseGraphFrame subset of the TinkerPop API. A DseGraphTraversal can return its result as a DataFrame with the df() method. In the Scala API an implicit conversion from traversal to DataFrame is provided, so all DataFrame methods are available on a DseGraphTraversal.

scala> g.V.select(col("name")).show
+-----+
| name|
+-----+
|marko|
| josh|
|vadas|
+-----+

GraphFrame Reserved Column Names

GraphFrame uses the following set of columns internally: "id", "src", "dst", "new_id", "new_src", "new_dst", "graphx_attr". TinkerPop properties with these names are prefixed with "_" when represented inside a GraphFrame/DataFrame.
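The renaming rule fits in a one-line function. A minimal sketch, assuming the "_" prefix is applied once to any property key that collides with the reserved set above; `toFrameColumn` is a hypothetical name, not part of the DseGraphFrame API. It explains the `_id` column seen earlier: the custom `id` property of `person` collides with the GraphFrame `id` column.

```scala
object ReservedColumns {
  // Column names GraphFrame uses internally.
  val reserved: Set[String] =
    Set("id", "src", "dst", "new_id", "new_src", "new_dst", "graphx_attr")

  // Map a TinkerPop property key to the DataFrame column name used for it.
  def toFrameColumn(property: String): String =
    if (reserved.contains(property)) "_" + property else property
}
```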

Querying DSE Graph with SparkSQL

Spark data sources allow you to query graph data with SQL. The com.datastax.bdp.graph.spark.sql.vertex and com.datastax.bdp.graph.spark.sql.edge sources expose vertices and edges. The resulting tables are in a Spark GraphFrame compatible format. To permanently register tables for Spark SQL or JDBC access (via the Spark SQL Thriftserver), run the following commands in a `dse spark-sql` session:

spark-sql> CREATE DATABASE graph_test;
spark-sql> USE graph_test;
spark-sql> CREATE TABLE vertices USING com.datastax.bdp.graph.spark.sql.vertex OPTIONS (graph 'test');
spark-sql>  CREATE TABLE edges USING com.datastax.bdp.graph.spark.sql.edge OPTIONS (graph 'test');

In addition to operating on the graph from Spark via Scala, Java, and SQL, this approach allows you to query and modify the graph from Spark Python or R:

Scala

#>dse spark
scala> val df = spark.read.format("com.datastax.bdp.graph.spark.sql.vertex").option("graph", "test").load()
scala> df.show

 

PySpark

#>dse pyspark
>>> df = spark.read.format("com.datastax.bdp.graph.spark.sql.vertex").load(graph = "test")
>>> df.show()

 

SparkR

#>dse sparkR
> v <- read.df("", "com.datastax.bdp.graph.spark.sql.vertex", graph="test")
> head(v)

 

Export Graph

A DseGraphFrame is represented as two DataFrames, so it is easy to export DSE Graph data to any format supported by Spark.

scala> g.V.write.json("dsefs:///tmp/v_json")
scala> g.E.write.json("dsefs:///tmp/e_json")

This will create two directories in the DSEFS file system containing the vertex and edge data as JSON text files.

You can copy the data to the local filesystem (if there is enough capacity):

#> dse hadoop fs -cat /tmp/v_json/* > local.json

or use the data for offline analytics by loading it back from DSEFS:

val g = DseGraphFrameBuilder.dseGraph("test", spark.read.json("/tmp/v_json"), spark.read.json("/tmp/e_json"))

 

Export to CSV

Spark CSV does not support arrays or structs. This means that multi-properties and properties with metadata must be converted before exporting.

For example, let the "nicknames" column be a multi-property with metadata; it will be represented as array(struct(value, metadata*)).

To save it together with the id and label, you will need the following code:

val plain = g.V.select (col("id"), col("~label"), concat_ws (" ", col("nicknames.value")) as "nicknames")
plain.write.csv("/tmp/csv_v")
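What `concat_ws(" ", col("nicknames.value"))` does to each row can be sketched in plain Scala: it keeps only the `value` field of every element of the array of structs and joins those values with spaces, dropping the metadata. The `Nickname` case class and its `metadata` field are hypothetical stand-ins for the struct layout described above.

```scala
object CsvFlattenSketch {
  // Hypothetical stand-in for one element of array(struct(value, metadata*)).
  final case class Nickname(value: String, metadata: Map[String, String])

  // Keep only the values and join them with a space; nulls are skipped, as concat_ws does.
  def flatten(nicknames: Seq[Nickname]): String =
    nicknames.map(_.value).filter(_ != null).mkString(" ")

  val row: Seq[Nickname] = Seq(Nickname("mark", Map("since" -> "2010")), Nickname("marky", Map.empty))
}
```

Note that a value containing the separator cannot be split back unambiguously, so pick a separator that does not occur in the data.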

 

Importing Graph Data into DSE Graph

DseGraphFrame is able to insert data back to DSE Graph. The parallel nature of Spark makes the inserting process faster than single-client approaches. The current DseGraphFrame API supports inserting only to Graphs with a Custom ID and the process is experimental in DSE 5.1.

Limitations: Custom ID only! Graph schema should first be created manually in gremlin-console.

Import previously exported graph

  1. Export schema:

In the gremlin-console:

gremlin> :remote config alias g test.g
gremlin> schema.describe()

Copy the schema and apply it to a new graph:

system.graph('test_import').create()
:remote config alias g test_import.g
// paste schema here

  2. Back in Spark, import V and E:

val g = spark.dseGraph("test_import")
g.updateVertices(spark.read.json("/tmp/v_json"))
g.updateEdges(spark.read.json("/tmp/e_json"))

 

Import custom graph

Let’s add data to our test graph. I will use a GraphFrame example graph, which has a structure similar to our graph schema. I will tune the schema to be compatible and then update our graph with the new vertices and edges.

scala> val new_data = org.graphframes.examples.Graphs.friends

It consists of two DataFrames. Let’s check their schemas:

scala> new_data.vertices.printSchema
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
scala> new_data.edges.printSchema
root
|-- src: string (nullable = true)
|-- dst: string (nullable = true)
|-- relationship: string (nullable = true)

Open our graph and check the expected schema.

scala> val g = spark.dseGraph("test")
scala>g.V.printSchema
root
|-- id: string (nullable = false)
|-- ~label: string (nullable = false)
|-- _id: string (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
scala>g.E.printSchema
root
|-- src: string (nullable = false)
|-- dst: string (nullable = false)
|-- ~label: string (nullable = true)
|-- id: string (nullable = true)

The labels need to be defined for vertices and some id columns need to be renamed. Vertex serialized IDs will be calculated by DSE Graph, but an explicit mapping using the idColumn() function is required for Edge fields ‘src’ and ‘dst’.

val v = new_data.vertices.select ($"id" as "_id", lit("person") as "~label", $"name", $"age")
val e = new_data.edges.select (g.idColumn(lit("person"), $"src") as "src", g.idColumn(lit("person"), $"dst") as "dst",  $"relationship" as "~label")

Append them in the graph:

g.updateVertices (v)
g.updateEdges (e)

This approach can be applied to data from any data source supported by Spark: JSON, JDBC, etc. Create a DataFrame, update schema with a select() statement (or other Spark transformations) and update the DSE Graph via updateVertices() and updateEdges().


 
















Comments

    1. Artem Aliev says:

      The post is updated with the correct one:
      http://docs.datastax.com/en/dse/5.1/dse-dev/datastax_enterprise/studio/stdToc.html
      Thank you for finding it.

  1. Sharon O'Connor says:

    It is not clear to me at all how to use GraphFrameBuilder in studio.

    1. Artem Aliev says:

      The DseGraphFrame library is for Spark only, so you can use it only in Scala or Java Spark applications. DseGraphFrameBuilder is targeted at Java. All examples (except loading the initial example graph) are run in the ‘dse spark’ console, so you cannot use DseGraphFrame in DSE Studio.

      1. stephen says:

        Can you share details of the maven module for DseGraphFrameBuilder?

        1. Artem Aliev says:

          DseGraphFrameBuilder is part of the DSE jars and no maven artefact is available yet, so you should use unmanaged DSE jars. `dse spark-classpath` can be used to get the classpath. The example build is here:
          https://github.com/datastax/graph-examples/tree/master/dse-graph-frame

        2. Artem Aliev says:

          com.datastax.dse:dse-spark-dependencies:5.1.1 includes DseGraphFrame now. It should be added to the dependencies with provided scope. It describes the dse spark classpath jars.

  2. Adam Jackson says:

    I get a "Task not serializable" exception when using g.idColumn in a select statement, saying that the DseGraphFrame object is not serializable.
    The g object was created like this:
    val g = spark.dseGraph("mygraph")

    * the graph exists and I am able to see the data it already has

    1. Artem Aliev says:

      Interesting… Scala tries to serialize g (the DseGraphFrame) when it is part of the query:
      scala> g.V.df.select(g.idColumn(lit("person"), $"_id") as "id").show
      org.apache.spark.SparkException: Task not serializable

      Here the idColumn() function call does not include g in the closure:
      scala> val new_data = org.graphframes.examples.Graphs.friends

      scala> new_data.vertices.select(g.idColumn(lit("person"), $"id") as "id").show

      1. Adam Jackson says:

        Thanks for the reply,
        it was pretty interesting for me too.
        So for now, is there any workaround or should I just wait for a patch? (My guess is that just adding "extends Serializable" to the DseGraphFrame object should fix it.)

        1. Artem Aliev says:

          You are right, "extends Serializable" is enough. I did not find an easy workaround, so please wait for the patch release.
          But you can update vertices by just passing natural ids (the DGF id will be calculated):

          g.updateVertices(g.V.df.select(lit("person") as "~label", $"_id", lit(10) as "age"))

          Then you can select the DGF ids from the vertex table and use them to insert edges.

          1. Adam Jackson says:

            Thanks for the tip, updating the vertices works great and very fast.
            Also, I just installed DSE 5.1.1, and the exception keeps happening, so I still can't update the edges.
            So my question is:
            Is that fix planned for the upcoming patches (5.1.x versions)?

          2. Artem Aliev says:

            The fix is in 5.1.2

  3. shrikant says:

    I think spark with dse graph is good combination.
    The issue I see is that it is quite open-ended and confusing for readers whether to use Gremlin, Spark, Graph OLAP, etc., and how Spark is installed on DSE. I hope to see end-to-end, clear documentation.

  4. Arthur says:

    That looks like a very good combination.
    Importing graphs using DseGraphFrames gave a huge boost to batch loading times.
    Though I think I encountered a pretty major bug:
    when updating a graph that has multiple vertex labels, each partitioned by some property, loading one kind of label throws an exception that a primary key is missing (for a vertex label that is not included in the DataFrame I'm trying to load).

    1. Artem Aliev says:

      You can add a fake column with that primary key name as a workaround:
      val ndf = df.withColumn("pk", lit(null))
      I’ll take a look at the problem and will fix it.


© 2017 DataStax, All Rights Reserved. DataStax, Titan, and TitanDB are registered trademark of DataStax, Inc. and its subsidiaries in the United States and/or other countries.
Apache Cassandra, Apache, Tomcat, Lucene, Solr, Hadoop, Spark, TinkerPop, and Cassandra are trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries.