Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka_actions/changelog.d/23004.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Return early from consume_messages when end of partition is reached instead of waiting for timeout
7 changes: 4 additions & 3 deletions kafka_actions/datadog_checks/kafka_actions/kafka_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
from typing import TYPE_CHECKING, Any

from confluent_kafka import Consumer, KafkaException, Producer, TopicPartition
from confluent_kafka import Consumer, KafkaError, KafkaException, Producer, TopicPartition
from confluent_kafka.admin import AdminClient, ConfigResource, NewTopic, OffsetSpec, ResourceType

try:
Expand Down Expand Up @@ -272,10 +272,11 @@ def consume_messages(
msg = consumer.poll(timeout=poll_timeout)

if msg is None:
continue
self.log.debug("Poll returned None (no more messages available), stopping consumption")
break
Comment on lines 274 to +276

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve the configured timeout when poll() is idle

In KafkaActionsClient.consume_messages, Consumer.poll() returning None only means the 1-second poll_timeout elapsed without a record, not that the topic is exhausted. _action_read_messages still passes the user-configured timeout_ms into this generator, so on sparse topics or producers slower than 1 msg/s this change exits after the first idle second instead of waiting for the remaining timeout and can miss messages that arrive later within the requested window.

Useful? React with 👍 / 👎.


if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
if msg.error().code() == KafkaError._PARTITION_EOF:
self.log.debug("Reached end of partition")
continue
else:
Expand Down
24 changes: 24 additions & 0 deletions kafka_actions/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,5 +524,29 @@ def test_start_timestamp_overrides_start_offset(self, dd_run_check, aggregator):
assert call_kwargs['start_timestamp'] == 1700000000000


class TestConsumeMessagesEarlyReturn:
"""Test that consume_messages returns early on None poll or partition EOF."""

def test_none_poll_breaks_immediately(self):
from unittest.mock import MagicMock

consumer = MagicMock()
metadata = MagicMock()
metadata.topics = {'t': MagicMock(partitions={0: MagicMock()})}
consumer.list_topics.return_value = metadata
consumer.poll.side_effect = [MockKafkaMessage(key=b'k', value=b'v', partition=0, offset=0), None]

import logging

from datadog_checks.kafka_actions.kafka_client import KafkaActionsClient

client = KafkaActionsClient({'kafka_connect_str': 'localhost:9092'}, logging.getLogger('test'))
with patch.object(client, 'get_consumer', return_value=consumer):
result = list(client.consume_messages(topic='t', start_offset=0, max_messages=1000, timeout_ms=30000))

assert len(result) == 1
assert consumer.poll.call_count == 2


if __name__ == '__main__':
pytest.main([__file__, '-vv'])
Loading