Skip to content

Commit b207e06

Browse files
authored
Enhanced the BufferTimeoutManager to flush the librdkafka queue (#2103)
1 parent caca3cd commit b207e06

File tree

3 files changed

+320
-3
lines changed

3 files changed

+320
-3
lines changed

src/confluent_kafka/experimental/aio/producer/_buffer_timeout_manager.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ async def _monitor_timeout(self):
106106

107107
# Check if buffer has been inactive for too long
108108
time_since_activity = time.time() - manager._last_activity
109-
if (time_since_activity >= manager._timeout and
110-
not manager._batch_processor.is_buffer_empty()):
109+
if (time_since_activity >= manager._timeout):
111110

112111
try:
113112
# Flush the buffer due to timeout
@@ -125,6 +124,11 @@ async def _flush_buffer_due_to_timeout(self):
125124
This method handles the complete timeout flush workflow:
126125
1. Create batches from the batch processor
127126
2. Execute batches from the batch processor
127+
3. Flush librdkafka queue (non-blocking) so queued messages are sent on to the broker
128128
"""
129-
# Create batches from current buffer
129+
# Create batches from current buffer and send to librdkafka queue
130130
await self._batch_processor.flush_buffer()
131+
132+
# Flush librdkafka queue so queued messages are sent on to the broker
133+
# 0 timeout means non-blocking flush: return immediately without waiting for delivery
134+
await self._kafka_executor.flush_librdkafka_queue(0)

src/confluent_kafka/experimental/aio/producer/_kafka_batch_executor.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import asyncio
1616
import logging
1717

18+
from .. import _common
19+
1820
logger = logging.getLogger(__name__)
1921

2022

@@ -88,6 +90,23 @@ def _produce_batch_and_poll():
8890
loop = asyncio.get_running_loop()
8991
return await loop.run_in_executor(self._executor, _produce_batch_and_poll)
9092

93+
async def flush_librdkafka_queue(self, timeout=-1):
94+
"""Flush the librdkafka queue and wait for all messages to be delivered
95+
96+
This method awaits until all outstanding produce requests are completed
97+
or the timeout is reached, unless the timeout is set to 0 (non-blocking).
98+
99+
Args:
100+
timeout: Maximum time to wait in seconds:
101+
- -1 = wait indefinitely (default)
102+
- 0 = non-blocking, return immediately
103+
- >0 = wait up to timeout seconds
104+
105+
Returns:
106+
Number of messages still in queue after flush attempt
107+
"""
108+
return await _common.async_call(self._executor, self._producer.flush, timeout)
109+
91110
def _handle_partial_failures(self, batch_messages):
92111
"""Handle messages that failed during produce_batch
93112
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Unit tests for the BufferTimeoutManager class (_buffer_timeout_manager.py)
4+
5+
This module tests the BufferTimeoutManager class to ensure proper
6+
buffer timeout monitoring and automatic flush handling.
7+
"""
8+
9+
import asyncio
10+
import unittest
11+
import time
12+
from unittest.mock import Mock, AsyncMock
13+
14+
from confluent_kafka.experimental.aio.producer._buffer_timeout_manager import BufferTimeoutManager
15+
16+
17+
class TestBufferTimeoutManager(unittest.TestCase):
18+
"""Test cases for BufferTimeoutManager class"""
19+
20+
def setUp(self):
21+
self.loop = asyncio.new_event_loop()
22+
asyncio.set_event_loop(self.loop)
23+
24+
# Create mock dependencies
25+
self.mock_batch_processor = Mock()
26+
self.mock_kafka_executor = Mock()
27+
28+
# Configure mock methods (the flush methods are async; is_buffer_empty is synchronous)
29+
self.mock_batch_processor.flush_buffer = AsyncMock()
30+
self.mock_batch_processor.is_buffer_empty = Mock(return_value=False)
31+
self.mock_kafka_executor.flush_librdkafka_queue = AsyncMock(return_value=0)
32+
33+
# Create timeout manager with 1 second timeout
34+
self.timeout_manager = BufferTimeoutManager(
35+
self.mock_batch_processor,
36+
self.mock_kafka_executor,
37+
timeout=1.0
38+
)
39+
40+
def tearDown(self):
41+
# Stop any running timeout monitoring tasks
42+
if hasattr(self.timeout_manager, '_timeout_task') and self.timeout_manager._timeout_task:
43+
self.timeout_manager.stop_timeout_monitoring()
44+
self.loop.close()
45+
46+
def test_initialization(self):
47+
"""Test that BufferTimeoutManager initializes correctly"""
48+
self.assertEqual(self.timeout_manager._batch_processor, self.mock_batch_processor)
49+
self.assertEqual(self.timeout_manager._kafka_executor, self.mock_kafka_executor)
50+
self.assertEqual(self.timeout_manager._timeout, 1.0)
51+
self.assertFalse(self.timeout_manager._running)
52+
self.assertIsNone(self.timeout_manager._timeout_task)
53+
self.assertIsInstance(self.timeout_manager._last_activity, float)
54+
self.assertGreater(self.timeout_manager._last_activity, 0)
55+
56+
def test_mark_activity(self):
57+
"""Test that mark_activity updates the last activity timestamp"""
58+
initial_time = self.timeout_manager._last_activity
59+
time.sleep(0.01) # Ensure time difference
60+
61+
self.timeout_manager.mark_activity()
62+
63+
self.assertGreater(self.timeout_manager._last_activity, initial_time)
64+
65+
def test_start_timeout_monitoring(self):
66+
"""Test that timeout monitoring task starts correctly"""
67+
async def async_test():
68+
self.timeout_manager.start_timeout_monitoring()
69+
70+
self.assertTrue(self.timeout_manager._running)
71+
self.assertIsNotNone(self.timeout_manager._timeout_task)
72+
self.assertFalse(self.timeout_manager._timeout_task.done())
73+
74+
# Clean up
75+
self.timeout_manager.stop_timeout_monitoring()
76+
77+
self.loop.run_until_complete(async_test())
78+
79+
def test_stop_timeout_monitoring(self):
80+
"""Test that timeout monitoring task stops correctly"""
81+
async def async_test():
82+
self.timeout_manager.start_timeout_monitoring()
83+
self.assertTrue(self.timeout_manager._running)
84+
85+
self.timeout_manager.stop_timeout_monitoring()
86+
87+
self.assertFalse(self.timeout_manager._running)
88+
# Task should be cancelled or None
89+
self.assertTrue(
90+
self.timeout_manager._timeout_task is None or
91+
self.timeout_manager._timeout_task.done()
92+
)
93+
94+
self.loop.run_until_complete(async_test())
95+
96+
def test_start_timeout_monitoring_disabled(self):
97+
"""Test that timeout monitoring doesn't start when timeout is 0"""
98+
manager = BufferTimeoutManager(
99+
self.mock_batch_processor,
100+
self.mock_kafka_executor,
101+
timeout=0 # Disabled
102+
)
103+
104+
manager.start_timeout_monitoring()
105+
106+
self.assertFalse(manager._running)
107+
self.assertIsNone(manager._timeout_task)
108+
109+
def test_flush_buffer_due_to_timeout(self):
110+
"""Test that _flush_buffer_due_to_timeout calls both flush methods"""
111+
async def async_test():
112+
# Call the flush method
113+
await self.timeout_manager._flush_buffer_due_to_timeout()
114+
115+
# Verify both flush operations were called
116+
self.mock_batch_processor.flush_buffer.assert_called_once()
117+
self.mock_kafka_executor.flush_librdkafka_queue.assert_called_once()
118+
119+
self.loop.run_until_complete(async_test())
120+
121+
def test_flush_buffer_due_to_timeout_order(self):
122+
"""Test that flush operations are called in the correct order"""
123+
call_order = []
124+
125+
async def track_batch_flush():
126+
call_order.append('batch_processor')
127+
128+
async def track_kafka_flush(timeout=0):
129+
call_order.append('kafka_executor')
130+
131+
self.mock_batch_processor.flush_buffer.side_effect = track_batch_flush
132+
self.mock_kafka_executor.flush_librdkafka_queue.side_effect = track_kafka_flush
133+
134+
async def async_test():
135+
await self.timeout_manager._flush_buffer_due_to_timeout()
136+
137+
# Verify order: batch processor first, then kafka executor
138+
self.assertEqual(call_order, ['batch_processor', 'kafka_executor'])
139+
140+
self.loop.run_until_complete(async_test())
141+
142+
def test_flush_buffer_due_to_timeout_batch_processor_error(self):
143+
"""Test that errors from batch processor are propagated"""
144+
self.mock_batch_processor.flush_buffer.side_effect = Exception("Batch flush failed")
145+
146+
async def async_test():
147+
with self.assertRaises(Exception) as context:
148+
await self.timeout_manager._flush_buffer_due_to_timeout()
149+
150+
self.assertEqual(str(context.exception), "Batch flush failed")
151+
# Batch processor flush was attempted
152+
self.mock_batch_processor.flush_buffer.assert_called_once()
153+
# Kafka executor flush should not be called if batch processor fails
154+
self.mock_kafka_executor.flush_librdkafka_queue.assert_not_called()
155+
156+
self.loop.run_until_complete(async_test())
157+
158+
def test_flush_buffer_due_to_timeout_kafka_executor_error(self):
159+
"""Test that errors from kafka executor are propagated"""
160+
self.mock_kafka_executor.flush_librdkafka_queue.side_effect = Exception("Kafka flush failed")
161+
162+
async def async_test():
163+
with self.assertRaises(Exception) as context:
164+
await self.timeout_manager._flush_buffer_due_to_timeout()
165+
166+
self.assertEqual(str(context.exception), "Kafka flush failed")
167+
# Both flush operations were attempted
168+
self.mock_batch_processor.flush_buffer.assert_called_once()
169+
self.mock_kafka_executor.flush_librdkafka_queue.assert_called_once()
170+
171+
self.loop.run_until_complete(async_test())
172+
173+
def test_monitor_timeout_triggers_flush(self):
174+
"""Test that timeout monitoring triggers flush when buffer is inactive"""
175+
async def async_test():
176+
# Set last activity to past time (simulating inactivity)
177+
self.timeout_manager._last_activity = time.time() - 2.0 # 2 seconds ago
178+
179+
# Start monitoring
180+
self.timeout_manager.start_timeout_monitoring()
181+
182+
# Wait for timeout check to trigger
183+
# Check interval for 1s timeout should be 0.5s, add buffer time
184+
await asyncio.sleep(0.7)
185+
186+
# Stop monitoring
187+
self.timeout_manager.stop_timeout_monitoring()
188+
189+
# Verify flush was called
190+
self.mock_batch_processor.flush_buffer.assert_called()
191+
self.mock_kafka_executor.flush_librdkafka_queue.assert_called()
192+
193+
self.loop.run_until_complete(async_test())
194+
195+
def test_monitor_timeout_flushes_librdkafka_even_when_buffer_empty(self):
196+
"""Test that timeout monitoring flushes librdkafka queue even when local buffer is empty
197+
This ensures that messages sitting in librdkafka's internal queue get delivered,
198+
even if there are no messages in our local buffer.
199+
"""
200+
# Configure buffer as empty
201+
self.mock_batch_processor.is_buffer_empty.return_value = True
202+
203+
async def async_test():
204+
# Set last activity to past time
205+
self.timeout_manager._last_activity = time.time() - 2.0
206+
207+
# Start monitoring
208+
self.timeout_manager.start_timeout_monitoring()
209+
210+
# Wait for potential timeout check
211+
await asyncio.sleep(0.7)
212+
213+
# Stop monitoring
214+
self.timeout_manager.stop_timeout_monitoring()
215+
216+
# flush_buffer() is still called exactly once even though the buffer reports empty
217+
self.mock_batch_processor.flush_buffer.assert_called_once()
218+
# BUT librdkafka queue flush should still happen
219+
self.mock_kafka_executor.flush_librdkafka_queue.assert_called_once()
220+
221+
self.loop.run_until_complete(async_test())
222+
223+
def test_monitor_timeout_does_not_flush_recent_activity(self):
224+
"""Test that timeout monitoring doesn't flush if buffer has recent activity"""
225+
async def async_test():
226+
# Set last activity to recent time (within timeout)
227+
self.timeout_manager._last_activity = time.time() - 0.3 # 0.3 seconds ago
228+
229+
# Start monitoring
230+
self.timeout_manager.start_timeout_monitoring()
231+
232+
# Wait for potential timeout check (less than timeout duration)
233+
# 0.3s (initial) + 0.4s (sleep) = 0.7s < 1.0s timeout
234+
await asyncio.sleep(0.4)
235+
236+
# Stop monitoring
237+
self.timeout_manager.stop_timeout_monitoring()
238+
239+
# Verify flush was NOT called since activity is recent
240+
self.mock_batch_processor.flush_buffer.assert_not_called()
241+
self.mock_kafka_executor.flush_librdkafka_queue.assert_not_called()
242+
243+
self.loop.run_until_complete(async_test())
244+
245+
def test_monitor_timeout_updates_activity_after_flush(self):
246+
"""Test that timeout monitoring updates activity timestamp after successful flush"""
247+
async def async_test():
248+
# Set last activity to past time
249+
self.timeout_manager._last_activity = time.time() - 2.0
250+
initial_time = self.timeout_manager._last_activity
251+
252+
# Start monitoring
253+
self.timeout_manager.start_timeout_monitoring()
254+
255+
# Wait for timeout check to trigger
256+
await asyncio.sleep(0.7)
257+
258+
# Stop monitoring
259+
self.timeout_manager.stop_timeout_monitoring()
260+
261+
# Verify activity was updated after flush
262+
self.assertGreater(self.timeout_manager._last_activity, initial_time)
263+
264+
self.loop.run_until_complete(async_test())
265+
266+
def test_weak_reference_cleanup(self):
267+
"""Test that the monitoring task finishes after monitoring is explicitly stopped"""
268+
async def async_test():
269+
# Create a manager with timeout monitoring
270+
manager = BufferTimeoutManager(
271+
self.mock_batch_processor,
272+
self.mock_kafka_executor,
273+
timeout=1.0
274+
)
275+
manager.start_timeout_monitoring()
276+
277+
# Wait a bit for task to start
278+
await asyncio.sleep(0.1)
279+
280+
# Keep a reference to the task so it can be checked after the manager is stopped
281+
task = manager._timeout_task
282+
self.assertIsNotNone(task)
283+
self.assertFalse(task.done())
284+
285+
# Stop the manager first to ensure clean shutdown
286+
manager.stop_timeout_monitoring()
287+
288+
# Wait a bit for the cancellation to take effect
289+
await asyncio.sleep(0.8)
290+
291+
# Verify the task stopped when we explicitly stopped monitoring
292+
self.assertTrue(task.done())
293+
294+
self.loop.run_until_complete(async_test())

0 commit comments

Comments
 (0)