Technology — June 23, 2015

DataStax Python Driver: A Multiprocessing Example for Improved Bulk Data Throughput

Adam Holmberg
DataStax Python Driver: A Multiprocessing Example for Improved Bulk Data Throughput
class QueryManager(object):
    """Runs a prepared statement concurrently on a single process.

    One driver session is created per manager; rows come back as plain
    tuples (cheaper to construct than named rows).
    """

    def __init__(self, cluster):
        self._setup(cluster)

    def _setup(self, cluster):
        # Prepare once up front; the same statement is reused for every call.
        self.session = cluster.connect()
        self.session.row_factory = tuple_factory
        self.prepared = self.session.prepare('SELECT * FROM system.local WHERE key=?')

    def get_results(self, params):
        """Execute the prepared statement once per parameter set and
        return the per-query row lists."""
        return self._results_from_concurrent(params)

    def _results_from_concurrent(self, params):
        # Each item from execute_concurrent_with_args is a (success, result)
        # pair; we keep only the result half.
        concurrent_results = execute_concurrent_with_args(self.session, self.prepared, params)
        return [rows for (_, rows) in concurrent_results]
./single_process_concurrent.py 10000
10000 queries in 4.32866883278 seconds (2310.17903802/s)
class QueryManager(object):
    """Fans individual queries out to a pool of worker processes.

    Each worker creates its own session in the pool initializer — driver
    sessions cannot be shared across process boundaries.
    """

    batch_size = 10  # chunksize handed to Pool.map

    def __init__(self, cluster, process_count=None):
        self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(cluster,))

    @classmethod
    def _setup(cls, cluster):
        # Runs once in every worker process; state is stored on the class
        # so the module-level task function can reach it.
        cls.session = cluster.connect()
        cls.session.row_factory = tuple_factory
        cls.prepared = cls.session.prepare('SELECT * FROM system.local WHERE key=?')

    def close_pool(self):
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        """Map each parameter set to a worker and collect the results."""
        return self.pool.map(_get_multiproc, params, self.batch_size)

    @classmethod
    def _execute_request(cls, params):
        # Executes inside a worker, using the session built by _setup.
        return cls.session.execute(cls.prepared, params)
def _get_multiproc(params):
    """Module-level trampoline so Pool.map has a picklable callable;
    delegates to the worker-side class method."""
    return QueryManager._execute_request(params)
./multiprocess_execute.py 10000
10000 queries in 1.9765329361 seconds (5059.36421163/s)
./multiprocess_execute.py 10000 4
10000 queries in 3.204007864 seconds (3121.09096621/s)
class QueryManager(object):
    """Fans batches of queries out to worker processes; each worker runs
    its batch with execute_concurrent_with_args.

    Each worker builds its own session in the pool initializer — driver
    sessions cannot cross process boundaries.
    """

    concurrency = 100  # chosen to match the default in execute_concurrent_with_args

    def __init__(self, cluster, process_count=None):
        self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(cluster,))

    @classmethod
    def _setup(cls, cluster):
        # Runs once per worker; state lives on the class so the module-level
        # task function can reach it.
        cls.session = cluster.connect()
        cls.session.row_factory = tuple_factory
        cls.prepared = cls.session.prepare('SELECT * FROM system.local WHERE key=?')

    def close_pool(self):
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        """Split params into concurrency-sized chunks, run each chunk in a
        worker process, and return the flattened list of results."""
        params = list(params)
        # range (not the Python-2-only xrange) keeps this working on Python 3;
        # on Python 2 it is equivalent for iteration.
        chunks = (params[n:n + self.concurrency]
                  for n in range(0, len(params), self.concurrency))
        results = self.pool.map(_multiprocess_get, chunks)
        # chain.from_iterable flattens without unpacking the whole result
        # list into call arguments.
        return list(itertools.chain.from_iterable(results))

    @classmethod
    def _results_from_concurrent(cls, params):
        # Each item is a (success, result) pair; keep only the result half.
        return [results[1] for results in execute_concurrent_with_args(cls.session, cls.prepared, params)]
def _multiprocess_get(params):
    """Module-level trampoline so Pool.map has a picklable callable;
    runs a whole batch via the worker-side class method."""
    return QueryManager._results_from_concurrent(params)
./multiprocess_concurrent.py 10000 5
10000 queries in 1.56024098396 seconds (6409.26632667/s)
Discover more
Drivers
Share

One-stop Data API for Production GenAI

Astra DB gives JavaScript developers a complete data API and out-of-the-box integrations that make it easier to build production RAG apps with high relevancy and low latency.