DataStax Python Driver: A Multiprocessing Example for Improved Bulk Data Throughput

By Adam Holmberg - June 23, 2015

One question we get a lot from application integrators is “how to make it faster”. In languages such as C++ and Java, this is often a matter of using asynchronous request execution and tuning I/O threads to make full use of the platform (e.g., multiple cores). Most DataStax drivers have utility functions for efficiently making concurrent asynchronous requests, and the Python driver is no exception. This works well for I/O-bound workloads. However, applications working with large datasets quickly become CPU-bound as (de-)serialization dominates. Unfortunately, a Python process is mostly relegated to a single effective CPU due to the Global Interpreter Lock (GIL). The Python threading documentation explains:

[Screenshot: Python threading documentation note on the Global Interpreter Lock]

While Python might not be the best choice for bulk-processing workloads, its ease of use and raft of scientific processing libraries still make it attractive for experimentation and analysis with large datasets. The demand is there, so how can we make this faster using the driver?

There is work to be done in making single-threaded processing more efficient overall, to squeeze as much as possible out of a single interpreter. In the meantime, the most direct way to escape this limitation is to use the multiprocessing package from the Python standard library. This module provides simple yet powerful abstractions over process management and inter-process communication (IPC), allowing applications to easily spawn multiple Python instances and marshal data between them.
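The following sketch is a minimal illustration of what the package provides, independent of the driver. It spreads a CPU-bound function across a Pool. `cpu_heavy` is a hypothetical stand-in for deserialization work and is not part of the driver. Each worker is a separate interpreter with its own GIL, and `Pool.map` sends arguments to the workers and returns results in input order. The sketch explicitly requests the fork start method, which matches the design used later in this post. As an assumption of the sketch, it therefore runs on Linux and macOS but not on Windows.

```python
import multiprocessing


def cpu_heavy(n):
    """Hypothetical CPU-bound stand-in for (de)serialization work."""
    total = 0
    for i in range(n):
        total += (i * i) % 7
    return total


def run_serial(inputs):
    # One interpreter, one GIL: the calls run back to back.
    return [cpu_heavy(n) for n in inputs]


def run_pooled(inputs, processes=None):
    # Assumption: fork start method (POSIX only), as in the rest of this post.
    ctx = multiprocessing.get_context("fork")
    # processes=None lets the pool start os.cpu_count() workers.
    with ctx.Pool(processes=processes) as pool:
        # map pickles each argument, hands it to a worker process, and
        # reassembles the results in the same order as the inputs.
        return pool.map(cpu_heavy, inputs)
```

On a multi-core machine, `run_pooled` returns the same list as `run_serial` while keeping several cores busy.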

In this post I use a small example workload to show how to use multiprocessing with the DataStax Python Driver to achieve higher throughput using multiple CPUs. The code referenced in this article is available as complete, self-contained examples in the aholmberg/driver-multiprocessing repository on GitHub. The test setup uses a single-node Cassandra cluster running on the same host as the clients. The throughput figures cited here are meant only to illustrate limitations and relative performance.

Single Process Baseline

To introduce the problem, let’s start with a standard, single-process implementation of a contrived workload. We’ll have a class that sets up a session from the cluster and produces query results for a sequence of parameters, using a predefined statement. We’ll query system.local to simplify setup, and because it has a variety of data for deserialization (and plenty of it: a row can be upwards of 6 KB when using vnodes). The parameter stream for this workload is simply a sequence of keys for the table (always ‘local’ in this case).

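from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import tuple_factory


class QueryManager(object):

    def __init__(self, cluster):
        self._setup(cluster)

    def _setup(self, cluster):
        self.session = cluster.connect()
        self.session.row_factory = tuple_factory  # rows as plain tuples
        self.prepared = self.session.prepare('SELECT * FROM system.local WHERE key=?')

    def get_results(self, params):
        return self._results_from_concurrent(params)

    def _results_from_concurrent(self, params):
        # execute_concurrent_with_args yields (success, result) pairs;
        # keep only the result for each parameter set
        return [result for success, result in execute_concurrent_with_args(self.session, self.prepared, params)]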

Running this workload against a single local node shows the problem at hand:

[Screenshot: CPU utilization during the single-process run]

The Python interpreter is pegged near 100% (effectively one core), while the Cassandra Java process is not pushed very hard and the other cores remain idle. As a baseline, this pattern takes over four seconds to run 10,000 queries:

./single_process_concurrent.py 10000
10000 queries in 4.32866883278 seconds (2310.17903802/s)

We should be able to make this faster by putting more cores to work.
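The example scripts take the query count on the command line, and the later ones also take a process count. Their source lives in the example repository and is not reproduced here. As an illustrative sketch only, the hypothetical harness below produces output in the same shape. It builds the parameter stream, times a single `get_results` call, and prints the rate. Any object with a `get_results(params)` method works, so the harness could drive each QueryManager variant in this post.

```python
import time


def build_params(count):
    # Every query reads the same system.local row, so each parameter
    # set is the one-element tuple ('local',).
    return [("local",)] * count


def run_benchmark(manager, count):
    """Hypothetical harness: time one manager.get_results() call over `count` queries."""
    params = build_params(count)
    start = time.perf_counter()
    results = manager.get_results(params)
    elapsed = time.perf_counter() - start
    rate = count / elapsed if elapsed > 0 else float("inf")
    # Same shape as the script output: "N queries in S seconds (R/s)"
    print("%d queries in %s seconds (%s/s)" % (count, elapsed, rate))
    return results, elapsed
```

For the pooled variants, call `close_pool()` after timing so the worker processes exit.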

Multi-processing Single Queries

The multiprocessing package makes it easy to distribute work among multiple processes using the concept of a Pool. We can take advantage of this by updating our QueryManager to spawn a pool and distribute request processing across it.

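from multiprocessing import Pool

from cassandra.query import tuple_factory


class QueryManager(object):

    batch_size = 10  # Pool.map chunksize: parameter sets sent to a worker per task

    def __init__(self, cluster, process_count=None):
        # _setup runs once in each worker process after it starts
        self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(cluster,))

    @classmethod
    def _setup(cls, cluster):
        # Each worker connects its own Session; class attributes are per process
        cls.session = cluster.connect()
        cls.session.row_factory = tuple_factory
        cls.prepared = cls.session.prepare('SELECT * FROM system.local WHERE key=?')

    def close_pool(self):
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        results = self.pool.map(_get_multiproc, params, self.batch_size)
        return results

    @classmethod
    def _execute_request(cls, params):
        # Materialize the rows: newer driver versions return a ResultSet,
        # which cannot be pickled back to the parent process
        return list(cls.session.execute(cls.prepared, params))

def _get_multiproc(params):
    # Module-level function so the pool can pickle it by reference
    return QueryManager._execute_request(params)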

The pool allows us to specify an initializer, which runs in each subprocess after it has spawned. Note that the session is set up here, after forking, to ensure that the event loop thread is initialized properly (internal threads do not survive the fork, and the cluster object is not designed to reinitialize in this case). Each subprocess has its own Session instance. In get_results we simply map each request to the pool and return the results; the package handles marshaling parameters and assembling results from the processes in the right order. Note that the input parameters and output results have the same form here as in the single-process version. This design relies on the fork start method: where workers are started with the spawn method instead (the default on Windows), initargs must be pickled, and a Cluster object cannot be.
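The sketch below lets you observe the one-session-per-process behavior without a cluster. It keeps the structure of the QueryManager above but swaps the driver Session for a hypothetical `FakeSession`, which is not a driver class. `FakeSession` records the process id that created it, so each result shows which worker produced it. Like the design above, it assumes the fork start method, so as written it runs on Linux and macOS but not Windows.

```python
import multiprocessing
import os


class FakeSession(object):
    """Hypothetical stand-in for a driver Session; not part of the driver API."""

    def __init__(self):
        self.pid = os.getpid()  # the worker process that created this session

    def execute(self, params):
        # Echo the parameters together with the owning process id.
        return (self.pid, params)


class QueryManager(object):

    batch_size = 10  # passed to Pool.map as its chunksize

    def __init__(self, process_count=None):
        # Assumption: fork start method (POSIX only), as in the design above.
        ctx = multiprocessing.get_context("fork")
        self.pool = ctx.Pool(processes=process_count, initializer=self._setup)

    @classmethod
    def _setup(cls):
        # Runs once in each worker: one session per process.
        cls.session = FakeSession()

    def close_pool(self):
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        return self.pool.map(_get_multiproc, params, self.batch_size)

    @classmethod
    def _execute_request(cls, params):
        return cls.session.execute(params)


def _get_multiproc(params):
    # Module-level function so the pool can pickle it by reference.
    return QueryManager._execute_request(params)
```

The results come back in input order. Each one carries the id of a worker process rather than the parent's, and no more distinct ids appear than the pool has workers.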

In comparison to the baseline:

./multiprocess_execute.py 10000
10000 queries in 1.9765329361 seconds (5059.36421163/s)

Running the same number of queries shows a better than 2x speedup. Not bad for just a few lines of code. In this case we’re actually bumping up against the CPU capacity of this single machine, so this rate is limited by the local setup. The CPU utilization shows load spread among multiple Python processes, with the server being pushed harder:

[Screenshot: CPU utilization with the default-sized pool]

But what if we don’t want to devote *all* of a host to this task? The package allows you to configure how many subprocesses are started as part of the pool. Left unspecified, it defaults to the number of cores on the machine. Instead, we can specify a lower number to limit CPU usage. Doing this with the present design results in lower load, but also diminished throughput:

./multiprocess_execute.py 10000 4
10000 queries in 3.204007864 seconds (3121.09096621/s)

[Screenshot: CPU utilization with a four-process pool]

The worker processes are not fully utilized. Each one executes a single request at a time and sits idle waiting for the response before starting the next, so this design gets fewer requests out of each process than it could. To rectify this, we will introduce batching and make use of concurrent execution inside the pool.
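The idle time comes from waiting: a synchronous call blocks for a full round trip before the next request can start. The sketch below simulates this with a sleep and compares running requests one after another with overlapping them. The sleep is a hypothetical stand-in for network latency, not a driver call. The sketch uses `concurrent.futures.ThreadPoolExecutor` only as a convenient stand-in for overlap. The driver's `execute_concurrent_with_args` overlaps requests with asynchronous execution on its event loop rather than with a thread per request.

```python
import time
from concurrent.futures import ThreadPoolExecutor

SIMULATED_ROUND_TRIP = 0.01  # hypothetical per-request wait, in seconds


def fake_request(key):
    # Stand-in for a request whose cost is mostly waiting on the server.
    time.sleep(SIMULATED_ROUND_TRIP)
    return key


def run_one_at_a_time(keys):
    # Like calling session.execute() per item: each wait blocks the next request.
    return [fake_request(k) for k in keys]


def run_overlapped(keys, concurrency=100):
    # Up to `concurrency` requests are in flight at once, so their waits overlap.
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        return list(executor.map(fake_request, keys))


def timed(fn, keys):
    start = time.perf_counter()
    result = fn(keys)
    return result, time.perf_counter() - start
```

For 50 keys, the sequential version takes at least 50 round trips. The overlapped version takes roughly one round trip plus scheduling overhead, which is the headroom the next design exploits inside each worker.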

Multi-processing Concurrent Queries

In the previous section we achieved improved throughput by simply mapping input parameters to single request executors. However, this simple approach did not result in full interpreter utilization, and thus requires more overhead for a given throughput. With only a few more tweaks, we can transform our work to take advantage of concurrent execution within each subprocess.

import itertools
from multiprocessing import Pool

from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import tuple_factory


class QueryManager(object):

    concurrency = 100  # chosen to match the default in execute_concurrent_with_args

    def __init__(self, cluster, process_count=None):
        self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(cluster,))

    @classmethod
    def _setup(cls, cluster):
        cls.session = cluster.connect()
        cls.session.row_factory = tuple_factory
        cls.prepared = cls.session.prepare('SELECT * FROM system.local WHERE key=?')

    def close_pool(self):
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        params = list(params)
        # Split params into work units of `concurrency` items; each unit is one pool task
        batches = (params[n:n + self.concurrency] for n in range(0, len(params), self.concurrency))
        results = self.pool.map(_multiprocess_get, batches)
        # Flatten the per-batch result lists back into one list, in input order
        return list(itertools.chain(*results))

    @classmethod
    def _results_from_concurrent(cls, params):
        # Run the whole batch concurrently in this worker; materialize rows so they can be pickled
        return [list(result) for success, result in execute_concurrent_with_args(cls.session, cls.prepared, params)]

def _multiprocess_get(params):
    return QueryManager._results_from_concurrent(params)

For comparison with the other designs, we still accept the same form of input and produce the same list of results. In practice, an application would settle on one execution model and would likely process results in place rather than performing this unnecessary transformation.

Now we take input parameters and batch them into work units of 100 requests. Each batch is executed concurrently within a subprocess, and the results are chained back into a single list. Running this script shows the worker processes better utilized, and even higher absolute throughput than the previous design achieved with more workers.

./multiprocess_concurrent.py 10000 5
10000 queries in 1.56024098396 seconds (6409.26632667/s)

[Screenshot: CPU utilization with five processes running concurrent batches]

Here again, the single-host setup is limiting, but the point is made.
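The batching in the concurrent version's get_results comes down to slicing followed by flattening. The sketch below pulls those two steps into small helpers so that the order-preserving round trip can be checked on its own. The names `chunked` and `flatten` are illustrative and do not come from the example repository. `chunked` uses `itertools.islice`, so it accepts any iterable, including a generator, without the `list(params)` copy the original makes first. That is an alternative, not what the example does.

```python
import itertools


def chunked(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be positive")
    iterator = iter(items)
    while True:
        # Take the next `size` items from the shared iterator.
        batch = list(itertools.islice(iterator, size))
        if not batch:
            return
        yield batch


def flatten(batches):
    # Same result as list(itertools.chain(*batches)), without argument unpacking.
    return list(itertools.chain.from_iterable(batches))
```

With these helpers, the body of get_results would read as `flatten(self.pool.map(_multiprocess_get, chunked(params, self.concurrency)))`.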

Where to draw the line

The multiprocessing package makes it so easy to build on pooled processes that perhaps the most interesting part of constructing an application like this is deciding how to partition the work. In the examples above we had a known query and simply marshaled parameters for that query. Another, more general design might pass various queries along with their parameters.

Work units are not limited to request execution inputs, though. For example, a bulk loading task might accept filenames, which the subprocesses then divvy up and read to produce input. A query task might accept date ranges (or any type of ranges or discrete buckets) to query in parallel for further processing in the parent process.
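The hypothetical helper below sketches the date-range case. It splits a time interval into contiguous, non-overlapping buckets that can be handed to pool workers. Each bucket is a small, picklable tuple, so it is cheap to send over IPC. A worker would then query only its own slice of time and return rows or partial results to the parent. How a bucket maps onto queries depends on the table's schema, which this post does not cover.

```python
from datetime import datetime, timedelta


def date_buckets(start, end, step):
    """Split the half-open interval [start, end) into consecutive [lo, hi) buckets."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    buckets = []
    lo = start
    while lo < end:
        hi = min(lo + step, end)  # the last bucket may be shorter than `step`
        buckets.append((lo, hi))
        lo = hi
    return buckets
```

Usage would look like `pool.map(query_bucket, date_buckets(start, end, timedelta(days=1)))`, where `query_bucket` is a hypothetical module-level worker function.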

Whatever the work breakdown, it’s fairly straightforward to apply these patterns for a given bulk processing application.

Resource Usage

Multiprocessing is a convenient way of escaping the GIL and taking advantage of multiple CPU cores available on most modern hardware. Naturally, it’s not free of trade-offs. Here are a few things to keep in mind when considering this pattern:

Session per Process

Each subprocess requires its own driver session for communication with the cluster. This needs to be set up in the initializer, after the pool has spawned the process.

This incurs some overhead in the form of memory and TCP connections, which is not an issue on most systems. However, depending on your deployment model, you may need to be mindful of many sessions being created at once. This can cause connection failures (backlog overflows) and load spikes on the cluster. To mitigate this, you can throttle setup using the synchronization primitives provided by multiprocessing.
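The following is a minimal sketch of such throttling. It assumes the fork start method used throughout this post. A `multiprocessing` Semaphore passed through `initargs` caps how many workers run their setup at the same time. The sleep in the hypothetical initializer stands in for `cluster.connect()`. The shared counters exist only so the cap can be observed; a real initializer would just hold the semaphore around session creation.

```python
import multiprocessing
import time


def _init_worker(setup_gate, active, peak):
    # Hypothetical initializer: the sleep stands in for cluster.connect().
    with setup_gate:  # at most N workers may be inside this block at once
        with active.get_lock():
            active.value += 1
            peak.value = max(peak.value, active.value)
        time.sleep(0.05)  # simulated connection setup
        with active.get_lock():
            active.value -= 1


def _work(x):
    return x * 2


def run_with_throttled_setup(processes=4, max_concurrent_setups=2):
    # Assumption: fork start method; the primitives are handed to workers via initargs.
    ctx = multiprocessing.get_context("fork")
    setup_gate = ctx.Semaphore(max_concurrent_setups)
    active = ctx.Value("i", 0)  # workers currently setting up
    peak = ctx.Value("i", 0)    # most workers ever setting up at once
    pool = ctx.Pool(processes, initializer=_init_worker,
                    initargs=(setup_gate, active, peak))
    try:
        results = pool.map(_work, range(20))
    finally:
        pool.close()
        pool.join()
    return results, peak.value
```

Even with four workers starting together, no more than two are ever inside the setup block at once.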

Pool Overhead

This pattern is most suitable for bulk tasks in which the throughput increase outweighs the cost of initializing the pool. Consider having long-lived processes that can start the pool and amortize that cost over as much work as possible.

There is also some additional latency due to IPC between the parent and worker processes. You can minimize this by choosing the right work unit, often a compact description of the work that is inflated into full data on the worker side. Pooling may not be appropriate for latency-sensitive applications or those with small query loads.
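One rough way to see why the work unit matters is to compare the pickled size of a compact description of the work with the data it expands to. The pool uses pickle to move arguments and results between processes. The sketch below is illustrative only and approximates IPC payload size, not latency. `expand` is a hypothetical worker-side step that turns a `(start, stop)` descriptor into full inputs, so only the small tuple crosses the process boundary.

```python
import pickle


def expand(descriptor):
    """Hypothetical worker-side step: inflate a (start, stop) range into inputs."""
    start, stop = descriptor
    return [("key%d" % i,) for i in range(start, stop)]


def ipc_bytes(obj):
    # The pool pickles task arguments and results, so the pickled size
    # approximates what each task sends between processes.
    return len(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))
```

For a 10,000-item range, the descriptor pickles to a few bytes, while the expanded list runs to many kilobytes.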

Conclusion

It is common for Python applications using this driver to become CPU-bound doing (de-)serialization under the GIL. In this post we saw how easy it is to use a standard Python package, multiprocessing, to achieve higher concurrency and throughput via process pools. This pattern can be applied to a variety of bulk-processing applications with only a few lines of code.



Comments

  1. Daiyue says:

    Hi, I am using the code in Multi-processing Concurrent Queries for inserting rows in Cassandra, but constantly got a ‘TypeError: can’t pickle _thread.lock objects’ error. I am passing the ‘cluster’ as the ‘initargs’ as well. I am running the code on Python 3.5. I am wondering does the example code run in Python 3?

    cheers

    1. Adam Holmberg says:

      You will find that not all objects can be pickled and serialized for IPC. You may need to decouple the data from objects with things like synchronization primitives.

      The code was written in Python 2, but it doesn’t take much to make it work in Python 3:
      https://github.com/aholmberg/driver-multiprocessing/tree/py3

      1. Daiyue says:

        Hi, could you provide an example for decoupling data? The code I created based on the Multi-processing Concurrent Queries (Python 3) is as follows:

        class QueryManager(object):

            concurrency = 100 # chosen to match the default in execute_concurrent_with_args

            def __init__(self, session, process_count=None):
                self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,))

            @classmethod
            def _setup(cls, session):
                cls.session = session
                cls.prepared = cls.session.prepare("""
                INSERT INTO test_table (key1, key2, key3, key4, key5) VALUES (?, ?, ?, ?, ?)
                """)

            def close_pool(self):
                self.pool.close()
                self.pool.join()

            def get_results(self, params):
                results = self.pool.map(_multiprocess_write, (params[n:n+self.concurrency] for n in range(0, len(params), self.concurrency)))
                return list(itertools.chain(*results))

            @classmethod
            def _results_from_concurrent(cls, params):
                return [results[1] for results in execute_concurrent_with_args(cls.session, cls.prepared, params)]

        def _multiprocess_write(params):
            return QueryManager._results_from_concurrent(params)

        if __name__ == '__main__':

            processes = 2

            # connect cluster
            cluster = Cluster(contact_points=['127.0.0.1'], port=9042)
            session = cluster.connect()

            # database name is a concatenation of client_id and system_id
            keyspace_name = 'unit_test_0'

            # drop keyspace if it already exists in a cluster
            try:
                session.execute("DROP KEYSPACE IF EXISTS " + keyspace_name)
            except:
                pass

            create_keyspace_query = "CREATE KEYSPACE " + keyspace_name \
                + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};"
            session.execute(create_keyspace_query)

            # use a session's keyspace
            session.set_keyspace(keyspace_name)

            # drop table if it already exists in the keyspace
            try:
                session.execute("DROP TABLE IF EXISTS " + "test_table")
            except:
                pass

            # create a table for invoices in the keyspace
            create_test_table = "CREATE TABLE test_table("

            keys = "key1 text,\n" \
                "key2 text,\n" \
                "key3 text,\n" \
                "key4 text,\n" \
                "key5 text,\n"

            create_test_table += keys
            create_test_table += "PRIMARY KEY (key1))"
            session.execute(create_test_table)

            qm = QueryManager(session, processes)

            params = list()
            for row in range(100000):
                key = 'test' + str(row)
                params.append([key, 'test', 'test', 'test', 'test'])

            start = time.time()
            rows = qm.get_results(params)
            delta = time.time() - start
            log.info(fm('Cassandra inserts 100k dummy rows for ', delta, ' secs'))

        1. Adam Holmberg says:

          I don’t think it will be productive doing this in the comments of a blog post. If you have specific questions, please address them to the mailing list or stack overflow tagged “cassandra” and “python”.

  2. Rohan Sachdeva says:

    Hi, I was going through the blog. It's really great.

    Can you please answer my queries:

    1. Can we use threads and achieve the same performance using multi-threaded concurrent operations?

    2. Is it OK to use multiple clusters per process, even though the clusters point to the same keyspace?

    3. In your example, how do you ensure that the session and cluster will be shut down for all the spawned processes?

    Thanks in advance.

    Best Regards,
    Rohan

    1. Adam Holmberg says:

      We generally prefer the mailing list or stack overflow for this type of discussion:
      Are you willing to post your questions there?

  3. Yuri Ch says:

    Examples do not work under Windows’ Python throwing exception described at http://stackoverflow.com/questions/37942249/cassandra-multiprocessing-cant-pickle-thread-lock-objects

    everything works under Linux though

    1. Adam Holmberg says:

      Thanks for the input.

      I replied to that post on SO, and there was more wrong there than simple platform issues (the code was not as written and described here).

      In any event, this is not released software. The blog post and example are meant to demonstrate a pattern. They are not universally applicable.
