Technology | June 2, 2014

Powers of Ten – Part II

Stephen Mallette
$ curl -L -O http://downloads.cms.gov/foia/physician-referrals-2012-2013-days365.zip
$ unzip physician-referrals-2012-2013-days365.zip
// open Titan against Cassandra and define the schema up front
g = com.thinkaurelius.titan.core.TitanFactory.open("conf/titan-cassandra.properties")
// "npi" is the provider identifier: single-valued, unique and indexed for vertex lookups
g.makeKey("npi").dataType(String.class).single().unique().indexed(Vertex.class).make()
// the three integer properties carried on each referral edge
sharedTxCount = g.makeKey("sharedTxCount").dataType(Integer.class).make()
patientTotal = g.makeKey("patientTotal").dataType(Integer.class).make()
sameDayTotal = g.makeKey("sameDayTotal").dataType(Integer.class).make()
// the "shares" edge label, with those properties declared in its signature
g.makeLabel("shares").signature(sharedTxCount, patientTotal, sameDayTotal).make()
g.commit()
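The unique, indexed npi key is what later makes it possible to fetch a provider vertex directly by its identifier. Once the bulk load described below has run, a key lookup like the following (the NPI value is hypothetical) resolves through that index:

v = g.V("npi", "1234567890").next()   // index-backed lookup of a provider by NPI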
import com.thinkaurelius.faunus.FaunusVertex
import com.tinkerpop.blueprints.Direction

// the id alphabet is the ten digits plus "D"; each id is encoded into a long
// so it can serve as the Faunus vertex id
ID_CHARACTERS = ['0'..'9','D'].flatten()
NUM_CHARACTERS = ID_CHARACTERS.size()

def long encodeId(String id) {
  id.inject(0L, { acc, c ->
    acc * NUM_CHARACTERS + ID_CHARACTERS.indexOf(c)
  })
}

// called by ScriptInputFormat once per input line: each line becomes a provider
// vertex carrying its "npi" property and one outgoing "shares" edge
def boolean read(FaunusVertex vertex, String line) {
    def (id1,
         id2,
         sharedTxCount,
         patientTotal,
         sameDayTotal) = line.split(',')*.trim()
    vertex.reuse(encodeId(id1))
    vertex.setProperty("npi", id1)
    def edge = vertex.addEdge(Direction.OUT, "shares", encodeId(id2))
    edge.setProperty("sharedTxCount", sharedTxCount as Integer)
    edge.setProperty("patientTotal", patientTotal as Integer)
    edge.setProperty("sameDayTotal", sameDayTotal as Integer)
    return true
}
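Because the script only runs inside the Hadoop job, it can be worth sanity-checking the encoding and parsing logic locally first. The following sketch exercises the same logic as a plain Groovy script; the input row is made up rather than taken from the DocGraph file:

// standalone check of the id encoding and line parsing used above
ID_CHARACTERS = ['0'..'9','D'].flatten()
NUM_CHARACTERS = ID_CHARACTERS.size()

def encodeId(String id) {
  id.inject(0L, { acc, c -> acc * NUM_CHARACTERS + ID_CHARACTERS.indexOf(c) })
}

def line = "1234567890,9876543210,30,25,5"   // made-up DocGraph-style row
def (id1, id2, sharedTxCount, patientTotal, sameDayTotal) = line.split(',')*.trim()
assert encodeId(id1) != encodeId(id2)        // distinct NPIs encode to distinct longs
println "${id1} -> ${encodeId(id1)}"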
# input graph parameters
faunus.graph.input.format=com.thinkaurelius.faunus.formats.script.ScriptInputFormat
faunus.input.location=docgraph/Physician-Referrals-2012-2013-DAYS365.txt
faunus.graph.input.script.file=docgraph/NPIScriptInput.groovy
faunus.graph.input.edge-copy.direction=OUT
# output data (graph or statistic) parameters
faunus.graph.output.format=com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraOutputFormat
faunus.graph.output.titan.storage.backend=cassandra
faunus.graph.output.titan.storage.hostname=localhost
faunus.graph.output.titan.storage.port=9160
faunus.graph.output.titan.storage.keyspace=titan
faunus.graph.output.titan.storage.batch-loading=true
faunus.graph.output.titan.infer-schema=false
# hadoop/mapreduce tuning parameters
mapred.task.timeout=5400000
mapred.max.split.size=5242880
mapred.reduce.tasks=2
mapred.map.child.java.opts=-Xmx8G
mapred.reduce.child.java.opts=-Xmx8G
mapred.job.reuse.jvm.num.tasks=-1
# side-effect output (statistics) parameters
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=output
faunus.output.location.overwrite=true
gremlin> hdfs.mkdir("docgraph")
==>null
gremlin> hdfs.copyFromLocal('Physician-Referrals-2012-2013-DAYS365.txt','docgraph/Physician-Referrals-2012-2013-DAYS365.txt')
==>null
gremlin> hdfs.copyFromLocal("NPIScriptInput.groovy","docgraph/NPIScriptInput.groovy")
==>null
gremlin> g = FaunusFactory.open("faunus.properties")
==>faunusgraph[scriptinputformat->titancassandraoutputformat]
gremlin> g._()      
13:55:05 INFO mapreduce.FaunusCompiler: Generating job chain: g._()
13:55:05 WARN mapreduce.FaunusCompiler: Using the distribution Faunus job jar: lib/faunus-0.4.2-job.jar
13:55:05 INFO mapreduce.FaunusCompiler: Compiled to 3 MapReduce job(s)
13:55:05 INFO mapreduce.FaunusCompiler: Executing job 1 out of 3: MapSequence[com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Map, com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Reduce]
...
17:55:25 INFO input.FileInputFormat: Total input paths to process : 2
17:55:25 INFO mapred.JobClient: Running job: job_201405141319_0004
17:55:26 INFO mapred.JobClient:  map 0% reduce 0%
17:56:23 INFO mapred.JobClient:  map 1% reduce 0%
...
02:06:46 INFO mapred.JobClient:  map 100% reduce 0%
...
18:54:05 INFO mapred.JobClient:   com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce$Counters
18:54:05 INFO mapred.JobClient:     EDGE_PROPERTIES_WRITTEN=463706751
18:54:05 INFO mapred.JobClient:     EDGES_WRITTEN=154568917
18:54:05 INFO mapred.JobClient:     SUCCESSFUL_TRANSACTIONS=624
...
18:54:05 INFO mapred.JobClient:     SPLIT_RAW_BYTES=77376
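With the counters reporting roughly 155 million edges written, the load can be spot-checked directly from Titan. The following is only a sketch: it reuses the same conf/titan-cassandra.properties as the schema step, samples an arbitrary vertex, and assumes that vertex has at least one outgoing shares edge:

g = com.thinkaurelius.titan.core.TitanFactory.open("conf/titan-cassandra.properties")
v = g.V.next()                        // an arbitrary provider vertex written by the job
v.getProperty("npi")                  // the NPI stored by NPIScriptInput.groovy
e = v.outE("shares").next()           // one of its referral edges
[e.getProperty("sharedTxCount"), e.getProperty("patientTotal"), e.getProperty("sameDayTotal")]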
import com.thinkaurelius.faunus.FaunusVertex
import static com.tinkerpop.blueprints.Direction.OUT
def boolean read(final FaunusVertex v, final String line) {
    def parts = line.split(':')
    v.reuse(Long.valueOf(parts[0]))
    if (parts.size() > 1) {
        parts[1].split(',').each({
            v.addEdge(OUT, 'friend', Long.valueOf(it))
        })
    }
    return true
}
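The following lines give a standalone, hypothetical illustration of the adjacency-list layout that read() expects: a vertex id, optionally followed by a colon and a comma-separated list of friend ids. The ids are made up:

def line = "1:2,3,4"                  // hypothetical Friendster adjacency row
def parts = line.split(':')
assert Long.valueOf(parts[0]) == 1L   // the vertex id
assert parts[1].split(',').collect { Long.valueOf(it) } == [2L, 3L, 4L]   // its outgoing "friend" edges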
gremlin> hdfs.copyFromLocal("/tmp/FriendsterInput.groovy","FriendsterInput.groovy")
==>null
gremlin> g = FaunusFactory.open("bin/friendster.properties")
==>faunusgraph[scriptinputformat->titancassandraoutputformat]
gremlin> g._()                                             
18:28:46 WARN mapreduce.FaunusCompiler: Using the distribution Faunus job jar: lib/faunus-0.4.4-job.jar
18:28:46 INFO mapreduce.FaunusCompiler: Compiled to 3 MapReduce job(s)
18:28:46 INFO mapreduce.FaunusCompiler: Executing job 1 out of 3: MapSequence[com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Map, com.thinkaurelius.faunus.formats.EdgeCopyMapReduce.Reduce]
...
18:28:47 INFO input.FileInputFormat: Total input paths to process : 125
18:28:47 INFO mapred.JobClient: Running job: job_201405111636_0005
18:28:48 INFO mapred.JobClient:  map 0% reduce 0%
18:29:39 INFO mapred.JobClient:  map 1% reduce 0%
...
02:06:46 INFO mapred.JobClient:  map 100% reduce 0%
...
02:06:57 INFO mapred.JobClient:   File Input Format Counters
02:06:57 INFO mapred.JobClient:     Bytes Read=79174658355
02:06:57 INFO mapred.JobClient:   com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce$Counters
02:06:57 INFO mapred.JobClient:     SUCCESSFUL_TRANSACTIONS=15094
02:06:57 INFO mapred.JobClient:     EDGES_WRITTEN=2586147869
02:06:57 INFO mapred.JobClient:   FileSystemCounters
02:06:57 INFO mapred.JobClient:     HDFS_BYTES_READ=79189272471
02:06:57 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1754590920
...
02:06:57 INFO mapred.JobClient:     Bytes Written=0
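As with the DocGraph load, a quick spot check from the Gremlin console confirms the Friendster edges landed in Titan. This sketch assumes a Titan properties file pointing at the same Cassandra cluster and keyspace the job wrote to; the file name here is only illustrative:

g = com.thinkaurelius.titan.core.TitanFactory.open("conf/titan-cassandra.properties")
v = g.V.next()                        // an arbitrary Friendster member vertex
v.out("friend").count()               // its outgoing friend count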