This guide highlights best practices for loading data with the DseGraphFrame package. The DseGraphFrame package provides a Spark API for bulk operations and analytics on DataStax Graph. It is inspired by Databricks’ GraphFrame library and supports a subset of Apache TinkerPop™ Gremlin graph traversal language. The DseGraphFrame package supports reading of DataStax Graph data into a GraphFrame, and writing GraphFrames from any format supported by Spark into DataStax Graph. For a review of our initial offering and more introductory examples see our Introducing DataStax GraphFrames blog post. Technical documentation on general usage of DataStax Enterprise GraphFrames (DGF) can be viewed in our DSE GraphFrame overview page.

Common Pitfalls

Null unset issue can cause excessive tombstones

In version prior to 6.0.7 and 6.7.3, if a user omitted columns during DSE GraphFrames edge updates, the missing columns fields were implicitly written to DSE with null values, causing unintended deletions, tombstone build-up, and ultimately excessive stress on the system.

The workaround at the time was to set spark.setCassandraConf(Map("spark.cassandra.output.ignoreNulls" -> "true")), which will ignore unset or null-valued columns and not create unintended deletions on the server side. In DSE versions 6.0.7, 6.7.3, and higher the default value for ignoreNulls is true.

Unintended caching can lead to OOM exceptions

Prior to DSE versions 5.1.14, 6.0.5, and 6.7.1, a problem existed such that during a DataStax GraphFrame bulk loading job, the Spark cache was being used by default, but not explicitly emptied. This lead to OutOfMemory(OOM) errors and other issues. The Spark Cassandra Connector parameter spark.dse.graphframes.update.persistLevel was introduced that allows better control over Spark caching levels.

Additionally, a new cache parameter was introduced in the multi-label update methods that can be used as a workaround if the user wishes to explicitly uncache data after use. More details on this coming soon...

How to workaround Materialized Views during bulk loading

When indexing with Materialized Views is desired, it is often recommended to enable this after the data has been loaded because it significantly affects insertion performance. We expect about a 10% performance penalty per MV, and there are some subtleties to be aware of when defining the data model, see the Materialized View Performance in Cassandra 3.x blog post for more details.

Recommended steps for bulk loading data:

  1. Drop all indices

  2. Bulk load data

  3. Recreate indices

After data is loaded, and one enables indexing, how do we know when it's done? There is a nodetool viewbuildstatus command for accomplishing exactly this.

How to manage multi/meta-properties

Here is an example of updating vertex multi and meta-properties. Suppose we add a multi-property called nicknames, which itself has meta-properties name time and date. The person vertex label will be defined with the nicknames property.


schema.propertyKey("time").Timestamp().single().create() schema.propertyKey("date").Date().single().create() schema.propertyKey("nicknames").Text().multiple().create() schema.propertyKey("nicknames").properties("time", "date").add() schema.vertexLabel("person"). partitionKey("name", "ssn"). clusteringKey("age"). properties("address", "coffeePerDay", "nicknames"). create()

We'll start with 2 person vertices, 2 software vertices, and 2 created edges. Notice none of the person vertices have nicknames set yet.


scala> g.V().show(false) +-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+---------+ |id |~label |name |version|lang |temp |static_property|ssn |age |address |coffeePerDay|nicknames| +-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+---------+ |software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5 |software|timer |2 |groovy|100 |alpha |null |null|null |null |null | |software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ== |software|chat |1 |scala |mambo|beta |null |null|null |null |null | |person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|person |isabella|null |null |null |null |111-11-1111|2 |2420 P Street |50 |null | |person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw= |person |rocco |null |null |null |null |222-22-2222|28 |1017 Mohar Street|25 |null | +-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+---------+ scala> g.E().show(false) +-------------------------------------------------------+---------------------------------------------+-------+------------------------------------+------+ |src |dst |~label |id |weight| +-------------------------------------------------------+---------------------------------------------+-------+------------------------------------+------+ |person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ==|created|296d28c0-e62c-11e9-ace5-5b43d7c0da8d|1.0 | |person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw= |software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5|created|f4774f00-e62c-11e9-ace5-5b43d7c0da8d|1.0 | +-------------------------------------------------------+---------------------------------------------+-------+------------------------------------+------+

Now let’s add nicknames and its meta-properties. First construct a DataFrame consisting of the id of the vertex named rocco, along with the nicknames property and meta-properties we wish to update.


scala> val df = Seq(("person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=", "docRoc", java.sql.Date.valueOf("2017-01-01"), new java.sql.Timestamp(100L))).toDF("id", "nicknames", "date", "time") df: org.apache.spark.sql.DataFrame = [id: string, nicknames: string ... 2 more fields] scala> +---------------------------------------------------+---------+----------+---------------------+ |id |nicknames|date |time | +---------------------------------------------------+---------+----------+---------------------+ |person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=|docRoc |2017-01-01|1970-01-01 00:00:00.1| +---------------------------------------------------+---------+----------+---------------------+

Now we create a new DataFrame consisting of just the id of the vertex and the nickname property to update.


scala> val updateDF ="id"), array(struct($"nicknames", $"date", $"time")) as "nicknames") updateDF: org.apache.spark.sql.DataFrame = [id: string, nicknames: array<struct<nicknames:string,date:date,time:timestamp>>]

Notice how we construct the nicknames fields in this DataFrame, it is an array of struct type.


scala> updateDF.printSchema root |-- id: string (nullable = true) |-- nicknames: array (nullable = false) | |-- element: struct (containsNull = false) | | |-- nicknames: string (nullable = true) | | |-- date: date (nullable = true) | | |-- time: timestamp (nullable = true) scala> +---------------------------------------------------+-------------------------------------------+ |id |nicknames | +---------------------------------------------------+-------------------------------------------+ |person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=|[[docRoc,2017-01-01,1970-01-01 00:00:00.1]]| +---------------------------------------------------+-------------------------------------------+

Now we update vertices using this updated DataFrame. Notice we are using the multi-vertex label API of the vertex update method.


scala> g.updateVertices(updateDF) scala> g.V().show(false) +-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+-------------------------------------------+ |id |~label |name |version|lang |temp |static_property|ssn |age |address |coffeePerDay|nicknames | +-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+-------------------------------------------+ |software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5 |software|timer |2 |groovy|100 |alpha |null |null|null |null |null | |software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ== |software|chat |1 |scala |mambo|beta |null |null|null |null |null | |person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|person |isabella|null |null |null |null |111-11-1111|2 |2420 P Street |50 |null | |person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw= |person |rocco |null |null |null |null |222-22-2222|28 |1017 Mohar Street|25 |[[docRoc,1970-01-01 00:00:00.1,2017-01-01]]| +-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+-------------------------------------------+

Alternatively, we could use the Apache TinkerPop™ property update syntax with DataStax GraphFrames


scala> g.V().has("name", "rocco").property("nicknames", "docRoc", "date", java.sql.Date.valueOf("2017-01-01"), "time", new java.sql.Timestamp(100L)).iterate()

It is worth noting that regardless of the approach used, updateVertices or Apache TinkerPop™ update syntax with DGF, multi-properties are append only. For example, if we look at the vertex named rocco after executing the second update with the Apache TinkerPop™ syntax, we'll see two entries in the nicknames column.


scala> g.V().show(false) +-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+--------------------------------------------------------------------------------------+ |id |~label |name |version|lang |temp |static_property|ssn |age |address |coffeePerDay|nicknames | +-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+--------------------------------------------------------------------------------------+ |software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5 |software|timer |2 |groovy|100 |alpha |null |null|null |null |null | |software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ== |software|chat |1 |scala |mambo|beta |null |null|null |null |null | |person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|person |isabella|null |null |null |null |111-11-1111|2 |2420 P Street |50 |null | |person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw= |person |rocco |null |null |null |null |222-22-2222|28 |1017 Mohar Street|25 |[[docRoc,1970-01-01 00:00:00.1,2017-01-01], [docRoc,1970-01-01 00:00:00.1,2017-01-01]]| +-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+--------------------------------------------------------------------------------------+

How to carry out idempotent edge updates

When updating edges users should provide a valid and unique UUID for the id column.

Suppose we start with the following graph schema, our examples will look at updates with the lives edge label.


// truncated example for brevity schema.propertyKey("nicknames").Text().multiple().create() schema.propertyKey("reason").Text().single().create() schema.propertyKey("age").Int().single().create() schema.propertyKey("name").Text().single().create() schema.propertyKey("date").Date().single().create() schema.propertyKey("nicknames").properties("time", "date").add() schema.vertexLabel("location").properties("name").create() schema.vertexLabel("god").properties("name", "age", "nicknames").create() schema.vertexLabel("god").index("god_age_index").secondary().by("age").add() schema.vertexLabel("god").index("god_name_index").secondary().by("name").add() schema.edgeLabel("lives").multiple().properties("reason").create() schema.edgeLabel("lives").connection("god", "location").add() // add data Vertex neptune = graph.addVertex(T.label, "god", "name", "neptune", "age", 4500); Vertex sea = graph.addVertex(T.label, "location", "name", "sea"); neptune.addEdge("lives", sea).property("reason", "loves waves");

The edge table maintains data for the lives edge in the src, dst, ~label, id, and reason columns.


DseGraphFrame gf = DseGraphFrameBuilder.dseGraph(keyspace, spark); gf.E().df().show(false); +------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+ |src |dst |~label |id |time |name |reason | +------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+ |god:RnS4AAAAAAAAAAAE |location:RnS4AAAAAAAAAAAJ|lives |f695a6b0-4500-11e9-8e88-fb68397e4bea|null |null |loves waves | +------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+

Now let's grab the edge that has a loves waves value in the reason column, then overwrite with the word New.


DseGraphFrame gf = DseGraphFrameBuilder.dseGraph(keyspace, spark); Dataset<Row> u ="reason = 'loves waves'").drop("time").drop("reason").drop("name").withColumn("reason", functions.lit("New"));

This gives us the following DataFrame that can be used to update the edge. Notice that because we are using an existing row and simply reinserting it with a new reason, we get the unique id for free.

java; +--------------------+-------------------------+------+------------------------------------+------+ |src |dst |~label|id |reason| +--------------------+-------------------------+------+------------------------------------+------+ |god:RnS4AAAAAAAAAAAE|location:RnS4AAAAAAAAAAAJ|lives |f695a6b0-4500-11e9-8e88-fb68397e4bea|New | +--------------------+-------------------------+------+------------------------------------+------+

We can now update the edge using the updateEdges method, and as expected we see the edge with id value f695a6b0-4500-11e9-8e88-fb68397e4bea has the reason column set with the new value New.


gf.updateEdges(u); gf.E().df().show(false); +------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+ |src |dst |~label |id |time |name |reason | +------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+ |god:RnS4AAAAAAAAAAAE |location:RnS4AAAAAAAAAAAJ|lives |f695a6b0-4500-11e9-8e88-fb68397e4bea|null |null |New | +------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+

In general, it’s important to note that when updating edges you must include the id column in the dataset for the existing edges to be updated. If a user omits the id column and instead only supplies the src, dst, and ~label, they will end up duplicating edges with auto-generated IDs.

Key order matters when using the idColumn method

The idColumn(label: String, idColumns: Column*): Column method is a utility used for generating GraphFrame compatible IDs, see the API documentation for more details. When using idColumn for vertices that have multiple key columns it is important to provide the key columns to idColumn in the same order in which they are defined in the schema.

Suppose we have the following name vertex label


gremlin> schema.vertexLabel("name"). ......1> partitionKey("internal_party_id"). ......2> clusteringKey( ......3> "prefx_nm", ......4> "first_nm", ......5> "mdl_nm", ......6> "last_nm", ......7> "sufx_nm", ......8> "name_line_desc" ......9> ).create()

When passing the keys to the idColumn method, the order must match the order defined in the schema. Notice in the example below the order of keys provided when constructing the dst column.


scala> val hasNameEdges = nameVertices .drop(col("~label")) .withColumn("src", nameVertices.col("partyId")) .withColumn("dst", g.idColumn( lit("name"), nameVertices.col("internal_party_id"), nameVertices.col("prefx_nm"), nameVertices.col("first_nm"), nameVertices.col("mdl_nm"), nameVertices.col("last_nm"), nameVertices.col("sufx_nm"), nameVertices.col("name_line_desc") )) .withColumn("~label", lit("has_name")) .drop(col("internal_party_id")) .drop(col("partyId")) .drop(col("first_nm")) .drop(col("last_nm")) .drop(col("mdl_nm")) .drop(col("name_line_desc")) .drop(col("prefx_nm")) .drop(col("sufx_nm")) scala> g.updateEdges(hasNameEdges)

The new API for updating single labels was introduced to address this issue and simplify the user experience.

Considerations for Loading Big Graphs

Spark Cassandra Connector tuning parameters still apply with DataStax Enterprise GraphFrames

To increase write performance during DataStax Enterprise GraphFrames bulk loading, remember that our existing Spark Cassandra Connector tuning parameters still apply.

For example, spark.cassandra.output.concurrent.writes has been found to be one of the most intuitive and effective parameters to tune during load testing. Other parameters such as spark.cassandra.output.throughputMBPerSec (formerly spark.cassandra.output.throughput_mb_per_sec) can be very helpful as well. In cases where one expects a long insertion workload, it may be wise to down-tune spark.cassandra.output.throughputMBPerSec appropriately to avoid overwhelming the database cluster.

The spark.cassandra.connection.keepAliveMs may also be useful in scenarios with long-running insertion workloads where connections may experience longer than expected periods of inactivity, a potential side-effect of periodic delays while processing insertions/updates on the server.

Here are examples of using these parameters:


dse spark-submit \ --conf "spark.cassandra.output.concurrent.writes=100" \ --conf "spark.cassandra.connection.keepAliveMS=120000" \ --conf "spark.cassandra.output.throughputMBPerSec=50" \ --class com.datastax.DataImport target/data-import-1.0-SNAPSHOT.jar \ newapi

Avoid over tuning your application on a small dataset

Be careful when tuning with a small dataset, very likely parameters tuned for short insertion workload will not behave similarly for longer more intensive workloads. A longer sustained insertion workload will lead to more data and more severe effects from background tasks such as memtable flushing, compaction, query routing, etc. In short, an incremental approach is recommended when loading large datasets. Try loading say 10-20% of the data, making note of parameters, cluster size, and overall node health during the process (e.g. lookout for obvious things like timeout exceptions, etc).

Also, increasing the cluster size can serve as an effective strategy in reducing individual node stress and improving overall ingestion performance. Again there is not a one-size-fits-all solution here, but an incremental approach with reasonably chosen tuning parameters and environment setup is a good approach.

How to copy a graph from one cluster to another

In DSE versions 5.1.15+, 6.0.8+, and 6.7.4+ a user has the ability to specify which host a DseGraphFrame object should connect with. This allows a user to read graph contents from one cluster and write to another. Suppose we want to copy vertices and edges from a remote cluster to the local cluster, here is a small example showing how to accomplish this.


import com.datastax.spark.connector.cql.CassandraConnectorConf spark.setCassandraConf("cluster1", CassandraConnectorConf.ConnectionHostParam.option("")) spark.setCassandraConf("cluster2", CassandraConnectorConf.ConnectionHostParam.option("")) spark.conf.set("cluster", "cluster1") val source = spark.dseGraph("srcGraph") spark.conf.set("cluster", "cluster2") val dst = spark.dseGraph("dstGraph") dst.updateVertices(src.V) dst.updateEdges(src.E)