Commit 645e06b

Improve kafka consumer highwater offset collection time (#20716)
* Improve kafka consumer highwater offset collection time
* Update the seed mock client
* Fix the failing test_oauth_config
* Fetch highwater offsets only when there are actual partitions to check
* Fix linter errors
1 parent 4e668de commit 645e06b

File tree

4 files changed: +80 / -51 lines
Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+Improve kafka consumer highwater offset collection time

kafka_consumer/datadog_checks/kafka_consumer/client.py

Lines changed: 35 additions & 4 deletions
@@ -13,6 +13,7 @@ def __init__(self, config, log) -> None:
         self.log = log
         self._kafka_client = None
         self._consumer = None
+        self._cluster_metadata = None

     @property
     def kafka_client(self):
@@ -106,18 +107,48 @@ def consumer_offsets_for_times(self, partitions):
             )
         ]

+    def _list_topics(self):
+        if self._cluster_metadata:
+            return self._cluster_metadata
+
+        try:
+            self.request_metadata_update()
+
+        except KafkaException as e:
+            self.log.error("Received exception when listing topics: %s", e)
+
+        return self._cluster_metadata
+
+    def get_topic_partitions(self):
+        topic_partitions = {}
+        try:
+            cluster_metadata = self._list_topics()
+            for topic in cluster_metadata.topics:
+                topic_metadata = cluster_metadata.topics[topic]
+                partitions = list(topic_metadata.partitions)
+                topic_partitions[topic] = partitions
+
+        except KafkaException as e:
+            self.log.error("Received exception when listing topics: %s", e)
+
+        return topic_partitions
+
     def get_partitions_for_topic(self, topic):
+        partitions = []
         try:
-            cluster_metadata = self.kafka_client.list_topics(topic, timeout=self.config._request_timeout)
+            cluster_metadata = self._list_topics()
         except KafkaException as e:
             self.log.error("Received exception when getting partitions for topic %s: %s", topic, e)
             return []
-        topic_metadata = cluster_metadata.topics[topic]
-        return list(topic_metadata.partitions)
+
+        if topic in cluster_metadata.topics:
+            topic_metadata = cluster_metadata.topics[topic]
+            partitions = list(topic_metadata.partitions)
+        return partitions

     def request_metadata_update(self):
         # https://github.com/confluentinc/confluent-kafka-python/issues/594
-        self.kafka_client.list_topics(None, timeout=self.config._request_timeout)
+        self._cluster_metadata = self.kafka_client.list_topics(None, timeout=self.config._request_timeout)

     def list_consumer_groups(self):
         groups = []
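The heart of the client.py change is metadata memoization: request_metadata_update() now stores the result of list_topics(None), which returns metadata for every topic in a single round trip, and _list_topics() serves later lookups (get_topic_partitions(), get_partitions_for_topic()) from that cache. A minimal sketch of the pattern, where FakeKafkaClient and MetadataCache are hypothetical stand-ins for illustration, not the integration's real classes:

class FakeKafkaClient:
    """Hypothetical stand-in that counts metadata round trips to the broker."""

    def __init__(self, metadata):
        self.metadata = metadata  # e.g. {topic: [partition, ...]}
        self.round_trips = 0

    def list_topics(self, topic=None, timeout=None):
        self.round_trips += 1
        return self.metadata


class MetadataCache:
    """Sketch of the memoization the commit adds to the check's client."""

    def __init__(self, kafka_client):
        self._kafka_client = kafka_client
        self._cluster_metadata = None

    def _list_topics(self):
        # First call fetches metadata for ALL topics in one round trip;
        # every later call is served from the cache.
        if self._cluster_metadata is None:
            self._cluster_metadata = self._kafka_client.list_topics(None, timeout=10)
        return self._cluster_metadata


client = FakeKafkaClient(metadata={"topic1": [0, 1], "topic2": [0]})
cache = MetadataCache(client)
for _ in range(100):
    cache._list_topics()
assert client.round_trips == 1  # one fetch, however many lookups

The trade-off is freshness: the cache lives as long as the client object, so topics created after the first fetch are not seen until the metadata is refreshed, which is presumably acceptable within a single check run.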

kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Lines changed: 27 additions & 44 deletions
@@ -343,60 +343,43 @@ def get_highwater_offsets(self, consumer_offsets):
         self.log.debug('Getting highwater offsets')

         cluster_id = ""
+        dd_consumer_group = "datadog-agent"
         highwater_offsets = {}
-        topics_with_consumer_offset = set()
-        topic_partition_with_consumer_offset = set()
+        topic_partitions_to_check = set()

-        for consumer_group, offsets in consumer_offsets.items():
-            self.log.debug('CONSUMER GROUP: %s', consumer_group)
-            topic_partitions_for_highwater_offsets = set()
-            self.client.open_consumer(consumer_group)
-            cluster_id, topics = self.client.consumer_get_cluster_id_and_list_topics(consumer_group)
-
-            if not self.config._monitor_all_broker_highwatermarks:
-                for topic, partition in offsets:
-                    topics_with_consumer_offset.add(topic)
-                    topic_partition_with_consumer_offset.add((topic, partition))
-
-            for topic, partitions in topics:
+        if self.config._monitor_all_broker_highwatermarks:
+            all_topic_partitions = self.client.get_topic_partitions()
+            for topic in all_topic_partitions:
                 if topic in KAFKA_INTERNAL_TOPICS:
                     self.log.debug("Skipping internal topic %s", topic)
                     continue
-                if not self.config._monitor_all_broker_highwatermarks and topic not in topics_with_consumer_offset:
-                    self.log.debug("Skipping non-relevant topic %s", topic)
-                    continue

-                for partition in partitions:
-                    if (topic, partition) in highwater_offsets:
-                        self.log.debug(
-                            'Highwater offset already collected for topic %s with partition %s', topic, partition
-                        )
+                for partition in all_topic_partitions[topic]:
+                    topic_partitions_to_check.add((topic, partition))
+
+        else:
+            for _, offsets in consumer_offsets.items():
+                for topic, partition in offsets:
+                    if topic in KAFKA_INTERNAL_TOPICS:
+                        self.log.debug("Skipping internal topic %s", topic)
                         continue
-                    if (
-                        self.config._monitor_all_broker_highwatermarks
-                        or (topic, partition) in topic_partition_with_consumer_offset
-                    ):
-                        topic_partitions_for_highwater_offsets.add((topic, partition))
-                        self.log.debug('TOPIC: %s', topic)
-                        self.log.debug('PARTITION: %s', partition)
-                    else:
-                        self.log.debug("Skipping non-relevant partition %s of topic %s", partition, topic)

-            if topic_partitions_for_highwater_offsets:
-                self.log.debug(
-                    'Querying %s highwater offsets for consumer group %s',
-                    len(topic_partitions_for_highwater_offsets),
-                    consumer_group,
-                )
-                for topic, partition, offset in self.client.consumer_offsets_for_times(
-                    partitions=topic_partitions_for_highwater_offsets
-                ):
-                    highwater_offsets[(topic, partition)] = offset
-            else:
-                self.log.debug('No new highwater offsets to query for consumer group %s', consumer_group)
+                    topic_partitions_to_check.add((topic, partition))

-            self.client.close_consumer()
+        self.client.open_consumer(dd_consumer_group)
+        cluster_id, _ = self.client.consumer_get_cluster_id_and_list_topics(dd_consumer_group)
+        self.log.debug(
+            'Querying %s highwater offsets for consumer group %s',
+            len(topic_partitions_to_check),
+            dd_consumer_group,
+        )
+        if topic_partitions_to_check:
+            for topic, partition, offset in self.client.consumer_offsets_for_times(
+                partitions=topic_partitions_to_check
+            ):
+                highwater_offsets[(topic, partition)] = offset

+        self.client.close_consumer()
         self.log.debug('Got %s highwater offsets', len(highwater_offsets))
         return highwater_offsets, cluster_id
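The rewrite of get_highwater_offsets() replaces a per-consumer-group loop (open a consumer, list topics, query offsets, close) with a single pass: every relevant (topic, partition) pair goes into one deduplicated set, and one batched query runs under the dedicated "datadog-agent" consumer. A self-contained sketch of the deduplication step; the sample offsets mapping and the local KAFKA_INTERNAL_TOPICS set are made up for illustration:

# Shaped like the check's consumer_offsets argument:
# {consumer_group: {(topic, partition): committed_offset}}
consumer_offsets = {
    "group_a": {("orders", 0): 42, ("orders", 1): 7},
    "group_b": {("orders", 0): 40},  # shares ("orders", 0) with group_a
    "group_c": {("__consumer_offsets", 3): 1},  # internal topic, skipped
}

KAFKA_INTERNAL_TOPICS = {"__consumer_offsets", "__transaction_state"}

topic_partitions_to_check = set()
for _, offsets in consumer_offsets.items():
    for topic, partition in offsets:
        if topic in KAFKA_INTERNAL_TOPICS:
            continue
        topic_partitions_to_check.add((topic, partition))

# The overlap collapses: two groups, but ("orders", 0) is queried once.
assert topic_partitions_to_check == {("orders", 0), ("orders", 1)}

Partitions shared by several consumer groups are now queried once instead of once per group, and only a single consumer is opened and closed, which is where the collection-time win comes from.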

kafka_consumer/tests/test_unit.py

Lines changed: 17 additions & 3 deletions
@@ -36,7 +36,7 @@ def fake_consumer_offsets_for_times(partitions):
 def seed_mock_client(cluster_id="cluster_id"):
     """Set some common defaults for the mock client to kafka."""
     client = mock.create_autospec(KafkaClient)
-    client.list_consumer_groups.return_value = ["consumer_group1"]
+    client.list_consumer_groups.return_value = ["consumer_group1", "datadog-agent"]
     client.get_partitions_for_topic.return_value = ['partition1']
     client.list_consumer_group_offsets.return_value = [("consumer_group1", [("topic1", "partition1", 2)])]
     client.describe_consumer_group.return_value = 'STABLE'
@@ -133,6 +133,20 @@ def test_tls_verify_is_string(tls_verify, expected, check, kafka_instance):

 mock_client = mock.MagicMock()
 mock_client.get_highwater_offsets.return_value = ({}, "")
+mock_client.consumer_get_cluster_id_and_list_topics.return_value = (
+    "cluster_id",
+    # topics
+    [
+        # Used in unit tests
+        ('topic1', ["partition1"]),
+        ('topic2', ["partition2"]),
+        # Copied from integration tests
+        ('dc', [0, 1]),
+        ('unconsumed_topic', [0, 1]),
+        ('marvel', [0, 1]),
+        ('__consumer_offsets', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+    ],
+)


 @pytest.mark.parametrize(
@@ -468,7 +482,7 @@ def test_load_broker_timestamps_empty(

 def test_client_init(kafka_instance, check, dd_run_check):
     """
-    We only open a connection to a consumer once per consumer group.
+    We only open a connection to the datadog-agent consumer once.

     Doing so more often degrades performance, as described in this issue:
     https://github.com/DataDog/integrations-core/issues/19564
@@ -478,7 +492,7 @@ def test_client_init(kafka_instance, check, dd_run_check):
     check.client = mock_client
     dd_run_check(check)

-    assert check.client.open_consumer.mock_calls == [mock.call("consumer_group1")]
+    assert check.client.open_consumer.mock_calls == [mock.call("datadog-agent")]


 def test_resolve_start_offsets():
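The updated test_client_init leans on unittest.mock recording every invocation: comparing mock_calls against an exact one-element list fails if the check still opened one consumer per consumer group. A standalone illustration of that assertion pattern (not part of the commit):

from unittest import mock

client = mock.MagicMock()

# The old behavior would have recorded one call per consumer group...
for group in ("consumer_group1", "datadog-agent"):
    client.open_consumer(group)
assert client.open_consumer.mock_calls == [
    mock.call("consumer_group1"),
    mock.call("datadog-agent"),
]

# ...while the new behavior opens exactly one consumer.
client.reset_mock()
client.open_consumer("datadog-agent")
assert client.open_consumer.mock_calls == [mock.call("datadog-agent")]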
