DataStax Python Driver: A Multiprocessing Example for Improved Bulk Data Throughput

class QueryManager(object):
    """Issue many executions of one prepared statement over a single
    session, pipelined via execute_concurrent_with_args."""

    def __init__(self, cluster):
        self._setup(cluster)

    def _setup(self, cluster):
        # One shared session; rows come back as plain tuples.
        self.session = cluster.connect()
        self.session.row_factory = tuple_factory
        self.prepared = self.session.prepare('SELECT * FROM system.local WHERE key=?')

    def get_results(self, params):
        """Run the prepared statement once per parameter set in *params*."""
        return self._results_from_concurrent(params)

    def _results_from_concurrent(self, params):
        # execute_concurrent_with_args yields (success, result) pairs;
        # keep only the result portion of each pair.
        rows = []
        for _success, result in execute_concurrent_with_args(
                self.session, self.prepared, params):
            rows.append(result)
        return rows
./single_process_concurrent.py 10000
10000 queries in 4.32866883278 seconds (2310.17903802/s)
class QueryManager(object):
    """Fan single-query requests out across a multiprocessing Pool.

    Each worker process builds its own session in the pool initializer.
    The session is stored on the class because every worker process has
    its own copy of the class object.
    """

    batch_size = 10  # chunk size handed to Pool.map

    def __init__(self, cluster, process_count=None):
        self.pool = Pool(processes=process_count,
                         initializer=self._setup,
                         initargs=(cluster,))

    @classmethod
    def _setup(cls, cluster):
        # Runs once inside every worker process.
        cls.session = cluster.connect()
        cls.session.row_factory = tuple_factory
        cls.prepared = cls.session.prepare('SELECT * FROM system.local WHERE key=?')

    def close_pool(self):
        """Shut the pool down and wait for all workers to exit."""
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        """Map one query per parameter set across the worker pool."""
        return self.pool.map(_get_multiproc, params, self.batch_size)

    @classmethod
    def _execute_request(cls, params):
        return cls.session.execute(cls.prepared, params)
def _get_multiproc(params):
    """Module-level trampoline so Pool.map can pickle the work function."""
    result = QueryManager._execute_request(params)
    return result
./multiprocess_execute.py 10000
10000 queries in 1.9765329361 seconds (5059.36421163/s)
./multiprocess_execute.py 10000 4
10000 queries in 3.204007864 seconds (3121.09096621/s)
class QueryManager(object):
    """Combine multiprocessing with execute_concurrent_with_args: each
    pool task handles a slice of `concurrency` parameter sets at once."""

    concurrency = 100  # chosen to match the default in execute_concurrent_with_args

    def __init__(self, cluster, process_count=None):
        self.pool = Pool(processes=process_count,
                         initializer=self._setup,
                         initargs=(cluster,))

    @classmethod
    def _setup(cls, cluster):
        # Runs once inside every worker process.
        cls.session = cluster.connect()
        cls.session.row_factory = tuple_factory
        cls.prepared = cls.session.prepare('SELECT * FROM system.local WHERE key=?')

    def close_pool(self):
        """Shut the pool down and wait for all workers to exit."""
        self.pool.close()
        self.pool.join()

    def get_results(self, params):
        """Slice *params* into concurrency-sized chunks, fan the chunks
        out across the pool, then flatten the per-chunk result lists."""
        params = list(params)
        chunks = (params[start:start + self.concurrency]
                  for start in xrange(0, len(params), self.concurrency))
        per_chunk = self.pool.map(_multiprocess_get, chunks)
        return list(itertools.chain(*per_chunk))

    @classmethod
    def _results_from_concurrent(cls, params):
        # Drop the success flag from each (success, result) pair.
        return [result for _success, result in
                execute_concurrent_with_args(cls.session, cls.prepared, params)]
def _multiprocess_get(params):
    """Module-level trampoline so Pool.map can pickle the work function."""
    chunk_results = QueryManager._results_from_concurrent(params)
    return chunk_results
./multiprocess_concurrent.py 10000 5
10000 queries in 1.56024098396 seconds (6409.26632667/s)