Skip to content

Commit 522b7eb

Browse files
dd-octo-sts[bot]piochelepiotrclaude
authored
kafka_actions: Return early when end of partition is reached (#23004) (#23005)
* Return early from Kafka consume_messages when partition end is reached Instead of waiting until the global timeout, break immediately when poll() returns None or when all assigned partitions report EOF. This makes the read_messages action fast when there are no more messages to consume. * Add changelog entry for kafka_actions early return fix * Simplify early return tests to 2 concise tests * Remove enable.partition.eof and EOF tracking — just break on None --------- (cherry picked from commit 50338a5) Co-authored-by: Piotr WOLSKI <piotr.wolski42@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 5c2d77d commit 522b7eb

File tree

3 files changed

+29
-3
lines changed

3 files changed

+29
-3
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Return early from consume_messages when end of partition is reached instead of waiting for timeout

kafka_actions/datadog_checks/kafka_actions/kafka_client.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import time
99
from typing import TYPE_CHECKING, Any
1010

11-
from confluent_kafka import Consumer, KafkaException, Producer, TopicPartition
11+
from confluent_kafka import Consumer, KafkaError, KafkaException, Producer, TopicPartition
1212
from confluent_kafka.admin import AdminClient, ConfigResource, NewTopic, OffsetSpec, ResourceType
1313

1414
try:
@@ -272,10 +272,11 @@ def consume_messages(
272272
msg = consumer.poll(timeout=poll_timeout)
273273

274274
if msg is None:
275-
continue
275+
self.log.debug("Poll returned None (no more messages available), stopping consumption")
276+
break
276277

277278
if msg.error():
278-
if msg.error().code() == KafkaException._PARTITION_EOF:
279+
if msg.error().code() == KafkaError._PARTITION_EOF:
279280
self.log.debug("Reached end of partition")
280281
continue
281282
else:

kafka_actions/tests/test_unit.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,5 +524,29 @@ def test_start_timestamp_overrides_start_offset(self, dd_run_check, aggregator):
524524
assert call_kwargs['start_timestamp'] == 1700000000000
525525

526526

527+
class TestConsumeMessagesEarlyReturn:
528+
"""Test that consume_messages returns early on None poll or partition EOF."""
529+
530+
def test_none_poll_breaks_immediately(self):
531+
from unittest.mock import MagicMock
532+
533+
consumer = MagicMock()
534+
metadata = MagicMock()
535+
metadata.topics = {'t': MagicMock(partitions={0: MagicMock()})}
536+
consumer.list_topics.return_value = metadata
537+
consumer.poll.side_effect = [MockKafkaMessage(key=b'k', value=b'v', partition=0, offset=0), None]
538+
539+
import logging
540+
541+
from datadog_checks.kafka_actions.kafka_client import KafkaActionsClient
542+
543+
client = KafkaActionsClient({'kafka_connect_str': 'localhost:9092'}, logging.getLogger('test'))
544+
with patch.object(client, 'get_consumer', return_value=consumer):
545+
result = list(client.consume_messages(topic='t', start_offset=0, max_messages=1000, timeout_ms=30000))
546+
547+
assert len(result) == 1
548+
assert consumer.poll.call_count == 2
549+
550+
527551
if __name__ == '__main__':
528552
pytest.main([__file__, '-vv'])

0 commit comments

Comments
 (0)