Company · September 29, 2014

Interactive Advanced Analytics with DSE and Spark MLlib

Artem Aliev
Artem Aliev
Interactive Advanced Analytics with DSE and Spark MLlib
# Download the Iris CSV file into the current directory.
wget http://www.heatonresearch.com/dload/data/iris.csv
# Skip the first line (presumably the CSV header; `tail -n +2` starts at line 2)
# and stream the remainder from stdin (`-put -`) into the DSE Hadoop file system as iris.csv.
tail -n +2 iris.csv |dse hadoop fs -put - iris.csv
# Start the DSE Spark shell; the Scala statements below are meant to be entered there.
dse spark
/** One row of the Iris data set.
  *
  * Field names are kept in snake_case because they are used as the column
  * names of the `test.iris` Cassandra table.
  *
  * @param id      unique row identifier
  * @param sepal_l sepal length
  * @param sepal_w sepal width
  * @param petal_l petal length
  * @param petal_w petal width
  * @param species species name
  */
case class Iris(id: java.util.UUID,
                sepal_l: Double,
                sepal_w: Double,
                petal_l: Double,
                petal_w: Double,
                species: String)
// Read iris.csv (uploaded earlier with `dse hadoop fs -put`) as an RDD of text lines.
val data = sc.textFile("iris.csv")
// Convert every non-empty line into an Iris: the first four comma-separated
// columns are parsed as doubles, the fifth is taken verbatim as the species,
// and each record is given a freshly generated random UUID as its id.
val parsed = data.filter(line => !line.isEmpty).map { line =>
    val columns = line.split(",")
    val Array(sepalLength, sepalWidth, petalLength, petalWidth) = columns.take(4).map(_.toDouble)
    Iris(java.util.UUID.randomUUID(), sepalLength, sepalWidth, petalLength, petalWidth, columns(4))
}
// Show the first two parsed records as a quick sanity check.
parsed.take(2).foreach(println)
import com.datastax.spark.connector.cql.CassandraConnector
// Open a Cassandra session using the Spark configuration and make sure the
// target keyspace and table exist (both statements use IF NOT EXISTS).
CassandraConnector(sc.getConf).withSessionDo { cql =>
    // Keyspace is created first because the table DDL below is qualified with it.
    val keyspaceDdl = "CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }"
    // Column names mirror the fields of the Iris case class.
    val tableDdl = """CREATE TABLE IF NOT EXISTS
        test.iris (
            id uuid primary key,
            sepal_l double,
            sepal_w double,
            petal_l double,
            petal_w double,
            species text
        )
    """
    cql.execute(keyspaceDdl)
    cql.execute(tableDdl)
}
// Write every parsed Iris record into the test.iris table.
parsed.saveToCassandra("test", "iris")
// Read the rows back from Cassandra as Iris objects. The RDD is cached because
// it is traversed more than once (species lookup here, feature extraction later).
val data = sc.cassandraTable[Iris]("test", "iris").cache()
// Species name -> numeric class label (0.0, 1.0, ...).
// The distinct names are sorted before being numbered: the order in which
// distinct/collect returns elements depends on partitioning, so without the
// sort the same data could be assigned different label ids on different runs.
val class2id = {
    val speciesNames = data.map(_.species).distinct.collect.sorted
    speciesNames.zipWithIndex.map { case (name, index) => (name, index.toDouble) }.toMap
}
// Reverse lookup: numeric class label -> species name.
val id2class = class2id.map(_.swap)
import org.apache.spark.mllib.regression.LabeledPoint
// Build the MLlib training set: the label is the numeric id of the species and
// the features are, in this order, petal_l, petal_w, sepal_l, sepal_w.
val parsedData = data.map { flower =>
    val label = class2id(flower.species)
    val features = Array(flower.petal_l, flower.petal_w, flower.sepal_l, flower.sepal_w)
    LabeledPoint(label, features)
}
import org.apache.spark.mllib.classification.NaiveBayes
// Fit a Naive Bayes classifier on the labelled training points.
// NOTE(review): features are passed as plain Array[Double]; this presumably
// matches the MLlib version shipped with this DSE release -- confirm before
// running against a newer Spark.
val model = NaiveBayes.train(parsedData)
// A single flower to classify, using the same feature order as parsedData:
// petal_l, petal_w, sepal_l, sepal_w.
val sample = Array(5.0, 1.5, 6.4, 3.2)
// Numeric class label predicted for the sample.
model.predict(sample)
// The same prediction translated back to the species name.
id2class(model.predict(sample))
Share

One-stop Data API for Production GenAI

Astra DB gives JavaScript developers a complete data API and out-of-the-box integrations that make it easier to build production RAG apps with high relevancy and low latency.