Skip to content

Commit 2a314e6

Browse files
authored
Merge pull request segmentio#161 from pberganza-applaudostudios/feature/configurable-concurrency
Feature/configurable concurrency
2 parents 6de8c13 + d81f651 commit 2a314e6

File tree

2 files changed

+31
-23
lines changed

2 files changed

+31
-23
lines changed

analytics/client.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class Client(object):
2828
def __init__(self, write_key=None, host=None, debug=False,
2929
max_queue_size=10000, send=True, on_error=None, flush_at=100,
3030
flush_interval=0.5, gzip=False, max_retries=3,
31-
sync_mode=False, timeout=15):
31+
sync_mode=False, timeout=15, thread=1):
3232
require('write_key', write_key, string_types)
3333

3434
self.queue = queue.Queue(max_queue_size)
@@ -45,21 +45,25 @@ def __init__(self, write_key=None, host=None, debug=False,
4545
self.log.setLevel(logging.DEBUG)
4646

4747
if sync_mode:
48-
self.consumer = None
48+
self.consumers = None
4949
else:
50-
self.consumer = Consumer(self.queue, write_key, host=host, on_error=on_error,
51-
flush_at=flush_at, flush_interval=flush_interval,
52-
gzip=gzip, retries=max_retries, timeout=timeout)
53-
54-
# if we've disabled sending, just don't start the consumer
50+
# On program exit, allow the consumer thread to exit cleanly.
51+
# This prevents exceptions and a messy shutdown when the interpreter is
52+
# destroyed before the daemon thread finishes execution. However, it
53+
# is *not* the same as flushing the queue! To guarantee all messages
54+
# have been delivered, you'll still need to call flush().
5555
if send:
56-
# On program exit, allow the consumer thread to exit cleanly.
57-
# This prevents exceptions and a messy shutdown when the interpreter is
58-
# destroyed before the daemon thread finishes execution. However, it
59-
# is *not* the same as flushing the queue! To guarantee all messages
60-
# have been delivered, you'll still need to call flush().
6156
atexit.register(self.join)
62-
self.consumer.start()
57+
for n in range(thread):
58+
self.consumers = []
59+
consumer = Consumer(self.queue, write_key, host=host, on_error=on_error,
60+
flush_at=flush_at, flush_interval=flush_interval,
61+
gzip=gzip, retries=max_retries, timeout=timeout)
62+
self.consumers.append(consumer)
63+
64+
# if we've disabled sending, just don't start the consumer
65+
if send:
66+
consumer.start()
6367

6468
def identify(self, user_id=None, traits=None, context=None, timestamp=None,
6569
anonymous_id=None, integrations=None, message_id=None):
@@ -263,12 +267,13 @@ def flush(self):
263267

264268
def join(self):
265269
"""Ends the consumer thread once the queue is empty. Blocks execution until finished"""
266-
self.consumer.pause()
267-
try:
268-
self.consumer.join()
269-
except RuntimeError:
270-
# consumer thread has not started
271-
pass
270+
for consumer in self.consumers:
271+
consumer.pause()
272+
try:
273+
consumer.join()
274+
except RuntimeError:
275+
# consumer thread has not started
276+
pass
272277

273278
def shutdown(self):
274279
"""Flush all messages and cleanly shutdown the client"""

analytics/test/client.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,14 @@ def test_shutdown(self):
256256
# 1. client queue is empty
257257
# 2. consumer thread has stopped
258258
self.assertTrue(client.queue.empty())
259-
self.assertFalse(client.consumer.is_alive())
259+
for consumer in client.consumers:
260+
self.assertFalse(consumer.is_alive())
260261

261262
def test_synchronous(self):
262263
client = Client('testsecret', sync_mode=True)
263264

264265
success, message = client.identify('userId')
265-
self.assertIsNone(client.consumer)
266+
self.assertFalse(client.consumers)
266267
self.assertTrue(client.queue.empty())
267268
self.assertTrue(success)
268269

@@ -333,8 +334,10 @@ def mock_post_fn(*args, **kwargs):
333334

334335
def test_user_defined_timeout(self):
335336
client = Client('testsecret', timeout=10)
336-
self.assertEquals(client.consumer.timeout, 10)
337+
for consumer in client.consumers:
338+
self.assertEquals(consumer.timeout, 10)
337339

338340
def test_default_timeout_15(self):
339341
client = Client('testsecret')
340-
self.assertEquals(client.consumer.timeout, 15)
342+
for consumer in client.consumers:
343+
self.assertEquals(consumer.timeout, 15)

0 commit comments

Comments
 (0)