email iconemail phone iconcall

Writing Scala Codecs for the Java Driver

By Alexandre Dutra -  August 8, 2017 | 0 Comments

One of the common griefs Scala developers express when using the DataStax Java driver is the overhead incurred in almost every read or write operation, if the data to be stored or retrieved needs conversion from Java to Scala or vice versa.

This could be avoided by using "native" Scala codecs. This has been occasionally solicited from the Java driver team, but such codecs unfortunately do not exist, at least not officially.

Thankfully, the TypeCodec API in the Java driver can be easily extended. For example, several convenience Java codecs are available in the driver's extras package.

In this post, we are going to piggyback on the existing extra codecs and show how developers can create their own codecs – directly in Scala.

Note: all the examples in this post are available in this Github repository.

Dealing with Nullability

It can be tricky to deal with CQL types in Scala because CQL types are all nullable, whereas most typical representations of CQL scalar types in Scala resort to value classes, and these are non-nullable.

As an example, let's see how the Java driver deserializes, say, CQL ints.

The default codec for CQL ints converts such values to java.lang.Integer instances. From a Scala perspective, this has two disadvantages: first, one needs to convert from java.lang.Integer to Int, and second, Integer instances are nullable, while Scala Ints aren't.

Granted, the DataStax Java driver's Row interface has a pair of methods named getInt that deserialize CQL ints into Java ints, converting null values into zeroes.

But for the sake of this demonstration, let's assume that these methods did not exist, and all CQL ints were being converted into java.lang.Integer. Therefore, developers would yearn to have a codec that could deserialize CQL ints into Scala Ints while at the same time addressing the nullability issue.

Let this be the perfect excuse for us to introduce IntCodec, our first Scala codec:

import java.nio.ByteBuffer
import com.datastax.driver.core.exceptions.InvalidTypeException
import com.datastax.driver.core.{DataType, ProtocolVersion, TypeCodec}
import com.google.common.reflect.TypeToken

object IntCodec extends TypeCodec[Int](DataType.cint(), TypeToken.of(classOf[Int]).wrap()) {

  override def serialize(value: Int, protocolVersion: ProtocolVersion): ByteBuffer = 
    ByteBuffer.allocate(4).putInt(0, value)

  override def deserialize(bytes: ByteBuffer, protocolVersion: ProtocolVersion): Int = {
    if (bytes == null || bytes.remaining == 0) return 0
    if (bytes.remaining != 4) throw new InvalidTypeException("Invalid 32-bits integer value, expecting 4 bytes but got " + bytes.remaining)
    bytes.getInt(bytes.position)
  }

  override def format(value: Int): String = value.toString

  override def parse(value: String): Int = {
    try {
      if (value == null || value.isEmpty || value.equalsIgnoreCase("NULL")) 0
      else value.toInt
    }
    catch {
      case e: NumberFormatException =>
        throw new InvalidTypeException( s"""Cannot parse 32-bits integer value from "$value"""", e)
    }
  }

}

All we did so far is extend TypeCodec[Int] by filling in the superclass constructor arguments (more about that later) and implementing the required methods in a very similar way compared to the driver's built-in codec.

Granted, this isn't rocket science, but it will get more interesting later. The good news is, this template is reproducible enough to make it easy for readers to figure out how to create similar codecs for every AnyVal that is mappable to a CQL type (Boolean, Long, Float, Double, etc... let your imagination run wild or just go for the ready-made solution).

(Tip: because of the automatic boxing/unboxing that occurs under the hood, don't use this codec to deserialize simple CQL ints, and prefer instead the driver's built-in one, which will avoid this overhead; but you can use IntCodec to compose more complex codecs, as we will see below – the more complex the CQL type, the more negligible the overhead becomes.)

Let's see how this piece of code solves our initial problems: as for the burden of converting between Scala and Java, Int values are now written directly with ByteBuffer.putInt, and read directly from ByteBuffer.getInt; as for the nullability of CQL ints, the issue is addressed just as the driver does: nulls are converted to zeroes.

Converting nulls into zeroes might not be satisfying for everyone, but how to improve the situation? The general Scala solution for dealing with nullable integers is to map them to Option[Int]. DataStax Spark Connector for Apache Cassandra®'s CassandraRow class has exactly one such method:

def getIntOption(index: Int): Option[Int] = ...

Under the hood, it reads a java.lang.Integer from the Java driver's Row class, and converts the value to either None if it's null, or to Some(value), if it isn't.

Let's try to achieve the same behavior, but using the composite pattern: we first need a codec that converts from any CQL value into a Scala Option. There is no such built-in codec in the Java driver, but now that we are codec experts, let's roll our own OptionCodec:

class OptionCodec[T](
    cqlType: DataType,
    javaType: TypeToken[Option[T]],
    innerCodec: TypeCodec[T])
  extends TypeCodec[Option[T]](cqlType, javaType)
    with VersionAgnostic[Option[T]] {

  def this(innerCodec: TypeCodec[T]) {
    this(innerCodec.getCqlType, TypeTokens.optionOf(innerCodec.getJavaType), innerCodec)
  }

  override def serialize(value: Option[T], protocolVersion: ProtocolVersion): ByteBuffer =
    if (value.isEmpty) OptionCodec.empty.duplicate else innerCodec.serialize(value.get, protocolVersion)

  override def deserialize(bytes: ByteBuffer, protocolVersion: ProtocolVersion): Option[T] =
    if (bytes == null || bytes.remaining() == 0) None else Option(innerCodec.deserialize(bytes, protocolVersion))

  override def format(value: Option[T]): String =
    if (value.isEmpty) "NULL" else innerCodec.format(value.get)

  override def parse(value: String): Option[T] =
    if (value == null || value.isEmpty || value.equalsIgnoreCase("NULL")) None else Option(innerCodec.parse(value))

}

object OptionCodec {

  private val empty = ByteBuffer.allocate(0)

  def apply[T](innerCodec: TypeCodec[T]): OptionCodec[T] =
    new OptionCodec[T](innerCodec)

  import scala.reflect.runtime.universe._

  def apply[T](implicit innerTag: TypeTag[T]): OptionCodec[T] = {
    val innerCodec = TypeConversions.toCodec(innerTag.tpe).asInstanceOf[TypeCodec[T]]
    apply(innerCodec)
  }

}

And voilà! As you can see, the class body is very simple (its companion object is not very exciting at this point either, but we will see later how it could do more than just mirror the class constructor). Its main purpose when deserializing/parsing is to detect CQL nulls and return None right away, without even having to interrogate the inner codec, and when serializing/formatting, intercept None so that it can be immediately converted back to an empty ByteBuffer (the native protocol's representation of null).

We can now combine our two codecs together, IntCodec and OptionCodec, and compose a TypeCodec[Option[Int]]:

import com.datastax.driver.core._
val codec: TypeCodec[Option[Int]] = OptionCodec(IntCodec)
assert(codec.deserialize(ByteBuffer.allocate(0), ProtocolVersion.V4).isEmpty)
assert(codec.deserialize(ByteBuffer.allocate(4), ProtocolVersion.V4).isDefined)

The problem with TypeTokens

Let's sum up what we've got so far: a TypeCodec[Option[Int]] that is the perfect match for CQL ints. But how to use it?

There is nothing really particular with this codec and it is perfectly compatible with the Java driver. You can use it explicitly, which is probably the simplest way:

import com.datastax.driver.core._
val codec: TypeCodec[Option[Int]] = OptionCodec(IntCodec)
val row: Row = ??? // some CQL query containing an int column
val v: Option[Int] = row.get(0, codec)

But your application is certainly more complex than that, and you would like to register your codec beforehand so that it gets transparently used afterwards:

import com.datastax.driver.core._
// first
val codec: TypeCodec[Option[Int]] = OptionCodec(IntCodec)
cluster.getConfiguration.getCodecRegistry.register(codec)

// then
val row: Row = ??? // some CQL query containing an int column
val v: Option[Int] = row.get(0, ???) // How to get a TypeToken[Option[Int]]?

Well, before we can actually do that, we first need to solve one problem: the Row.get method comes in a few overloaded flavors, and the most flavory ones accept a TypeToken argument; let's learn how to use them in Scala.

The Java Driver API, for historical reasons — but also, let's be honest, due to the lack of alternatives – makes extensive usage of Guava's TypeToken API (if you are not familiar with the type token pattern you might want to stop and read about it first).

Scala has its own interpretation of the same reflective pattern, named type tags. Both APIs pursue identical goals – to convey compile-time type information to the runtime – through very different roads. Unfortunately, it's all but an easy path to travel from one to the other, simply because there is no easy bridge between java.lang.Type and Scala's Type.

Hopefully, all is not lost. As a matter of fact, creating a full-fledged conversion service between both APIs is not a pre-requisite: it turns out that Guava's TypeToken works pretty well in Scala, and most classes get resolved just fine. TypeTokens in Scala are just a bit cumbersome to use, and quite error-prone when instantiated, but that's something that a helper object can facilitate.

We are not going to dive any deeper in the troubled waters of Scala reflection (well, at least not until the last chapter of this tutorial). It suffices to assume that the helper object we mentioned above really exists, and that it does the job of creating TypeToken instances while at the same time sparing the developer the boiler-plate code that this operation usually incurs.

Now we can resume our example and complete our code that reads a CQL int into a Scala Option[Int], in the most transparent way:

import com.datastax.driver.core._
val tt = TypeTokens.optionOf(TypeTokens.int) // creates a TypeToken[Option[Int]]
val row: Row = ??? // some CQL query containing an int column
val v: Option[Int] = row.get(0, tt) 

Dealing with Collections

Another common friction point between Scala and the Java driver is the handling of CQL collections.

Of course, the driver has built-in support for CQL collections; but obviously, these map to typical Java collection types: CQL list maps to java.util.List (implemented by java.util.ArrayList), CQL set to java.util.Set (implemented by java.util.LinkedHashSet) and CQL map to java.util.Map (implemented by java.util.HashMap).

This leaves Scala developers with two inglorious options:

  1. Use the implicit JavaConverters object and deal with – gasp! – mutable collections in their code;
  2. Deal with custom Java-to-Scala conversion in their code, and face the consequences of conversion overhead (this is the choice made by the already-mentioned Spark Connector for Apache Cassandra®, because it has a very rich set of converters available).

All of this could be avoided if CQL collection types were directly deserialized into Scala immutable collections.

Meet SeqCodec, our third Scala codec in this tutorial:

import java.nio.ByteBuffer
import com.datastax.driver.core.CodecUtils.{readSize, readValue}
import com.datastax.driver.core._
import com.datastax.driver.core.exceptions.InvalidTypeException

class SeqCodec[E](eltCodec: TypeCodec[E])
  extends TypeCodec[Seq[E]](
    DataType.list(eltCodec.getCqlType),
    TypeTokens.seqOf(eltCodec.getJavaType))
    with ImplicitVersion[Seq[E]] {

  override def serialize(value: Seq[E], protocolVersion: ProtocolVersion): ByteBuffer = {
    if (value == null) return null
    val bbs: Seq[ByteBuffer] = for (elt <- value) yield {
      if (elt == null) throw new NullPointerException("List elements cannot be null")
      eltCodec.serialize(elt, protocolVersion)
    }
    CodecUtils.pack(bbs.toArray, value.size, protocolVersion)
  }

  override def deserialize(bytes: ByteBuffer, protocolVersion: ProtocolVersion): Seq[E] = {
    if (bytes == null || bytes.remaining == 0) return Seq.empty[E]
    val input: ByteBuffer = bytes.duplicate
    val size: Int = readSize(input, protocolVersion)
    for (_ <- 1 to size) yield eltCodec.deserialize(readValue(input, protocolVersion), protocolVersion)
  }

  override def format(value: Seq[E]): String = {
    if (value == null) "NULL" else '[' + value.map(e => eltCodec.format(e)).mkString(",") + ']'
  }

  override def parse(value: String): Seq[E] = {
    if (value == null || value.isEmpty || value.equalsIgnoreCase("NULL")) return Seq.empty[E]
    var idx: Int = ParseUtils.skipSpaces(value, 0)
    if (value.charAt(idx) != '[') throw new InvalidTypeException( s"""Cannot parse list value from "$value", at character $idx expecting '[' but got '${value.charAt(idx)}'""")
    idx = ParseUtils.skipSpaces(value, idx + 1)
    val seq = Seq.newBuilder[E]
    if (value.charAt(idx) == ']') return seq.result
    while (idx < value.length) {
      val n = ParseUtils.skipCQLValue(value, idx)
      seq += eltCodec.parse(value.substring(idx, n))
      idx = n
      idx = ParseUtils.skipSpaces(value, idx)
      if (value.charAt(idx) == ']') return seq.result
      if (value.charAt(idx) != ',') throw new InvalidTypeException( s"""Cannot parse list value from "$value", at character $idx expecting ',' but got '${value.charAt(idx)}'""")
      idx = ParseUtils.skipSpaces(value, idx + 1)
    }
    throw new InvalidTypeException( s"""Malformed list value "$value", missing closing ']'""")
  }

  override def accepts(value: AnyRef): Boolean = value match {
    case seq: Seq[_] => if (seq.isEmpty) true else eltCodec.accepts(seq.head)
    case _ => false
  }

}

object SeqCodec {

  def apply[E](eltCodec: TypeCodec[E]): SeqCodec[E] = new SeqCodec[E](eltCodec)

}

(Of course, we are talking here about scala.collection.immutable.Seq.)

The code above is still vaguely ressemblant to the equivalent Java code, and not very interesting per se; the parse method in particular is not exactly a feast for the eyes, but there's little we can do about it.

In spite of its modest body, this codec allows us to compose a more interesting TypeCodec[Seq[Option[Int]]] that can convert a CQL list<int> directly into a scala.collection.immutable.Seq[Option[Int]]:

import com.datastax.driver.core._
type Seq[+A] = scala.collection.immutable.Seq[A]
val codec: TypeCodec[Seq[Int]] = SeqCodec(OptionCodec(IntCodec))
val l = List(Some(1), None)
assert(codec.deserialize(codec.serialize(l, ProtocolVersion.V4), ProtocolVersion.V4) == l)

Some remarks about this codec:

  1. This codec is just for the immutable Seq type. It could be generalized into an AbstractSeqCodec in order to accept other mutable or immutable sequences. If you want to know how it would look, the answer is here.
  2. Ideally, TypeCodec[T] should have been made covariant in T, the type handled by the codec (i.e. TypeCodec[+T]); unfortunately, this is not possible in Java, so TypeCodec[T] is in practice invariant in T. This is a bit frustrating for Scala implementors, as they need to choose the best upper bound for T, and stick to it for both input and output operations, just like we did above.
  3. Similar codecs can be created to map CQL sets to Sets and CQL maps to Maps; again, we leave this as an exercise to the user (and again, it is possible to cheat).

Dealing with Tuples

Scala tuples are an appealing target for CQL tuples.

The Java driver does have a built-in codec for CQL tuples; but it translates them into TupleValue instances, which are unfortunately of little help for creating Scala tuples.

Luckily enough, TupleCodec inherits from AbstractTupleCodec, a class that has been designed exactly with that purpose in mind: to be extended by developers wanting to map CQL tuples to more meaningful types than TupleValue.

As a matter of fact, it is extremely simple to craft a codec for Tuple2 by extending AbstractTupleCodec:

class Tuple2Codec[T1, T2](
    cqlType: TupleType, javaType: TypeToken[(T1, T2)],
    eltCodecs: (TypeCodec[T1], TypeCodec[T2]))
  extends AbstractTupleCodec[(T1, T2)](cqlType, javaType)
    with ImplicitVersion[(T1, T2)] {

  def this(eltCodec1: TypeCodec[T1], eltCodec2: TypeCodec[T2])(implicit protocolVersion: ProtocolVersion, codecRegistry: CodecRegistry) {
    this(
      TupleType.of(protocolVersion, codecRegistry, eltCodec1.getCqlType, eltCodec2.getCqlType),
      TypeTokens.tuple2Of(eltCodec1.getJavaType, eltCodec2.getJavaType),
      (eltCodec1, eltCodec2)
    )
  }

  {
    val componentTypes = cqlType.getComponentTypes
    require(componentTypes.size() == 2, s"Expecting TupleType with 2 components, got ${componentTypes.size()}")
    require(eltCodecs._1.accepts(componentTypes.get(0)), s"Codec for component 1 does not accept component type: ${componentTypes.get(0)}")
    require(eltCodecs._2.accepts(componentTypes.get(1)), s"Codec for component 2 does not accept component type: ${componentTypes.get(1)}")
  }

  override protected def newInstance(): (T1, T2) = null

  override protected def serializeField(source: (T1, T2), index: Int, protocolVersion: ProtocolVersion): ByteBuffer = index match {
    case 0 => eltCodecs._1.serialize(source._1, protocolVersion)
    case 1 => eltCodecs._2.serialize(source._2, protocolVersion)
  }

  override protected def deserializeAndSetField(input: ByteBuffer, target: (T1, T2), index: Int, protocolVersion: ProtocolVersion): (T1, T2) = index match {
    case 0 => Tuple2(eltCodecs._1.deserialize(input, protocolVersion), null.asInstanceOf[T2])
    case 1 => target.copy(_2 = eltCodecs._2.deserialize(input, protocolVersion))
  }

  override protected def formatField(source: (T1, T2), index: Int): String = index match {
    case 0 => eltCodecs._1.format(source._1)
    case 1 => eltCodecs._2.format(source._2)
  }

  override protected def parseAndSetField(input: String, target: (T1, T2), index: Int): (T1, T2) = index match {
    case 0 => Tuple2(eltCodecs._1.parse(input), null.asInstanceOf[T2])
    case 1 => target.copy(_2 = eltCodecs._2.parse(input))
  }

}

object Tuple2Codec {

  def apply[T1, T2](eltCodec1: TypeCodec[T1], eltCodec2: TypeCodec[T2]): Tuple2Codec[T1, T2] =
    new Tuple2Codec[T1, T2](eltCodec1, eltCodec2)

}

A very similar codec for Tuple3 can be found here. Extending this principle to Tuple4, Tuple5, etc. is straightforward and left for the reader as an exercise.

Going incognito with implicits

The careful reader noticed that Tuple2Codec's constructor takes two implicit arguments: CodecRegistry and ProtocolVersion. They are omnipresent in the TypeCodec API and hence, good candidates for implicit arguments – and besides, both have nice default values. To make the code above compile, simply put in your scope something along the lines of:

object Implicits {

  implicit val protocolVersion = ProtocolVersion.NEWEST_SUPPORTED
  implicit val codecRegistry = CodecRegistry.DEFAULT_INSTANCE

}

Speaking of implicits, let's now see how we can simplify our codecs by adding a pinch of those. Let's take a look at our first trait in this tutorial:

trait VersionAgnostic[T] {  this: TypeCodec[T] =>

  def serialize(value: T)(implicit protocolVersion: ProtocolVersion, marker: ClassTag[T]): ByteBuffer = 
    this.serialize(value, protocolVersion)

  def deserialize(bytes: ByteBuffer)(implicit protocolVersion: ProtocolVersion, marker: ClassTag[T]): T = 
    this.deserialize(bytes, protocolVersion)

}

This trait basically creates two overloaded methods, serialize and deserialize, which will infer the appropriate protocol version to use and forward the call to the relevant method (the marker argument is just the usual trick to work around erasure).

We can now mix-in this trait with an existing codec, and then avoid passing the protocol version to every call to serialize or deserialize:

import Implicits._
val codec = new SeqCodec(IntCodec) with VersionAgnostic[Seq[Int]]
codec.serialize(List(1,2,3))

We can now go even further and simplify the way codecs are composed together to create complex codecs. What if, instead of writing SeqCodec(OptionCodec(IntCodec)), we could simply write SeqCodec[Option[Int]]? To achieve that, let's enhance the companion object of SeqCodec with a more sophisticated apply method:

object SeqCodec {

  def apply[E](eltCodec: TypeCodec[E]): SeqCodec[E] = new SeqCodec[E](eltCodec)

  import scala.reflect.runtime.universe._

  def apply[E](implicit eltTag: TypeTag[E]): SeqCodec[E] = {
    val eltCodec = ??? // implicit TypeTag -> TypeCodec conversion
    apply(eltCodec)
  }

}

The second apply method guesses the element type by using implicit TypeTag instances (these are created by the Scala compiler, so you don't need to worry about instantiating them), then locates the appropriate codec for it. We can now write:

val codec = SeqCodec[Option[Int]]

Elegant, huh? Of course, we need some magic to locate the right codec given a TypeTag instance. Here we need to introduce another helper object, TypeConversions. Its method toCodec takes a Scala type and, with the help of some pattern matching, locates the most appropriate codec. We refer the interested reader to TypeConversions code for more details.

With the help of TypeConversions, we can now complete our new apply method:

def apply[E](implicit eltTag: TypeTag[E]): SeqCodec[E] = {
  val eltCodec = TypeConversions.toCodec[E](eltTag.tpe)
  apply(eltCodec)
}

Note: similar apply methods can be added to other codec companion objects as well.

It's now time to go really wild, bearing in mind that the following features should only be used with caution by expert users.

If only we could convert Scala's TypeTag instances into Guava's TypeToken ones, and then make them implicit like we did above, we would be able to completely abstract away these annoying types and write very concise code, such as:

val statement: BoundStatement = ???
statement.set(0, List(1,2,3)) // implicit TypeTag -> TypeToken conversion

val row: Row = ???
val list: Seq[Int] = row.get(0) // implicit TypeTag -> TypeToken conversion

Well, this can be achieved in a few different ways; we are going to explore here the so-called Type Class pattern.

The first step is be to create implicit classes containing "get" and "set" methods that take TypeTag instances instead of TypeToken ones; we'll name them getImplicitly and setImplicitly to avoid name clashes. Let's do it for Row and BoundStatement:

implicit class RowOps(val self: Row) {

  def getImplicitly[T](i: Int)(implicit typeTag: TypeTag[T]): T = 
    self.get(i, ???) // implicit TypeTag -> TypeToken conversion

  def getImplicitly[T](name: String)(implicit typeTag: TypeTag[T]): T =
    self.get(name, ???) // implicit TypeTag -> TypeToken conversion

}

implicit class BoundStatementOps(val self: BoundStatement) {

  def setImplicitly[T](i: Int, value: T)(implicit typeTag: TypeTag[T]): BoundStatement =
    self.set(i, value, ???) // implicit TypeTag -> TypeToken conversion

  def setImplicitly[T](name: String, value: T)(implicit typeTag: TypeTag[T]): BoundStatement = 
    self.set(name, value, ???) // implicit TypeTag -> TypeToken conversion
  }

}

Remember what we stated at the beginning of this tutorial: "there is no easy bridge between Java types and Scala types"? Well, we will have to lay one now to cross that river.

Our helper object TypeConversions has another method, toJavaType, that does just that. Again, digging into its details is out of the scope of this tutorial, but with this method we can complete our implicit classes as below:

def getImplicitly[T](i: Int)(implicit typeTag: TypeTag[T]): T = 
  val javaType: java.lang.reflect.Type = TypeConversions.toJavaType(typeTag.tpe)
  self.get(i, TypeToken.of(javaType).wrap().asInstanceOf[TypeToken[T]])

And we are done!

Now, by simply placing the above implicit classes into scope, we will be able to write code as concise as:

statement.setImplicitly(0, List(1,2,3)) // implicitly converted to statement.setImplicitly(0, List(1,2,3)) (TypeTag[Seq[Int]]), then
                                        // implicitly converted to statement.set          (0, List(1,2,3), TypeToken[Seq[Int]])

When retrieving values, it's a bit more complicated because the Scala compiler needs some help from the developer to be able to fill in the appropriate implicit TypeTag instance; we do so like this:

val list = row.getImplicitly[Seq[Int]](0) // implicitly converted to statement.getImplicitly(0) (TypeTag[Seq[Int]]), then
                                          // implicitly converted to statement.get          (0,  TypeToken[Seq[Int]])

That's it. We hope that with this tutorial, we could demonstrate how easy it is to create codecs for the Java driver that are first-class citizens in Scala. Enjoy!









DataStax has many ways for you to advance in your career and knowledge.

You can take free classes, get certified, or read one of our many white papers.



register for classes

get certified

DBA's Guide to NoSQL







Comments

Your email address will not be published. Required fields are marked *




Subscribe for newsletter: