Asynchronous queries with the Java driver

By Olivier Michallat - October 31, 2014

The DataStax Java driver for Cassandra uses an asynchronous architecture. This allows client code to get query results in a non-blocking way, via Future instances. In this post, we take a closer look at this concept, and use it to implement a client-side equivalent to the SELECT...IN query.

Asynchronous query result: ResultSetFuture

Here is a high-level overview of the execution of a client query with Session#executeAsync:

(Sequence diagram: the client calls executeAsync, the driver sends the query and returns a ResultSetFuture, and the future is completed when Cassandra's response arrives.)

  1. Apart from sending the query to Cassandra, the driver registers an internal ResponseHandler, which will process the response when it is available. It then gives control back to the caller, returning a ResultSetFuture which represents the future completion of the query. This object implements Java's Future; at this point, its isDone method returns false.
  2. When Cassandra returns the response, the driver notifies the ResponseHandler (many handlers can be registered for different queries, so the match is made with the stream id, a unique identifier that was initially sent with the request). The handler will in turn complete the future. This is all executed in an I/O thread managed by Netty, the underlying networking framework.
  3. At some point, the client code will invoke the future's get method to obtain the result. This will block if the future has not yet completed.

You can observe this process with this (admittedly naive) code snippet:

ResultSetFuture future = session.executeAsync("SELECT release_version FROM system.local");

while (!future.isDone()) {
    logger.debug("Waiting for request to complete");
}
ResultSet rs = future.get();
logger.debug("Got response: {}", rs.one().getString("release_version"));

On my laptop, the future takes about 4 milliseconds to complete, which gives the main thread time for a few iterations in the loop. Of course, that loop is for demonstration purposes only; you don't need it since the call to get is blocking.

get also has a variant that waits for a given amount of time. If you decide to give up on the future after the timeout has elapsed, it's good practice to cancel it:

try {
    ResultSet rs = future.get(5, TimeUnit.SECONDS);
    ... // do something with the results
} catch (TimeoutException e) {
    future.cancel(true);
    ... // the query did not complete within 5 seconds, switch to plan B
}

There are also non-blocking approaches, as we'll see in the next section.

Note: the driver is asynchronous by nature; synchronous methods like Session#execute are mere wrappers that call the asynchronous version and then immediately block on the future's result, roughly as sketched below.
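
For illustration, here is a minimal sketch of what such a wrapper amounts to (simplified; the driver's actual source handles statements more generally):

public ResultSet execute(String query) {
    ResultSetFuture future = executeAsync(query);
    // Block until the response is available, ignoring thread interruption
    return future.getUninterruptibly();
}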

A better future: ListenableFuture

Future is a nice abstraction, but it's a bit limited in its use: you can either check periodically if it's done, or wait for its result in a blocking manner. That's why the ResultSetFutures returned by the Java driver extend ListenableFuture. This interface is part of Google's Guava library; it's a specialized Future that allows the execution of callbacks upon completion.

To illustrate that, let's consider the task of updating a hypothetical GUI with the result of a query:

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;

ResultSetFuture future = session.executeAsync("SELECT release_version FROM system.local");
Futures.addCallback(future,
    new FutureCallback<ResultSet>() {
        @Override public void onSuccess(ResultSet result) {
            gui.setMessage("Cassandra version is " + result.one().getString("release_version"));
        }

        @Override public void onFailure(Throwable t) {
            gui.setMessage("Error while reading Cassandra version: " + t.getMessage());
        }
    },
    MoreExecutors.sameThreadExecutor()
);

Note the last argument to addCallback: it is an executor responsible for providing the thread which will execute the callback. With sameThreadExecutor, this will be the client thread if the future has already completed by the time we register the callback, or the Netty I/O thread otherwise. This is fine if the callback is lightweight; for more compute-intensive tasks, consider providing your own executor to avoid blocking I/O threads for too long.
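
As a hypothetical example, the FutureCallback above (referred to as callback here) could be handed to a small dedicated pool instead; the pool size is arbitrary:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Heavy callbacks run on this pool rather than on a Netty I/O thread
ExecutorService callbackExecutor = Executors.newFixedThreadPool(4);

Futures.addCallback(future, callback, callbackExecutor);

Don't forget to shut such a pool down when your application stops.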

Guava provides several utility methods to work with ListenableFutures, most of which are exposed by the Futures class. In the next section, we're going to see an interesting use for two of them.

Case study: multi-partition query, a.k.a. "client-side SELECT...IN"

A common use-case with Cassandra is to retrieve the same data from various partitions. The most straightforward approach is to use a CQL SELECT...IN query:

CREATE TABLE IF NOT EXISTS users (id uuid PRIMARY KEY, name text);

SELECT * FROM users WHERE id IN (
    e6af74a8-4711-4609-a94f-2cbfab9695e5,
    281336f4-2a52-4535-847c-11a4d3682ec1,
    c32b8d37-89bd-4dfe-a7d5-5f0258692d05
);

This is not necessarily optimal: this query will be sent to a coordinator node, which will then have to query replicas for each partition key. Considering that we have a smart token-aware driver, it would be more efficient to send an individual query for each partition key (SELECT * FROM users WHERE id = ?), which would reach the right replica directly. Then all that's left is to collate the results client-side.

Solution 1: return all the results as a list

First the easy part: given the query string and a list of partition keys, we execute the query for each partition key. This produces a list of futures:

private static List<ResultSetFuture> sendQueries(Session session, String query, Object[] partitionKeys) {
    List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(partitionKeys.length);
    for (Object partitionKey : partitionKeys)
        futures.add(session.executeAsync(query, partitionKey));
    return futures;
}

Now we use Guava's successfulAsList to transform the List<Future<ResultSet>> into a Future<List<ResultSet>> (slightly pedantic side note: in functional programming terms, this is similar to the sequence operation on a traversable functor):

public static Future<List<ResultSet>> queryAllAsList(Session session, String query, Object... partitionKeys) {
    List<ResultSetFuture> futures = sendQueries(session, query, partitionKeys);
    return Futures.successfulAsList(futures);
}

The client gets a single future containing the list of results:

Future<List<ResultSet>> future = ResultSets.queryAllAsList(session,
    "SELECT * FROM users WHERE id = ?",
    UUID.fromString("e6af74a8-4711-4609-a94f-2cbfab9695e5"), UUID.fromString("281336f4-2a52-4535-847c-11a4d3682ec1") //...
);
for (ResultSet rs : future.get()) {
    ... // process the result set    
}

There is one drawback: the compound future only completes after the slowest response has arrived. The client won't have access to any of the results before that. It would be valuable to get the results as they become available, to start processing them right away, or stream them to a consumer. Let's see another approach to fix that.

Solution 2: return a list of futures in completion order

This is almost identical, except that we use another utility method from Guava:

public static List<ListenableFuture<ResultSet>> queryAll(Session session, String query, Object... partitionKeys) {
    List<ResultSetFuture> futures = sendQueries(session, query, partitionKeys);
    return Futures.inCompletionOrder(futures);
}

Note: inCompletionOrder is available since Guava 17.0; the Java driver currently uses an older version, so we need to override the dependency in our POM.

The client now gets a list of futures, but they are guaranteed to be in completion order. So it can retrieve the results sequentially, knowing that it will never block on one future while another result is already available:

List<ListenableFuture<ResultSet>> futures = ResultSets.queryAll(session,
    "SELECT * FROM users WHERE id = ?",
    UUID.fromString("e6af74a8-4711-4609-a94f-2cbfab9695e5"), UUID.fromString("281336f4-2a52-4535-847c-11a4d3682ec1") //...
);
for (ListenableFuture<ResultSet> future : futures) {
    ResultSet rs = future.get();
    ... // process the result set    
}

(The magic behind inCompletionOrder is that the futures it returns are in fact delegates that get resolved sequentially, each time one of the original futures completes; see the source code for more details, or the simplified sketch below.)
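
Here is a rough sketch of that delegate mechanism (hypothetical and simplified, not Guava's actual implementation):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

public static <T> List<ListenableFuture<T>> inCompletionOrderSketch(List<? extends ListenableFuture<T>> inputs) {
    // One delegate per input future; the delegates are returned in a fixed order...
    final List<SettableFuture<T>> delegates = new ArrayList<SettableFuture<T>>();
    for (int i = 0; i < inputs.size(); i++) {
        delegates.add(SettableFuture.<T>create());
    }
    // ...but they are completed in the order in which the inputs complete
    final AtomicInteger nextDelegate = new AtomicInteger();
    for (ListenableFuture<T> input : inputs) {
        Futures.addCallback(input, new FutureCallback<T>() {
            @Override public void onSuccess(T result) {
                delegates.get(nextDelegate.getAndIncrement()).set(result);
            }
            @Override public void onFailure(Throwable t) {
                delegates.get(nextDelegate.getAndIncrement()).setException(t);
            }
        });
    }
    return new ArrayList<ListenableFuture<T>>(delegates);
}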

The next solution uses the same approach, but with a different API.

Solution 3: return an RxJava Observable

RxJava describes itself as "a library for composing asynchronous and event-based programs". One of its core abstractions is Observable, which can be seen as a concurrent iterator. An observable emits a sequence of values over time. Observers can subscribe to an observable to be notified as the values become available.

This translates really well to our case: once again, we start with the list of futures from our asynchronous queries; then we transform each future into an observable, and finally merge all the observables into a single one.

public static Observable<ResultSet> queryAllAsObservable(Session session, String query, Object... partitionKeys) {
    List<ResultSetFuture> futures = sendQueries(session, query, partitionKeys);
    Scheduler scheduler = Schedulers.io();
    List<Observable<ResultSet>> observables = Lists.transform(futures, (ResultSetFuture future) -> Observable.from(future, scheduler));
    return Observable.merge(observables);
}

Note that we need to provide a Scheduler instance to Observable.from; otherwise each individual observable blocks the subscribing thread on the corresponding future.

Here's the skeleton code to register an observer:

Observable<ResultSet> results = ResultSets.queryAllAsObservable(session,
    "SELECT * FROM users WHERE id = ?",
    UUID.fromString("e6af74a8-4711-4609-a94f-2cbfab9695e5"), UUID.fromString("281336f4-2a52-4535-847c-11a4d3682ec1") //...
);

results.subscribe(new Observer<ResultSet>() {
    @Override public void onNext(ResultSet resultSet) {
        ... // process the result set
    }

    @Override public void onError(Throwable throwable) {
        ... // process the error
    }

    @Override public void onCompleted() {
        // no more results
    }
});

Conclusion

The Java driver allows you to take advantage of its non-blocking nature through its executeAsync method. Guava and RxJava provide powerful combinators to transform and compose these asynchronous results. We hope this article gave you the motivation to further explore the APIs and use reactive patterns in your code. The examples in this article were kept simple for brevity, but they could be expanded in various ways:

  1. support for composite partition keys;
  2. better error handling (for example, successfulAsList sets a null element at the position of any future that failed, which is not ideal in our case);
  3. build an Observable<Row> rather than an Observable<ResultSet> (this is left as an exercise to the reader — hint: you'll need the flatMapIterable method).

The code samples are available in a GitHub repository.

Edit 2014/10/31: provide a Scheduler in solution 3 to make the compound observable truly asynchronous. Thanks Duy Hai Doan for spotting this.









Comments

  1. sedovav says:

Is there any limit on the number of simultaneous session.executeAsync calls? I get "com.datastax.driver.core.exceptions.DriverException: Timeout during read" anyway.

    1. Carl Wang says:

      yes, default fetch size is 5000.

  2. Chandan Kumar says:

    If you use Java 8, you can use its Stream API, to parallelize the query like this:

    public List queryAll(final Session session, String query, final List partitionKeys) {
        String qry = "SELECT * FROM users WHERE id = ?";
        final PreparedStatement prepStmt = session.prepare(query);
        return partitionKeys.parallelStream().map((k) -> {
            ResultSet rs = session.execute(prepStmt.bind(k));
            return rs.one();
        }).collect(Collectors.toList());
    }

    1. Kevin Gallardo says:

      I believe this is not correct.
      While using parallelStream() will allow you to simultaneously run multiple synchronous queries at the same time, you will be limited to 1 query for each application thread the parallelStream can provide you with.
      Using the executeAsync() methods of the driver, one single thread can handle multiple queries at the same time, because the multiplexing is done at the protocol level, between the client and the server, not by the client itself.

  3. Jack Repenning says:

    In the “Solution 2: return a list of futures in completion order” solution, does the “process the result set” block need to be thread-safe?

  4. Sameer Nadgouda says:

    Hi,
    I am running the query below, but I wanted to find out whether the results for each query are present in Cassandra or not.

    qList.stream()
        .forEach(a -> {
            resultSetList.add(session.executeAsync(mapPrepared.get(a.gettable()).bind(a.getname(), a.getlastname())));
        });
    List rsApt = Futures.allAsList(resultSetList).get();

    Since Futures.allAsList promises that the ordering is maintained, I loop through it to find out which query gave me empty results. I wanted to know if this is good practice. Thank you.

  5. Ivan says:

    Do I understand this correctly: async access can also be implemented in the application with the same performance? I.e. the underlying protocol is still synchronous, but the Cassandra driver just wraps each request into a future, without any buffering or other optimizations?

  6. Sateesh Siripurapu says:

    Hi,

    Is anyone facing a read timeout exception while trying to fetch millions of distinct records into the future and performing future.get on it?

    In the first place, how do we fetch, say, 20 million records? When I provide a fetch size of 50k it fails after 3 million records.

    1. Neo says:

      Yes, I have the same problem. Did you find a solution for this?

      1. Narendra says:

        Use Spark to solve your problem: fetch the data using Spark and process it. In your case the data is huge, so Spark will work perfectly.
