Technology — June 23, 2015

DataStax Python Driver: A Multiprocessing Example for Improved Bulk Data Throughput

By Adam Holmberg

class QueryManager(object):
    """Run the benchmark query concurrently from a single process.

    One session is opened when the manager is built and reused for every
    call to ``get_results``.
    """

    def __init__(self, cluster):
        self._setup(cluster)

    def _setup(self, cluster):
        """Connect to *cluster*, return rows as tuples, and prepare the query."""
        session = cluster.connect()
        session.row_factory = tuple_factory
        self.session = session
        self.prepared = session.prepare('SELECT * FROM system.local WHERE key=?')

    def get_results(self, params):
        """Return one query result per parameter set in *params*."""
        return self._results_from_concurrent(params)

    def _results_from_concurrent(self, params):
        """Execute the prepared statement for every entry of *params* concurrently.

        ``execute_concurrent_with_args`` yields ``(success, result)`` pairs;
        only the result half (index 1) is kept.
        """
        outcomes = execute_concurrent_with_args(self.session, self.prepared, params)
        return [outcome[1] for outcome in outcomes]

./single_process_concurrent.py 10000

10000 queries in 4.32866883278 seconds (2310.17903802/s)

class QueryManager(object):
    """Fan individual statement executions out over a multiprocessing pool.

    Each worker process runs ``_setup`` once as the pool initializer, so
    every worker owns its own session stored on the class.
    """

    # Passed to Pool.map as ``chunksize``: how many parameter sets are handed
    # to a worker per task.
    batch_size = 10

    def __init__(self, cluster, process_count=None):
        # process_count=None lets Pool choose its default number of workers.
        self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(cluster,))

    @classmethod
    def _setup(cls, cluster):
        """Per-worker initializer: connect, return rows as tuples, prepare the query.

        State lives on the class so the module-level ``_get_multiproc`` target
        can reach it without the session itself being pickled.
        """
        cls.session = cluster.connect()
        cls.session.row_factory = tuple_factory
        cls.prepared = cls.session.prepare('SELECT * FROM system.local WHERE key=?')

    def close_pool(self):
        """Stop accepting new work and wait for the worker processes to exit."""
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        """Execute the prepared query once per entry of *params*, in parallel.

        Returns a list with one result per entry, in input order
        (``Pool.map`` preserves ordering).
        """
        results = self.pool.map(_get_multiproc, params, self.batch_size)
        return results

    @classmethod
    def _execute_request(cls, params):
        """Run one execution on this worker's session and return its rows as a list.

        The return value travels back to the parent through ``Pool.map`` and
        therefore must be picklable.  On driver 3.0+ ``Session.execute``
        returns a ``ResultSet`` that keeps a reference to driver request
        state, so it is materialized into a plain list of rows here (a cheap
        copy on older drivers that already return a list).
        """
        return list(cls.session.execute(cls.prepared, params))

def _get_multiproc(params):
    """Pool.map target: run a single parameter set inside a worker process.

    Defined at module level so multiprocessing can pickle it by reference;
    the actual work is delegated to ``QueryManager._execute_request``.
    """
    execute = QueryManager._execute_request
    return execute(params)

./multiprocess_execute.py 10000

10000 queries in 1.9765329361 seconds (5059.36421163/s)

./multiprocess_execute.py 10000 4

10000 queries in 3.204007864 seconds (3121.09096621/s)

class QueryManager(object):
    """Split a workload across worker processes, each using execute_concurrent.

    ``get_results`` cuts the parameters into batches of ``concurrency``
    entries and hands each batch to a worker, which runs it with
    ``execute_concurrent_with_args`` on its own session.
    """

    concurrency = 100  # chosen to match the default in execute_concurrent_with_args

    def __init__(self, cluster, process_count=None):
        # process_count=None lets Pool choose its default number of workers;
        # _setup runs once inside each worker as the pool initializer.
        self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(cluster,))

    @classmethod
    def _setup(cls, cluster):
        """Per-worker initializer: connect, return rows as tuples, prepare the query.

        State lives on the class so the module-level ``_multiprocess_get``
        target can reach it without the session itself being pickled.
        """
        cls.session = cluster.connect()
        cls.session.row_factory = tuple_factory
        cls.prepared = cls.session.prepare('SELECT * FROM system.local WHERE key=?')

    def close_pool(self):
        """Stop accepting new work and wait for the worker processes to exit."""
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        """Execute the prepared query once per entry of *params*.

        Returns a flat list with one result per entry, in input order.
        """
        params = list(params)
        batches = self._batches(params, self.concurrency)
        per_batch = self.pool.map(_multiprocess_get, batches)
        return list(itertools.chain.from_iterable(per_batch))

    @staticmethod
    def _batches(items, size):
        """Yield consecutive slices of the sequence *items*, each at most *size* long."""
        # ``range`` (not the Python 2-only ``xrange``) so this also runs on Python 3.
        for start in range(0, len(items), size):
            yield items[start:start + size]

    @classmethod
    def _results_from_concurrent(cls, params):
        """Run *params* concurrently on this worker's session; return each result's rows.

        ``execute_concurrent_with_args`` yields ``(success, result)`` pairs and
        only the result half (index 1) is kept.  Each result is materialized
        with ``list()`` because it is sent back to the parent through
        ``Pool.map`` and must be picklable.  On driver 3.0+ the results are
        ``ResultSet`` objects that keep a reference to driver request state.
        """
        outcomes = execute_concurrent_with_args(cls.session, cls.prepared, params)
        return [list(outcome[1]) for outcome in outcomes]

def _multiprocess_get(params):
    """Pool.map target: run one batch of parameter sets inside a worker process.

    Defined at module level so multiprocessing can pickle it by reference;
    the batch is handed to ``QueryManager._results_from_concurrent``.
    """
    run_batch = QueryManager._results_from_concurrent
    return run_batch(params)

./multiprocess_concurrent.py 10000 5

10000 queries in 1.56024098396 seconds (6409.26632667/s)

Discover more
Drivers
Share

Open-Source, Scale-Out, Cloud-Native NoSQL Database

Astra DB is scale-out NoSQL built on Apache Cassandra™. Handle any workload with zero downtime and zero lock-in at global scale.

Company
Resources
Cloud Partners

DataStax is a registered trademark of DataStax, Inc. Apache, Apache Cassandra, Cassandra, Apache Pulsar, and Pulsar are either registered trademarks or trademarks of the Apache Software Foundation.

United States