Skip to content

Commit 16e2661

Browse files
Add configurable number of threads to run consumer
1 parent 6de8c13 commit 16e2661

File tree

1 file changed

+24
-19
lines changed

1 file changed

+24
-19
lines changed

analytics/client.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class Client(object):
2828
def __init__(self, write_key=None, host=None, debug=False,
2929
max_queue_size=10000, send=True, on_error=None, flush_at=100,
3030
flush_interval=0.5, gzip=False, max_retries=3,
31-
sync_mode=False, timeout=15):
31+
sync_mode=False, timeout=15, thread=1):
3232
require('write_key', write_key, string_types)
3333

3434
self.queue = queue.Queue(max_queue_size)
@@ -45,21 +45,25 @@ def __init__(self, write_key=None, host=None, debug=False,
4545
self.log.setLevel(logging.DEBUG)
4646

4747
if sync_mode:
48-
self.consumer = None
48+
self.consumers = None
4949
else:
50-
self.consumer = Consumer(self.queue, write_key, host=host, on_error=on_error,
51-
flush_at=flush_at, flush_interval=flush_interval,
52-
gzip=gzip, retries=max_retries, timeout=timeout)
53-
54-
# if we've disabled sending, just don't start the consumer
50+
# On program exit, allow the consumer thread to exit cleanly.
51+
# This prevents exceptions and a messy shutdown when the interpreter is
52+
# destroyed before the daemon thread finishes execution. However, it
53+
# is *not* the same as flushing the queue! To guarantee all messages
54+
# have been delivered, you'll still need to call flush().
5555
if send:
56-
# On program exit, allow the consumer thread to exit cleanly.
57-
# This prevents exceptions and a messy shutdown when the interpreter is
58-
# destroyed before the daemon thread finishes execution. However, it
59-
# is *not* the same as flushing the queue! To guarantee all messages
60-
# have been delivered, you'll still need to call flush().
6156
atexit.register(self.join)
62-
self.consumer.start()
57+
for n in range(thread):
58+
self.consumers = []
59+
consumer = Consumer(self.queue, write_key, host=host, on_error=on_error,
60+
flush_at=flush_at, flush_interval=flush_interval,
61+
gzip=gzip, retries=max_retries, timeout=timeout)
62+
self.consumers.append(consumer)
63+
64+
# if we've disabled sending, just don't start the consumer
65+
if send:
66+
consumer.start()
6367

6468
def identify(self, user_id=None, traits=None, context=None, timestamp=None,
6569
anonymous_id=None, integrations=None, message_id=None):
@@ -263,12 +267,13 @@ def flush(self):
263267

264268
def join(self):
265269
"""Ends the consumer thread once the queue is empty. Blocks execution until finished"""
266-
self.consumer.pause()
267-
try:
268-
self.consumer.join()
269-
except RuntimeError:
270-
# consumer thread has not started
271-
pass
270+
for consumer in self.consumers:
271+
consumer.pause()
272+
try:
273+
consumer.join()
274+
except RuntimeError:
275+
# consumer thread has not started
276+
pass
272277

273278
def shutdown(self):
274279
"""Flush all messages and cleanly shutdown the client"""

0 commit comments

Comments
 (0)