Skip to content

Commit f1724e6

Browse files
committed
Improve handling of high message volumes:
Block when trying to produce a message and the librdkafka internal queue is full. Allow specifying a timeout for producer.close. Provide an interface to check the queue size.
1 parent b3bdf1d commit f1724e6

File tree

2 files changed

+61
-8
lines changed

2 files changed

+61
-8
lines changed

adc/producer.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,26 @@ def write(self,
4747
"Either configure a topic when constructing the Producer, "
4848
"or specify the topic argument to write()")
4949
self.logger.debug("writing message to %s", topic)
50+
produce_kwargs = {"headers": headers, "key": key}
5051
if delivery_callback is not None:
51-
self._producer.produce(topic, msg, headers=headers, key=key,
52-
on_delivery=delivery_callback)
53-
else:
54-
self._producer.produce(topic, msg, headers=headers, key=key,)
52+
produce_kwargs["on_delivery"] = delivery_callback
53+
while True:
54+
try:
55+
self._producer.produce(topic, msg, **produce_kwargs)
56+
break
57+
except BufferError:
58+
# It's hard to know what the size limit on the buffer is, so we will try to
59+
# wait until the number of items in it decreases (or it is empty, in case all
60+
# messages were successfully sent in the time it took to handle the error).
61+
buffer_len = len(self._producer)
62+
self.logger.debug(f"Blocking due to BufferError, buffer size: {buffer_len}")
63+
while buffer_len > 0 and len(self._producer) == buffer_len:
64+
self._producer.poll(0.01)
65+
66+
def queued_message_count(self):
67+
"""Get the number of messages waiting to be sent to the broker.
68+
"""
69+
return len(self._producer)
5570

5671
def flush(self, timeout: timedelta = timedelta(seconds=10)) -> int:
5772
"""Attempt to flush enqueued messages. Return the number of messages still
@@ -65,9 +80,9 @@ def flush(self, timeout: timedelta = timedelta(seconds=10)) -> int:
6580
self.logger.debug("flushed all messages")
6681
return n
6782

68-
def close(self) -> int:
83+
def close(self, timeout: timedelta = timedelta(seconds=10)) -> int:
6984
self.logger.debug("shutting down producer")
70-
return self.flush()
85+
return self.flush(timeout)
7186

7287
def __enter__(self) -> 'Producer':
7388
return self

tests/test_kafka_integration.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,44 @@ def test_message_with_key(self):
8484
self.assertEqual(msg.value(), b"can you hear me?")
8585
self.assertEqual(msg.key(), b"test_msg")
8686

87+
def test_message_with_callback(self):
88+
"""Try writing a message into the Kafka broker, with a delivery callback.
89+
"""
90+
topic = "test_message_with_callback"
91+
callback_called = False
92+
callback_error = False
93+
94+
def callback(err, msg):
95+
nonlocal callback_called
96+
nonlocal callback_error
97+
callback_called = True
98+
callback_error = err is not None
99+
100+
ap = adc.producer
101+
with ap.Producer(ap.ProducerConfig(broker_urls=[self.kafka.address],
102+
topic=topic, auth=self.kafka.auth)) as producer:
103+
producer.write("message data", delivery_callback=callback)
104+
producer.flush()
105+
consumer = adc.consumer.Consumer(adc.consumer.ConsumerConfig(
106+
broker_urls=[self.kafka.address],
107+
group_id="test_consumer",
108+
auth=self.kafka.auth,
109+
))
110+
consumer.subscribe(topic)
111+
stream = consumer.stream()
112+
113+
msg = next(stream)
114+
if msg.error() is not None:
115+
raise Exception(msg.error())
116+
# give the producer's background thread time to notice the ack from the broker and
117+
# fire the callback
118+
producer._producer.poll(0.1)
119+
120+
self.assertEqual(msg.topic(), topic)
121+
self.assertEqual(msg.value(), b"message data")
122+
self.assertEqual(callback_called, True)
123+
self.assertEqual(callback_error, False)
124+
87125
def test_reset_to_end(self):
88126
# Write a few messages.
89127
topic = "test_reset_to_end"
@@ -486,7 +524,7 @@ def poll_for_kafka_broker_address(self, maxiter=20, sleep=timedelta(milliseconds
486524
"""Block until the Docker daemon tells us the IP and Port of the Kafka broker.
487525
488526
Returns the ip and port as a string in the form "ip:port".
489-
"""
527+
"""
490528
i = 0
491529
while (not self.query_kafka_broker_address()) and i < maxiter:
492530
logger.info("polling to wait for container to acquire port...")
@@ -508,7 +546,7 @@ def query_kafka_broker_address(self):
508546
port = addrs[0]['HostPort']
509547
return f"{ip}:{port}"
510548

511-
def poll_for_kafka_active(self, maxiter=20, sleep=timedelta(milliseconds=500)):
549+
def poll_for_kafka_active(self, maxiter=40, sleep=timedelta(milliseconds=500)):
512550
"""Block until Kafka's network listener is accepting connections."""
513551
i = 0
514552
while (not self.query_kafka_active()) and i < maxiter:

0 commit comments

Comments
 (0)