November 1, 2019

DataStax Enterprise GraphFrames: Best Practices

Rocco Varela

This guide highlights best practices for loading data with the DseGraphFrame package. The DseGraphFrame package provides a Spark API for bulk operations and analytics on DataStax Graph. It is inspired by Databricks’ GraphFrame library and supports a subset of the Apache TinkerPop™ Gremlin graph traversal language. The DseGraphFrame package supports reading DataStax Graph data into a GraphFrame, and writing GraphFrames from any format supported by Spark into DataStax Graph. For a review of our initial offering and more introductory examples, see our Introducing DataStax GraphFrames blog post. Technical documentation on general usage of DataStax Enterprise GraphFrames (DGF) can be viewed in our DSE GraphFrame overview page.

Common Pitfalls

Implicit null writes for unset columns can cause excessive tombstones

In versions prior to 6.0.7 and 6.7.3, if a user omitted columns during DSE GraphFrames edge updates, the missing columns were implicitly written to DSE as null values, causing unintended deletions, tombstone build-up, and ultimately excessive stress on the system.

The workaround at the time was to set spark.setCassandraConf(Map("spark.cassandra.output.ignoreNulls" -> "true")), which will ignore unset or null-valued columns and not create unintended deletions on the server side. In DSE versions 6.0.7, 6.7.3, and higher the default value for ignoreNulls is true.
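On affected versions, the same workaround can also be supplied at submit time rather than in code. A minimal sketch (the class and jar names below are placeholders for your own application):

```shell
# Pre-6.0.7/6.7.3 workaround: do not write unset columns as nulls,
# so edge updates with omitted columns no longer generate tombstones.
dse spark-submit \
  --conf "spark.cassandra.output.ignoreNulls=true" \
  --class com.example.DataImport target/data-import-1.0-SNAPSHOT.jar
```

On DSE 6.0.7, 6.7.3, and later this flag is redundant, since ignoreNulls already defaults to true.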

Unintended caching can lead to OOM exceptions

Prior to DSE versions 5.1.14, 6.0.5, and 6.7.1, during a DataStax GraphFrame bulk loading job the Spark cache was used by default but never explicitly emptied. This led to OutOfMemory (OOM) errors and other issues. The Spark Cassandra Connector parameter spark.dse.graphframes.update.persistLevel was introduced to allow better control over Spark caching levels.

Additionally, a new cache parameter was introduced in the multi-label update methods that can be used as a workaround if the user wishes to explicitly uncache data after use. More details on this coming soon...
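As a sketch of what setting the persistence level looks like at submit time (the storage-level value shown is one reasonable choice, and the class and jar names are placeholders):

```shell
# Control caching of intermediate DGF update results (DSE 5.1.14/6.0.5/6.7.1+).
# MEMORY_AND_DISK avoids recomputation without risking OOM on very large inputs.
dse spark-submit \
  --conf "spark.dse.graphframes.update.persistLevel=MEMORY_AND_DISK" \
  --class com.example.BulkLoad target/bulk-load-1.0-SNAPSHOT.jar
```

The value accepts standard Spark storage-level names; pick one appropriate to your executor memory budget.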

How to work around Materialized Views during bulk loading

When indexing with Materialized Views is desired, it is often recommended to enable it after the data has been loaded, because MVs significantly affect insertion performance. Expect roughly a 10% performance penalty per MV, and be aware of some subtleties when defining the data model; see the Materialized View Performance in Cassandra 3.x blog post for more details.

Recommended steps for bulk loading data:

  1. Drop all indices

  2. Bulk load data

  3. Recreate indices

After the data is loaded and indexing is re-enabled, how do we know when the view build is done? The nodetool viewbuildstatus command accomplishes exactly this.
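For example, to poll the build status of a view after recreating indices (the keyspace and view names below are placeholders for your own schema):

```shell
# Prints the per-host build state; each host reports SUCCESS once its
# view build has finished.
nodetool viewbuildstatus my_graph my_view_index
```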

How to manage multi/meta-properties

Here is an example of updating vertex multi- and meta-properties. Suppose we add a multi-property called nicknames, which itself has meta-properties named time and date. The person vertex label will be defined with the nicknames property.


schema.propertyKey("time").Timestamp().single().create()
schema.propertyKey("date").Date().single().create()
schema.propertyKey("nicknames").Text().multiple().create()
schema.propertyKey("nicknames").properties("time", "date").add()
schema.vertexLabel("person").
  partitionKey("name", "ssn").
  clusteringKey("age").
  properties("address", "coffeePerDay", "nicknames").
  create()

We'll start with 2 person vertices, 2 software vertices, and 2 created edges. Notice none of the person vertices have nicknames set yet.


scala> g.V().show(false)
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+---------+
|id                                                     |~label  |name    |version|lang  |temp |static_property|ssn        |age |address          |coffeePerDay|nicknames|
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+---------+
|software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5          |software|timer   |2      |groovy|100  |alpha          |null       |null|null             |null        |null     |
|software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ==          |software|chat    |1      |scala |mambo|beta           |null       |null|null             |null        |null     |
|person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|person  |isabella|null   |null  |null |null           |111-11-1111|2   |2420 P Street    |50          |null     |
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=    |person  |rocco   |null   |null  |null |null           |222-22-2222|28  |1017 Mohar Street|25          |null     |
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+---------+

scala> g.E().show(false)
+-------------------------------------------------------+---------------------------------------------+-------+------------------------------------+------+
|src                                                    |dst                                          |~label |id                                  |weight|
+-------------------------------------------------------+---------------------------------------------+-------+------------------------------------+------+
|person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ==|created|296d28c0-e62c-11e9-ace5-5b43d7c0da8d|1.0   |
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=    |software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5|created|f4774f00-e62c-11e9-ace5-5b43d7c0da8d|1.0   |
+-------------------------------------------------------+---------------------------------------------+-------+------------------------------------+------+

Now let’s add nicknames and its meta-properties. First construct a DataFrame consisting of the id of the vertex named rocco, along with the nicknames property and meta-properties we wish to update.


scala> val df = Seq(("person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=", "docRoc", java.sql.Date.valueOf("2017-01-01"), new java.sql.Timestamp(100L))).toDF("id", "nicknames", "date", "time")
df: org.apache.spark.sql.DataFrame = [id: string, nicknames: string ... 2 more fields]

scala>
+---------------------------------------------------+---------+----------+---------------------+
|id                                                 |nicknames|date      |time                 |
+---------------------------------------------------+---------+----------+---------------------+
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=|docRoc   |2017-01-01|1970-01-01 00:00:00.1|
+---------------------------------------------------+---------+----------+---------------------+

Now we create a new DataFrame consisting of just the id of the vertex and the nickname property to update.


scala> val updateDF ="id"), array(struct($"nicknames", $"date", $"time")) as "nicknames")
updateDF: org.apache.spark.sql.DataFrame = [id: string, nicknames: array<struct<nicknames:string,date:date,time:timestamp>>]

Notice how the nicknames field is constructed in this DataFrame: it is an array of structs.


scala> updateDF.printSchema
root
 |-- id: string (nullable = true)
 |-- nicknames: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- nicknames: string (nullable = true)
 |    |    |-- date: date (nullable = true)
 |    |    |-- time: timestamp (nullable = true)

scala>
+---------------------------------------------------+-------------------------------------------+
|id                                                 |nicknames                                  |
+---------------------------------------------------+-------------------------------------------+
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=|[[docRoc,2017-01-01,1970-01-01 00:00:00.1]]|
+---------------------------------------------------+-------------------------------------------+

Now we update the vertices using this update DataFrame. Notice we are using the multi-vertex label API of the vertex update method.


scala> g.updateVertices(updateDF)

scala> g.V().show(false)
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+-------------------------------------------+
|id                                                     |~label  |name    |version|lang  |temp |static_property|ssn        |age |address          |coffeePerDay|nicknames                                  |
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+-------------------------------------------+
|software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5          |software|timer   |2      |groovy|100  |alpha          |null       |null|null             |null        |null                                       |
|software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ==          |software|chat    |1      |scala |mambo|beta           |null       |null|null             |null        |null                                       |
|person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|person  |isabella|null   |null  |null |null           |111-11-1111|2   |2420 P Street    |50          |null                                       |
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=    |person  |rocco   |null   |null  |null |null           |222-22-2222|28  |1017 Mohar Street|25          |[[docRoc,1970-01-01 00:00:00.1,2017-01-01]]|
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+-------------------------------------------+

Alternatively, we could use the Apache TinkerPop™ property update syntax with DataStax GraphFrames:


scala> g.V().has("name", "rocco").property("nicknames", "docRoc", "date", java.sql.Date.valueOf("2017-01-01"), "time", new java.sql.Timestamp(100L)).iterate()

It is worth noting that regardless of the approach used, updateVertices or Apache TinkerPop™ update syntax with DGF, multi-properties are append only. For example, if we look at the vertex named rocco after executing the second update with the Apache TinkerPop™ syntax, we'll see two entries in the nicknames column.


scala> g.V().show(false)
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+--------------------------------------------------------------------------------------+
|id                                                     |~label  |name    |version|lang  |temp |static_property|ssn        |age |address          |coffeePerDay|nicknames                                                                             |
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+--------------------------------------------------------------------------------------+
|software:AAAABXRpbWVyAAAABAAAAAIAAAAGZ3Jvb3Z5          |software|timer   |2      |groovy|100  |alpha          |null       |null|null             |null        |null                                                                                  |
|software:AAAABGNoYXQAAAAEAAAAAQAAAAVzY2FsYQ==          |software|chat    |1      |scala |mambo|beta           |null       |null|null             |null        |null                                                                                  |
|person:AAAACGlzYWJlbGxhAAAACzExMS0xMS0xMTExAAAABAAAAAI=|person  |isabella|null   |null  |null |null           |111-11-1111|2   |2420 P Street    |50          |null                                                                                  |
|person:AAAABXJvY2NvAAAACzIyMi0yMi0yMjIyAAAABAAAABw=    |person  |rocco   |null   |null  |null |null           |222-22-2222|28  |1017 Mohar Street|25          |[[docRoc,1970-01-01 00:00:00.1,2017-01-01], [docRoc,1970-01-01 00:00:00.1,2017-01-01]]|
+-------------------------------------------------------+--------+--------+-------+------+-----+---------------+-----------+----+-----------------+------------+--------------------------------------------------------------------------------------+

How to carry out idempotent edge updates

When updating edges users should provide a valid and unique UUID for the id column.

Suppose we start with the following graph schema; our examples will look at updates to the lives edge label.


// truncated example for brevity
schema.propertyKey("nicknames").Text().multiple().create()
schema.propertyKey("reason").Text().single().create()
schema.propertyKey("age").Int().single().create()
schema.propertyKey("name").Text().single().create()
schema.propertyKey("date").Date().single().create()
schema.propertyKey("nicknames").properties("time", "date").add()
schema.vertexLabel("location").properties("name").create()
schema.vertexLabel("god").properties("name", "age", "nicknames").create()
schema.vertexLabel("god").index("god_age_index").secondary().by("age").add()
schema.vertexLabel("god").index("god_name_index").secondary().by("name").add()
schema.edgeLabel("lives").multiple().properties("reason").create()
schema.edgeLabel("lives").connection("god", "location").add()

// add data
Vertex neptune = graph.addVertex(T.label, "god", "name", "neptune", "age", 4500);
Vertex sea = graph.addVertex(T.label, "location", "name", "sea");
neptune.addEdge("lives", sea).property("reason", "loves waves");

The edge table maintains data for the lives edge in the src, dst, ~label, id, and reason columns.


DseGraphFrame gf = DseGraphFrameBuilder.dseGraph(keyspace, spark);
gf.E().df().show(false);

+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+
|src                     |dst                      |~label |id                                  |time                   |name                              |reason             |
+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+
|god:RnS4AAAAAAAAAAAE    |location:RnS4AAAAAAAAAAAJ|lives  |f695a6b0-4500-11e9-8e88-fb68397e4bea|null                   |null                              |loves waves        |
+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+

Now let's grab the edge that has the value loves waves in the reason column, then overwrite it with the word New.


DseGraphFrame gf = DseGraphFrameBuilder.dseGraph(keyspace, spark);
Dataset<Row> u = gf.E().df()
    .filter("reason = 'loves waves'")
    .drop("time").drop("reason").drop("name")
    .withColumn("reason", functions.lit("New"));

This gives us the following DataFrame that can be used to update the edge. Notice that because we are using an existing row and simply reinserting it with a new reason, we get the unique id for free.

;

+--------------------+-------------------------+------+------------------------------------+------+
|src                 |dst                      |~label|id                                  |reason|
+--------------------+-------------------------+------+------------------------------------+------+
|god:RnS4AAAAAAAAAAAE|location:RnS4AAAAAAAAAAAJ|lives |f695a6b0-4500-11e9-8e88-fb68397e4bea|New   |
+--------------------+-------------------------+------+------------------------------------+------+

We can now update the edge using the updateEdges method, and as expected we see the edge with id value f695a6b0-4500-11e9-8e88-fb68397e4bea has the reason column set with the new value New.


gf.updateEdges(u);
gf.E().df().show(false);

+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+
|src                     |dst                      |~label |id                                  |time                   |name                              |reason             |
+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+
|god:RnS4AAAAAAAAAAAE    |location:RnS4AAAAAAAAAAAJ|lives  |f695a6b0-4500-11e9-8e88-fb68397e4bea|null                   |null                              |New                |
+------------------------+-------------------------+-------+------------------------------------+-----------------------+----------------------------------+-------------------+

In general, it’s important to note that when updating edges you must include the id column in the dataset for the existing edges to be updated. If a user omits the id column and instead only supplies the src, dst, and ~label, they will end up duplicating edges with auto-generated IDs.
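One way to keep re-runnable loads idempotent is to derive the edge id deterministically from the edge's natural key instead of generating a random one. This is a sketch, not part of the DGF API: the edgeId helper below is hypothetical, and it produces a version-3 (name-based) UUID, so it only applies if your edge id column accepts an arbitrary UUID; a timeuuid column would need a time-based generator instead.

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

// Hypothetical helper: the same (src, label, dst) triple always maps to the
// same UUID, so re-running a load updates the edge in place rather than
// inserting a duplicate with a fresh auto-generated id.
def edgeId(src: String, label: String, dst: String): UUID =
  UUID.nameUUIDFromBytes(s"$src|$label|$dst".getBytes(StandardCharsets.UTF_8))

val id1 = edgeId("god:RnS4AAAAAAAAAAAE", "lives", "location:RnS4AAAAAAAAAAAJ")
val id2 = edgeId("god:RnS4AAAAAAAAAAAE", "lives", "location:RnS4AAAAAAAAAAAJ")
// id1 == id2: deterministic across runs
```

Populating the id column with such a value (e.g. via withColumn before updateEdges) makes repeated loads converge on the same rows.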

Key order matters when using the idColumn method

The idColumn(label: String, idColumns: Column*): Column method is a utility used for generating GraphFrame compatible IDs, see the API documentation for more details. When using idColumn for vertices that have multiple key columns it is important to provide the key columns to idColumn in the same order in which they are defined in the schema.

Suppose we have the following name vertex label


gremlin> schema.vertexLabel("name").
......1>   partitionKey("internal_party_id").
......2>   clusteringKey(
......3>     "prefx_nm",
......4>     "first_nm",
......5>     "mdl_nm",
......6>     "last_nm",
......7>     "sufx_nm",
......8>     "name_line_desc"
......9>   ).create()

When passing the keys to the idColumn method, the order must match the order defined in the schema. Notice in the example below the order of keys provided when constructing the dst column.


scala> val hasNameEdges = nameVertices
         .drop(col("~label"))
         .withColumn("src", nameVertices.col("partyId"))
         .withColumn("dst", g.idColumn(
           lit("name"),
           nameVertices.col("internal_party_id"),
           nameVertices.col("prefx_nm"),
           nameVertices.col("first_nm"),
           nameVertices.col("mdl_nm"),
           nameVertices.col("last_nm"),
           nameVertices.col("sufx_nm"),
           nameVertices.col("name_line_desc")
         ))
         .withColumn("~label", lit("has_name"))
         .drop(col("internal_party_id"))
         .drop(col("partyId"))
         .drop(col("first_nm"))
         .drop(col("last_nm"))
         .drop(col("mdl_nm"))
         .drop(col("name_line_desc"))
         .drop(col("prefx_nm"))
         .drop(col("sufx_nm"))

scala> g.updateEdges(hasNameEdges)

The new API for updating single labels was introduced to address this issue and simplify the user experience.

Considerations for Loading Big Graphs

Spark Cassandra Connector tuning parameters still apply with DataStax Enterprise GraphFrames

To increase write performance during DataStax Enterprise GraphFrames bulk loading, remember that our existing Spark Cassandra Connector tuning parameters still apply.

For example, spark.cassandra.output.concurrent.writes has been found to be one of the most intuitive and effective parameters to tune during load testing. Other parameters such as spark.cassandra.output.throughputMBPerSec (formerly spark.cassandra.output.throughput_mb_per_sec) can be very helpful as well. In cases where one expects a long insertion workload, it may be wise to down-tune spark.cassandra.output.throughputMBPerSec appropriately to avoid overwhelming the database cluster.

The spark.cassandra.connection.keepAliveMS parameter may also be useful in scenarios with long-running insertion workloads where connections may experience longer than expected periods of inactivity, a potential side effect of periodic delays while processing insertions/updates on the server.

Here are examples of using these parameters:


dse spark-submit \
  --conf "spark.cassandra.output.concurrent.writes=100" \
  --conf "spark.cassandra.connection.keepAliveMS=120000" \
  --conf "spark.cassandra.output.throughputMBPerSec=50" \
  --class com.datastax.DataImport target/data-import-1.0-SNAPSHOT.jar \
  newapi

Avoid over-tuning your application on a small dataset

Be careful when tuning with a small dataset: parameters tuned for a short insertion workload will very likely not behave the same for longer, more intensive workloads. A longer sustained insertion workload will lead to more data and more severe effects from background tasks such as memtable flushing, compaction, query routing, etc. In short, an incremental approach is recommended when loading large datasets. Try loading, say, 10-20% of the data, making note of parameters, cluster size, and overall node health during the process (e.g., look out for obvious problems like timeout exceptions).

Also, increasing the cluster size can be an effective strategy for reducing individual node stress and improving overall ingestion performance. Again, there is no one-size-fits-all solution here, but an incremental approach with reasonably chosen tuning parameters and environment setup is a good starting point.

How to copy a graph from one cluster to another

In DSE versions 5.1.15+, 6.0.8+, and 6.7.4+, a user has the ability to specify which host a DseGraphFrame object should connect with. This allows a user to read graph contents from one cluster and write them to another. Suppose we want to copy vertices and edges from a remote cluster to the local cluster; here is a small example showing how to accomplish this.


import com.datastax.spark.connector.cql.CassandraConnectorConf

spark.setCassandraConf("cluster1", CassandraConnectorConf.ConnectionHostParam.option(""))
spark.setCassandraConf("cluster2", CassandraConnectorConf.ConnectionHostParam.option(""))

spark.conf.set("cluster", "cluster1")
val src = spark.dseGraph("srcGraph")

spark.conf.set("cluster", "cluster2")
val dst = spark.dseGraph("dstGraph")

dst.updateVertices(src.V)
dst.updateEdges(src.E)