Developing High Performance Apache Cassandra® Applications in Rust (Part 1)
Rust has seen huge gains in popularity recently. It combines C++-level runtime performance, low resource requirements, and fine-grained control over details with strong correctness and safety features, a rare mixture not found in most other programming languages. In particular, Rust protects against a wide class of bugs that programmers can introduce when writing parallel or concurrent code. Concurrent code is hard to get right and can take ages to debug, and massively parallel code is exactly the kind of code required to get the most out of Apache Cassandra.
In this post, I'll explore the options available to develop a Cassandra app in Rust.
Driver Options
Several Rust drivers are currently available to help build Cassandra apps:
- Rust bindings to the official DataStax C++ driver: cassandra-rs
- A family of community drivers developed in pure Rust, based on CDRS: cdrs, cdrs-async, cdrs-tokio
- ScyllaDB driver developed in pure Rust, compatible with Apache Cassandra: scylla-rust-driver
Prerequisites
Cassandra achieves the best performance if it has enough work to do. One of the performance antipatterns is sending a query, waiting for the result, then sending another one, waiting again, and all that in a single thread. This isn’t the proper way to take advantage of Cassandra’s massively parallel architecture.
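To put rough numbers on this antipattern: by Little's law, the throughput a client can reach is bounded by the number of requests it keeps in flight divided by the average request latency. The sketch below only does that arithmetic. The function name is made up for illustration, and the latency and concurrency figures in the note that follows are assumptions, not measurements.

```rust
/// Upper bound on requests per second for a client that keeps `in_flight`
/// requests outstanding, each taking `latency_ms` milliseconds on average.
/// This is Little's law rearranged: throughput = concurrency / latency.
fn max_throughput_per_sec(in_flight: u32, latency_ms: f64) -> f64 {
    f64::from(in_flight) * 1000.0 / latency_ms
}
```

Assuming a 1 ms round trip, `max_throughput_per_sec(1, 1.0)` gives 1000 requests per second for the send-and-wait loop, while `max_throughput_per_sec(128, 1.0)` gives 128000 for a client that keeps 128 queries in flight.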
Async driver APIs avoid this antipattern by enabling many queries in parallel, even in a single thread of execution. So here, we’ll focus only on asynchronous code, and all the examples assume they are executed in an async context. This section will briefly explain a few basic steps you need to take to enable async in Rust.
First you need to select an async runtime to use. There are two major, well-developed, popular choices: tokio and async-std. It is up to you which one you choose, but for the purpose of this blog post, we'll use tokio.
Enabling tokio is very easy. Add the following dependency to Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
Next, initialize the async context by providing the async main function:
#[tokio::main]
async fn main() {
// all async code goes here
}
Please refer to the tokio documentation (https://docs.rs/tokio/1.2.0/tokio/) for more details on tuning the schedulers, enabling/disabling features, etc.
DataStax C++ Driver Bindings
- Rust bindings website: https://github.com/Metaswitch/cassandra-rs
- C++ Driver website: https://github.com/datastax/cpp-driver
- License: Apache v2
This is the most feature-complete and the most battle-tested driver. The bindings cover most of the features available in the official C++ driver from DataStax:
- Connects to Apache Cassandra, DataStax Enterprise and ScyllaDB
- CQL protocol versions 3 and 4
- Asynchronous API
- Simple, Prepared, and Batch statements
- Result set paging
- Asynchronous I/O, parallel execution, and request pipelining
- Compression
- Password authentication
- SSL
- Connection pooling
- Automatic node discovery
- Automatic reconnection with tunable retry policies
- Idle connection heartbeats
- Configurable load balancing
- Latency-aware routing
- Performance metrics
- Support for Rust types
- Tuples and UDTs
- Collections
- Client-side timestamps
- Custom data types
- Support for materialized view and secondary index metadata
- Support for clustering key order, frozen<> and Cassandra version metadata
- Blacklist, whitelist DC, and blacklist DC load balancing policies
- Reverse DNS with SSL peer identity verification support
- Randomized contact points
- Speculative execution
Unfortunately, DSE-specific features like Kerberos, LDAP authentication, geospatial types, and DateRange are not exposed yet. Functions to connect to the DataStax Astra Cloud Data Platform are also not exposed.
Installation
First, you need to install the dependencies of the C++ driver. On Ubuntu 18.04 (Bionic) and newer:
sudo apt-get install libuv1 libuv1-dev libssl1.1 libssl-dev
It is possible to install these dependencies on other distributions, but in some cases you may need to download the packages manually and install them with dpkg.
Next, install the C++ driver package:
wget https://downloads.datastax.com/cpp-driver/ubuntu/18.04/cassandra/v2.15.3/cassandra-cpp-driver_2.15.3-1_amd64.deb
wget https://downloads.datastax.com/cpp-driver/ubuntu/18.04/cassandra/v2.15.3/cassandra-cpp-driver-dev_2.15.3-1_amd64.deb
sudo dpkg -i cassandra-cpp-driver_2.15.3-1_amd64.deb cassandra-cpp-driver-dev_2.15.3-1_amd64.deb
Note that you need the -dev dependencies only for compiling the app. Users of your app will need to install only the packages without -dev.
Project Setup
Just add cassandra-cpp and, optionally, time to Cargo.toml:
[dependencies]
...
cassandra-cpp = "0.15.1"
time = "0.1.44"
Unfortunately, there seems to be no way to use a common async runtime for both the underlying C++ driver and async Rust code. The C++ driver uses libuv internally, but Rust doesn't support using its thread pool as the async runtime yet. It is also likely not possible to switch the driver to use the tokio or async-std thread pools. Therefore, your app is going to end up with two separate thread pools: one for the driver and another for running async tasks in your Rust code.
It is possible to use the driver without a Rust async runtime at all, but in that case, you'd have to convert all async calls into blocking calls by calling wait on the returned futures, which isn't very efficient. In order to achieve good performance this way, you'd have to create many hundreds or even thousands of threads.
Connecting
The connection is configured through the Cluster type:
use cassandra_cpp::*;
let mut cluster = Cluster::default();
// Specify nodes to contact:
cluster.set_contact_points("host1").unwrap();
cluster.set_contact_points("host2").unwrap();
// Optional authentication:
cluster.set_credentials("user", "password");
// Optional SSL config:
let host1_certificate: String = // load certificate from a file
let host2_certificate: String = // load certificate from a file
let mut ssl = Ssl::default();
ssl.add_trusted_cert(&host1_certificate).unwrap();
ssl.add_trusted_cert(&host2_certificate).unwrap();
cluster.set_ssl(&mut ssl);
// (Optional) How long to wait for connecting before bailing out:
cluster.set_connect_timeout(time::Duration::seconds(5));
// (Optional) How long to wait before attempting to reconnect after connection
// failure or disconnect:
cluster.set_reconnect_wait_time(100); // 100 ms
// (Optional) Set load balancing policy:
cluster.set_load_balance_round_robin();
Finally, we can connect, which returns a Session:
let session = match cluster.connect() {
    Ok(s) => s,
    Err(e) => {
        eprintln!("error: Failed to connect to Cassandra: {}", e);
        // Terminate the process with a non-zero exit code
        std::process::exit(1)
    }
};
The returned Session object is Send+Sync, so it can be accessed from multiple threads concurrently.
Querying
Let's assume there is a table test in keyspace keyspace1 with the following schema:
CREATE TABLE test(pk BIGINT PRIMARY KEY, data VARCHAR)
In order to be able to quickly query it multiple times, prepare the statement first:
let read_cql = "SELECT data FROM keyspace1.test WHERE pk = ?";
let prepared: PreparedStatement = session
.prepare(read_cql)
.unwrap()
.await
.expect("Failed to prepare");
It’s important to prepare each statement only once and store the prepared object(s) somewhere for future use. Preparing a statement requires a roundtrip to the server and takes a significant amount of time. In production code, you will probably put all the prepared statements together with the session object (or a reference to it) into a struct passed wherever database access is needed. Prepared statements are also Send+Sync, so they can be used from many threads without synchronization!
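The "session plus prepared statements in one struct" idea can be sketched as follows. To keep the sketch compilable without the C++ driver installed, FakeSession and FakePrepared are hypothetical stand-ins for the driver's Session and PreparedStatement types, and Db, read and share_between_threads are names invented for this illustration. In a real app, the fields would hold the driver types instead.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-ins for the driver's Session and PreparedStatement.
struct FakeSession;
struct FakePrepared {
    cql: &'static str,
}

impl FakeSession {
    // Stand-in for the (expensive) server roundtrip that prepares a statement.
    fn prepare(&self, cql: &'static str) -> FakePrepared {
        FakePrepared { cql }
    }

    // Stand-in for executing a bound statement; just echoes what would run.
    fn execute(&self, statement: &FakePrepared, pk: i64) -> String {
        format!("{} [pk = {}]", statement.cql, pk)
    }
}

/// Bundles the session with every statement prepared at startup.
struct Db {
    session: FakeSession,
    read_data: FakePrepared,
}

impl Db {
    /// Prepares all statements exactly once.
    fn new(session: FakeSession) -> Db {
        let read_data = session.prepare("SELECT data FROM keyspace1.test WHERE pk = ?");
        Db { session, read_data }
    }

    /// Reuses the statement prepared in `new`.
    fn read(&self, pk: i64) -> String {
        self.session.execute(&self.read_data, pk)
    }
}

/// Shares one `Db` between `workers` threads without any locking.
fn share_between_threads(workers: i64) -> Vec<String> {
    let db = Arc::new(Db::new(FakeSession));
    let handles: Vec<_> = (0..workers)
        .map(|pk| {
            let db = Arc::clone(&db);
            thread::spawn(move || db.read(pk))
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}
```

Wrapping the struct in an Arc works here because all of its fields are Send+Sync, which is the same property the text above attributes to the driver's session and prepared statements.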
In order to execute the statement prepared earlier, call bind to get a one-time Statement and set its parameters. For example, to print the row with primary key 10:
let key: i64 = 10;
let mut statement: Statement = prepared.bind();
statement.set_consistency(Consistency::LOCAL_QUORUM);
statement.set_retry_policy(RetryPolicy::downgrading_consistency_new());
statement.bind(0, key).unwrap();
let result = session.execute(&statement).await.expect("Failed to execute");
match result.first_row().unwrap() {
Some(row) => println!("{}", row.get_column(0).unwrap().get_str().unwrap()),
None => println!("No row found")
}
In order to receive more than one row, use result.iter():
for row in result.iter() {
println!("{}", row.get_column(0).unwrap().get_string().unwrap());
}
CDRS
- CDRS website: https://github.com/AlexPikalov/cdrs
- CDRS-async website: https://github.com/AlexPikalov/cdrs-async
- CDRS-tokio website: https://github.com/krojew/cdrs-tokio
- Licenses: Apache v2, MIT
CDRS is a family of Cassandra drivers developed from scratch in Rust. The original (oldest) version came with a synchronous, blocking API only. Later, asynchronous versions using the async-std and tokio runtimes were developed as separate projects.
An obvious advantage of a pure-Rust driver is a much simpler installation, both for developers and for application users. No dependency on third-party C++ libraries means the only thing you need is Cargo.
Unfortunately, at the time of writing, CDRS is looking for maintainers, and CDRS-async labels itself as an "alpha version." The following sections focus on CDRS-tokio, because it offers a performant async API and seems to be well maintained.
CDRS features include:
- Connects to Apache Cassandra, DataStax Enterprise and ScyllaDB
- CQL protocol versions 3 and 4
- Simple, prepared and batch statements
- Result set paging
- Password authentication
- Pluggable authentication strategies
- SSL
- Load balancing
- Connection pooling
- Compression: LZ4, Snappy
- Cassandra-to-Rust data deserialization
- Server events listening
- Query tracing information
Project Setup
Add the following dependencies to Cargo.toml:
[dependencies]
cdrs-tokio = "2.1.0"
chrono = "0.4" # to allow representing Cassandra Timestamp by DateTime
tokio = { version = "1", features = ["full"] }
Connecting
In CDRS, you first set up a connection to each node of the cluster separately. Together, these node configurations make up a cluster configuration that allows you to connect and obtain a Session:
use cdrs_tokio::authenticators::{StaticPasswordAuthenticator, Authenticator};
use cdrs_tokio::cluster::{NodeTcpConfigBuilder, ClusterTcpConfig, session, TcpConnectionPool};
use cdrs_tokio::load_balancing::RoundRobin;
use cdrs_tokio::cluster::session::Session;
use tokio::time::Duration;
let authenticator = StaticPasswordAuthenticator::new("user", "password");
let node_address = "127.0.0.1:9042";
let node = NodeTcpConfigBuilder::new(node_address, authenticator)
.max_size(5)
.min_idle(Some(4))
.max_lifetime(Some(Duration::from_secs(60)))
.idle_timeout(Some(Duration::from_secs(60)))
.build();
let cluster_config = ClusterTcpConfig(vec![node]);
let session = session::new(&cluster_config, RoundRobin::new())
.await
.expect("connect");
One problem that's not immediately visible in this code is the type of created session. Let's add an explicit type spec:
let session: Session<RoundRobin<TcpConnectionPool<StaticPasswordAuthenticator>>> =
    session::new(&cluster_config, RoundRobin::new())
        .await
        .expect("connect");
Some of the configuration, like the authenticator or load balancing, is encoded in the compile-time type of the session. What if we don't want to hard-code an authenticator, but configure it at runtime instead? Authenticator is a trait, so Rust trait objects could be used:
let authenticator: Box<dyn Authenticator> =
    Box::new(StaticPasswordAuthenticator::new("user", "password"));
Unfortunately, the Authenticator trait is not object-safe, hence the line above does not compile:
error[E0038]: the trait `cdrs_tokio::authenticators::Authenticator` cannot be made into an object
--> src/cdrs_session.rs:13:24
|
13 | let authenticator: Box<dyn Authenticator> =
| ^^^^^^^^^^^^^^^^^^^^^^ `cdrs_tokio::authenticators::Authenticator` cannot be made into an object
|
= note: the trait cannot be made into an object because it requires `Self: Sized`
= note: for a trait to be "object safe" it needs to allow building a vtable to allow the call to be resolvable dynamically; for more information visit https://doc.rust-lang.org/reference/items/traits.html#object-safety
This means that until the issue is resolved, CDRS is not a good fit for applications that need to configure the database connection at runtime (e.g. based on config files).
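For readers who have not hit this error before, here is a minimal sketch of the problem and of one common general-purpose Rust workaround, enum dispatch. Auth, NoAuth, PasswordAuth and AnyAuth are hypothetical types written for this illustration and are not part of CDRS. Whether the same approach is workable with the real Authenticator trait depends on details of CDRS not covered here.

```rust
// Hypothetical trait, not the CDRS one. The `Clone` supertrait implies
// `Self: Sized`, so `Box<dyn Auth>` would be rejected with error E0038.
trait Auth: Clone {
    /// Returns `(user, password)` if the connection should authenticate.
    fn credentials(&self) -> Option<(String, String)>;
}

#[derive(Clone)]
struct NoAuth;

#[derive(Clone)]
struct PasswordAuth {
    user: String,
    password: String,
}

impl Auth for NoAuth {
    fn credentials(&self) -> Option<(String, String)> {
        None
    }
}

impl Auth for PasswordAuth {
    fn credentials(&self) -> Option<(String, String)> {
        Some((self.user.clone(), self.password.clone()))
    }
}

/// One concrete, sized type that covers every choice available at runtime.
#[derive(Clone)]
enum AnyAuth {
    Disabled(NoAuth),
    Password(PasswordAuth),
}

impl Auth for AnyAuth {
    fn credentials(&self) -> Option<(String, String)> {
        // Forward to whichever variant was selected at runtime.
        match self {
            AnyAuth::Disabled(auth) => auth.credentials(),
            AnyAuth::Password(auth) => auth.credentials(),
        }
    }
}

/// Picks the variant from values that could come from a config file.
fn auth_from_config(credentials: Option<(&str, &str)>) -> AnyAuth {
    match credentials {
        Some((user, password)) => AnyAuth::Password(PasswordAuth {
            user: user.to_string(),
            password: password.to_string(),
        }),
        None => AnyAuth::Disabled(NoAuth),
    }
}

/// Generic code, like a statically typed session, sees a single type.
fn describe<A: Auth>(auth: &A) -> String {
    match auth.credentials() {
        Some((user, _)) => format!("authenticating as {}", user),
        None => "no authentication".to_string(),
    }
}
```

The trade-off is that the enum has to list every supported variant up front, which is less flexible than a trait object but keeps the type known at compile time.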
Querying
As with the DataStax driver, prepared statements are recommended for best performance:
use cdrs_tokio::query::{PreparedQuery, PrepareExecutor};
let statement: PreparedQuery =
session
.prepare("SELECT data FROM keyspace1.test WHERE pk = ?")
.await
.expect("prepare query");
To issue a query, call the exec_with_values method and pass a statement reference and parameters to it. It is very important to supply integer types that exactly match the schema of your table. Cassandra integer types are signed.
use cdrs_tokio::query_values;
use cdrs_tokio::types::prelude::Row;
let key = query_values!(10i64);
let result: Vec<Row> =
session.exec_with_values(&statement, key)
.await
.expect("execute query")
.get_body()
.unwrap()
.into_rows()
.unwrap();
for row in result {
let value: String = row.get_by_index(0).unwrap().unwrap();
println!("{}", value);
}
The result is just a vector of rows. You can access the column values by calling the get_by_index() method, which can automatically convert the column value to common Rust types.
Object Mapper
A nice feature of CDRS is the ability to automatically derive code for deserializing raw Row objects into structures. This is supported by an additional crate:
[dependencies]
...
cdrs-tokio-helpers-derive = "1.1.1"
For example, let's assume the following table for storing users of our application:
CREATE TABLE users (
id BIGINT PRIMARY KEY,
login VARCHAR,
name VARCHAR,
emails LIST<VARCHAR>,
created TIMESTAMP
);
In the application, you can map that to a Rust struct:
// These imports are very important, even if they
// look unused. The CDRS derive macros use them, and the program
// would not compile if they were omitted.
use cdrs_tokio::types::from_cdrs::FromCDRSByName;
use cdrs_tokio::frame::traits::TryFromRow;
use cdrs_tokio_helpers_derive::*;
// Needed for the `created` field below
use chrono::{DateTime, Utc};
#[derive(Clone, Debug, TryFromRow, PartialEq)]
struct User {
    id: i64,
    login: String,
    name: String,
    emails: Vec<String>,
    created: DateTime<Utc>,
}
Now, instead of unpacking fields from the received rows manually, we can just call the auto-generated try_from_row method to convert the row into an object:
for row in result {
let user: User = User::try_from_row(row).unwrap();
println!("{:?}", user);
}
Note that the conversion might fail if the database schema doesn't match the struct, so in production code you need to handle the error value properly instead of just unwrapping the result blindly.
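One way to handle conversion failures without unwrap is to collect the per-row results into a single Result and let the caller decide what to do with the error. The sketch below shows only that pattern. FakeRow, the hand-written try_from_row, and the row helper are hypothetical stand-ins that let the code compile without CDRS, and the error type is simplified to a String.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a driver row: column name -> textual value.
type FakeRow = HashMap<String, String>;

#[derive(Debug, PartialEq)]
struct User {
    id: i64,
    login: String,
}

impl User {
    /// Hand-written stand-in for the derived `try_from_row`.
    fn try_from_row(row: &FakeRow) -> Result<User, String> {
        let id = row
            .get("id")
            .ok_or("missing column: id")?
            .parse::<i64>()
            .map_err(|e| format!("bad id: {}", e))?;
        let login = row.get("login").ok_or("missing column: login")?.clone();
        Ok(User { id, login })
    }
}

/// Converts all rows, stopping at the first row that does not match the struct.
fn users_from_rows(rows: &[FakeRow]) -> Result<Vec<User>, String> {
    rows.iter().map(User::try_from_row).collect()
}

/// Test helper: builds a row from (column, value) pairs.
fn row(pairs: &[(&str, &str)]) -> FakeRow {
    pairs
        .iter()
        .map(|(column, value)| (column.to_string(), value.to_string()))
        .collect()
}
```

Collecting into `Result<Vec<User>, String>` stops at the first failing row. If partial results are acceptable, the rows could instead be filtered or logged one by one.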
ScyllaDB Driver
- Website: https://github.com/scylladb/scylla-rust-driver
- License: Apache v2, MIT
This is a driver written by the ScyllaDB team from scratch in Rust. It is compatible with recent versions of Apache Cassandra and DataStax Enterprise. At the moment of writing this blog post, the driver is labeled as "in early development and is not for production nor officially supported."
Features include:
- Asynchronous API based on Tokio
- CQL protocol version 4
- Simple, prepared and batch statements
- Query paging
- Token-aware routing
- Shard-aware routing (specific to ScyllaDB)
- Compression (LZ4 and Snappy algorithms)
- Configurable load balancing policies
- Driver-side metrics
Some important features like authentication and SSL are under development.
Project Setup
Similar to CDRS-tokio, this driver requires only a dependency entry in Cargo.toml. However, the project hasn't been published to crates.io yet, so a GitHub link must be given:
[dependencies]
scylla = { git = "https://github.com/scylladb/scylla-rust-driver", branch = "main"}
scylla-macros = { git = "https://github.com/scylladb/scylla-rust-driver", branch = "main"}
Connecting
The connection ceremony in the ScyllaDB driver is kept to the absolute minimum. There is just one builder object, SessionBuilder, responsible for configuring and obtaining the Session:
use scylla::{SessionBuilder, Session};
let node = "127.0.0.1:9042";
let session: Session = SessionBuilder::new()
.known_node(node)
.build()
.await
.expect("connect");
There isn't much configuration available yet; however, one of the things you can already set is the load balancing policy. And this time, contrary to CDRS, it is properly dynamic, so you can select it at runtime!
use scylla::transport::load_balancing::{
RoundRobinPolicy, LoadBalancingPolicy, DCAwareRoundRobinPolicy};
// In production code, you could load it from a config file:
let local_dc = Some(String::from("DC1"));
let lb_policy: Box<dyn LoadBalancingPolicy> =
match local_dc {
Some(dc) => Box::new(DCAwareRoundRobinPolicy::new(dc)),
None => Box::new(RoundRobinPolicy::new())
};
let session: Session = SessionBuilder::new()
.known_node(node)
.load_balancing(lb_policy)
.build()
.await
.expect("connect");
Querying
Querying isn't much different from the other drivers. Prepare the statement, execute it with parameters, and get the result as a vector of Rows. Query parameters can be given as a tuple or vector, but tuples are preferred because they accept values of different types, while vectors do not. Each returned Row contains a vector of CQLValues, which can be easily converted to the desired Rust type by calling one of the provided as_ or into_ methods. Unfortunately, the supported types are limited to integers, strings, internet addresses, lists and sets. Maps, dates, timestamps, UUIDs, etc. are on the TODO list.
let statement = session
.prepare("SELECT data FROM keyspace1.test WHERE pk = ?")
.await
.expect("prepare query");
let key: i64 = 10;
let result = session
// pass parameters to the statement as a tuple
// the comma after key looks weird, but it is actually essential to make a tuple
// instead of a single int value
.execute(&statement, (key,))
.await
.expect("execute query");
if let Some(rows) = result {
for row in rows {
if !row.columns.is_empty() {
let data = row.columns[0].as_ref().unwrap().as_text().unwrap();
println!("{}", data);
}
}
}
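Why tuples can mix parameter types while vectors cannot follows from Rust's type system rather than from anything driver-specific: a `Vec<T>` has a single element type, whereas each tuple position has its own. The sketch below illustrates this with hypothetical, heavily simplified traits (Value, ValueList) and a made-up bind function. It is not the driver's actual API.

```rust
// Hypothetical stand-in for "a value that can be sent as a query parameter".
trait Value {
    fn render(&self) -> String;
}

impl Value for i64 {
    fn render(&self) -> String {
        self.to_string()
    }
}

impl<'a> Value for &'a str {
    fn render(&self) -> String {
        format!("'{}'", self)
    }
}

// Hypothetical stand-in for "a list of query parameters".
trait ValueList {
    fn render_all(&self) -> Vec<String>;
}

// One-element tuple: the type is written `(A,)`, just like the value `(key,)`.
impl<A: Value> ValueList for (A,) {
    fn render_all(&self) -> Vec<String> {
        vec![self.0.render()]
    }
}

// Each tuple position has its own type parameter, so types can differ.
impl<A: Value, B: Value> ValueList for (A, B) {
    fn render_all(&self) -> Vec<String> {
        vec![self.0.render(), self.1.render()]
    }
}

// A vector has a single element type `T`, so all values share one type.
impl<T: Value> ValueList for Vec<T> {
    fn render_all(&self) -> Vec<String> {
        self.iter().map(|value| value.render()).collect()
    }
}

/// Accepts any parameter list, whether tuple or vector.
fn bind(values: impl ValueList) -> Vec<String> {
    values.render_all()
}
```

With these definitions, `bind((10i64, "abc"))` and `bind(vec![1i64, 2])` both compile, while `vec![10i64, "abc"]` is rejected by the compiler because the elements have different types. Without the trailing comma, `(key)` is just a parenthesized `i64`, not a tuple.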
Object Mapper
The ScyllaDB driver offers derive macros for automatic conversion of a Row into a user-defined Rust struct. Annotate your struct with #[derive(FromRow)] and call into_typed on the returned rows:
use scylla::cql_to_rust::FromRow;
use scylla::macros::FromRow;
#[derive(Debug, FromRow)]
struct User {
id: i64,
login: String,
name: String,
emails: Vec<String>,
}
let statement = session
.prepare("SELECT id, login, name, emails FROM keyspace1.users WHERE id = ?")
.await
.expect("prepare query");
let key: i64 = 10;
let result = session
.execute(&statement, (key,))
.await
.expect("execute query");
if let Some(rows) = result {
for row in rows {
let user: User = row.into_typed().expect("deserialize User");
println!("{:?}", user);
}
}
If you don't want to define a struct in some cases, into_typed can also deserialize to tuples, which is handy for receiving simple results:
if let Some(rows) = result {
for row in rows {
let user: (i64, String, String, Vec<String>) =
row.into_typed().expect("deserialize User");
println!("{:?}", user);
}
}
Summary
We presented three different drivers. At the moment DataStax C++ Driver with Rust bindings is the most stable, mature, and feature-rich option, even though due to its C++ origin, it is a bit more complex to install than the other two options. For now, this is our only recommendation for production-grade systems.
CDRS is the most feature-rich among the pure-Rust drivers, is easy to install, and comes in three variants: blocking, async-std, and tokio. Unfortunately, it has the most verbose API and a few rough edges that I ran into while writing this post. We hope the situation gets better as Rust gains popularity.
The ScyllaDB driver is the new hotness, and although at present it is a bit limited in features, it shows great potential. I like the general feel of the API and its simplicity.
Stay tuned for Part 2, where I will cover parallelism and performance!