Skip to content

Commit e833d1a

Browse files
kmeinerz, kmeinerz1, and ekneg54
authored
fix revoke callback error handling (#797)
* fix revoke callback error handling
* Update CHANGELOG.md
* refactored member_id

Co-authored-by: kmeinerz1 <kaithomas.meinerz@bwi.de>
Co-authored-by: Jörg Zimmermann <101292599+ekneg54@users.noreply.github.com>
Co-authored-by: Jörg Zimmermann <joerg.zimmermann@bwi.de>
1 parent d7e495f commit e833d1a

File tree

5 files changed

+60
-27
lines changed

5 files changed

+60
-27
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
### Improvements
1010
### Bugfix
1111

12+
- Fixed logging error in _revoke_callback() by adding error handling
13+
1214
## 16.1.0
1315
### Deprecations
1416

logprep/connector/confluent_kafka/input.py

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,16 @@ class Metrics(Input.Metrics):
130130

131131
librdkafka_replyq: GaugeMetric = field(
132132
factory=lambda: GaugeMetric(
133-
description="Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll()",
133+
description=(
134+
"Number of ops (callbacks, events, etc) waiting in "
135+
"queue for application to serve with rd_kafka_poll()"
136+
),
134137
name="confluent_kafka_input_librdkafka_replyq",
135138
)
136139
)
137-
"""Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll()"""
140+
"""Number of ops (callbacks, events, etc) waiting in queue for application
141+
to serve with rd_kafka_poll()
142+
"""
138143
librdkafka_tx: GaugeMetric = field(
139144
factory=lambda: GaugeMetric(
140145
description="Total number of requests sent to Kafka brokers",
@@ -166,14 +171,22 @@ class Metrics(Input.Metrics):
166171
"""Total number of bytes received from Kafka brokers"""
167172
librdkafka_rxmsgs: GaugeMetric = field(
168173
factory=lambda: GaugeMetric(
169-
description="Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers.",
174+
description=(
175+
"Total number of messages consumed, not including ignored messages"
176+
"(due to offset, etc), from Kafka brokers."
177+
),
170178
name="confluent_kafka_input_librdkafka_rxmsgs",
171179
)
172180
)
173-
"""Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers."""
181+
"""Total number of messages consumed, not including ignored messages
182+
(due to offset, etc), from Kafka brokers.
183+
"""
174184
librdkafka_rxmsg_bytes: GaugeMetric = field(
175185
factory=lambda: GaugeMetric(
176-
description="Total number of message bytes (including framing) received from Kafka brokers",
186+
description=(
187+
"Total number of message bytes (including framing)"
188+
"received from Kafka brokers"
189+
),
177190
name="confluent_kafka_input_librdkafka_rxmsg_bytes",
178191
)
179192
)
@@ -195,11 +208,11 @@ class Metrics(Input.Metrics):
195208
"""Time elapsed since last rebalance (assign or revoke) (milliseconds)."""
196209
librdkafka_cgrp_rebalance_cnt: GaugeMetric = field(
197210
factory=lambda: GaugeMetric(
198-
description="Total number of rebalances (assign or revoke).",
211+
description="Total number of rebalance (assign or revoke).",
199212
name="confluent_kafka_input_librdkafka_cgrp_rebalance_cnt",
200213
)
201214
)
202-
"""Total number of rebalances (assign or revoke)."""
215+
"""Total number of rebalance (assign or revoke)."""
203216
librdkafka_cgrp_assignment_size: GaugeMetric = field(
204217
factory=lambda: GaugeMetric(
205218
description="Current assignment's partition count.",
@@ -316,7 +329,7 @@ def _error_callback(self, error: KafkaException) -> None:
316329
the error that occurred
317330
"""
318331
self.metrics.number_of_errors += 1
319-
logger.error(f"{self.describe()}: {error}")
332+
logger.error("%s: %s", self.describe(), error)
320333

321334
def _stats_callback(self, stats_raw: str) -> None:
322335
"""Callback for statistics data. This callback is triggered by poll()
@@ -487,12 +500,13 @@ def batch_finished_callback(self) -> None:
487500
except KafkaException as error:
488501
raise InputWarning(self, f"{error}, {self._last_valid_record}") from error
489502

490-
def _assign_callback(self, consumer, topic_partitions):
503+
def _assign_callback(self, topic_partitions: list[TopicPartition]) -> None:
491504
for topic_partition in topic_partitions:
492505
offset, partition = topic_partition.offset, topic_partition.partition
506+
member_id = self._get_memberid()
493507
logger.info(
494508
"%s was assigned to topic: %s | partition %s",
495-
consumer.memberid(),
509+
member_id,
496510
topic_partition.topic,
497511
partition,
498512
)
@@ -503,27 +517,38 @@ def _assign_callback(self, consumer, topic_partitions):
503517
self.metrics.committed_offsets.add_with_labels(offset, labels)
504518
self.metrics.current_offsets.add_with_labels(offset, labels)
505519

506-
def _revoke_callback(self, consumer, topic_partitions):
520+
def _revoke_callback(self, topic_partitions: list[TopicPartition]) -> None:
521+
507522
for topic_partition in topic_partitions:
508523
self.metrics.number_of_warnings += 1
524+
member_id = self._get_memberid()
509525
logger.warning(
510526
"%s to be revoked from topic: %s | partition %s",
511-
consumer.memberid(),
527+
member_id,
512528
topic_partition.topic,
513529
topic_partition.partition,
514530
)
515531
self.batch_finished_callback()
516532

517-
def _lost_callback(self, consumer, topic_partitions):
533+
def _lost_callback(self, topic_partitions: list[TopicPartition]) -> None:
518534
for topic_partition in topic_partitions:
519535
self.metrics.number_of_warnings += 1
536+
member_id = self._get_memberid()
520537
logger.warning(
521538
"%s has lost topic: %s | partition %s - try to reassign",
522-
consumer.memberid(),
539+
member_id,
523540
topic_partition.topic,
524541
topic_partition.partition,
525542
)
526543

544+
def _get_memberid(self) -> str | None:
545+
member_id = None
546+
try:
547+
member_id = self._consumer.memberid()
548+
except RuntimeError as error:
549+
logger.error("Failed to retrieve member ID: %s", error)
550+
return member_id
551+
527552
def shut_down(self) -> None:
528553
"""Close consumer, which also commits kafka offsets."""
529554
self._consumer.close()

logprep/connector/confluent_kafka/output.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ def _error_callback(self, error: KafkaException):
232232
the error that occurred
233233
"""
234234
self.metrics.number_of_errors += 1
235-
logger.error(f"{self.describe()}: {error}") # pylint: disable=logging-fstring-interpolation
235+
logger.error("%s: %s", self.describe(), error)
236236

237237
def _stats_callback(self, stats_raw: str) -> None:
238238
"""Callback for statistics data. This callback is triggered by poll()

tests/unit/connector/test_confluent_kafka_common.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def test_error_callback_logs_error(self):
3434
test_error = Exception("test error")
3535
self.object._error_callback(test_error)
3636
mock_error.assert_called()
37-
mock_error.assert_called_with(f"{self.object.describe()}: {test_error}")
37+
mock_error.assert_called_with("%s: %s", self.object.describe(), test_error)
3838
assert self.object.metrics.number_of_errors == 1
3939

4040
def test_stats_callback_sets_metric_objetc_attributes(self):

tests/unit/connector/test_confluent_kafka_input.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
# pylint: disable=wrong-import-order
55
# pylint: disable=attribute-defined-outside-init
66
import os
7+
import re
78
import socket
89
from copy import deepcopy
910
from unittest import mock
1011

1112
import pytest
12-
from confluent_kafka import OFFSET_BEGINNING, KafkaError, KafkaException
13+
from confluent_kafka import OFFSET_BEGINNING, KafkaError, KafkaException # type: ignore
1314

1415
from logprep.abc.input import (
1516
CriticalInputError,
@@ -345,7 +346,7 @@ def test_lost_callback_counts_warnings_and_logs(self, mock_consumer):
345346
self.object.metrics.number_of_warnings = 0
346347
mock_partitions = [mock.MagicMock()]
347348
with mock.patch("logging.Logger.warning") as mock_warning:
348-
self.object._lost_callback(mock_consumer, mock_partitions)
349+
self.object._lost_callback(mock_partitions)
349350
mock_warning.assert_called()
350351
assert self.object.metrics.number_of_warnings == 1
351352

@@ -360,39 +361,44 @@ def test_commit_callback_sets_offset_to_0_for_special_offsets(self, _):
360361
}
361362
self.object.metrics.committed_offsets.add_with_labels.assert_called_with(0, expected_labels)
362363

363-
@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
364-
def test_assign_callback_sets_offsets_and_logs_info(self, mock_consumer):
364+
def test_assign_callback_sets_offsets_and_logs_info(self):
365365
self.object.metrics.committed_offsets.add_with_labels = mock.MagicMock()
366366
self.object.metrics.current_offsets.add_with_labels = mock.MagicMock()
367367
mock_partitions = [mock.MagicMock()]
368368
mock_partitions[0].offset = OFFSET_BEGINNING
369369
with mock.patch("logging.Logger.info") as mock_info:
370-
self.object._assign_callback(mock_consumer, mock_partitions)
370+
self.object._assign_callback(mock_partitions)
371371
expected_labels = {
372372
"description": f"topic: test_input_raw - partition: {mock_partitions[0].partition}"
373373
}
374374
mock_info.assert_called()
375375
self.object.metrics.committed_offsets.add_with_labels.assert_called_with(0, expected_labels)
376376
self.object.metrics.current_offsets.add_with_labels.assert_called_with(0, expected_labels)
377377

378-
@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
379-
def test_revoke_callback_logs_warning_and_counts(self, mock_consumer):
378+
def test_revoke_callback_logs_warning_and_counts(self):
380379
self.object.metrics.number_of_warnings = 0
381380
self.object.output_connector = mock.MagicMock()
382381
mock_partitions = [mock.MagicMock()]
383382
with mock.patch("logging.Logger.warning") as mock_warning:
384-
self.object._revoke_callback(mock_consumer, mock_partitions)
383+
self.object._revoke_callback(mock_partitions)
385384
mock_warning.assert_called()
386385
assert self.object.metrics.number_of_warnings == 1
387386

388-
@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
389-
def test_revoke_callback_calls_batch_finished_callback(self, mock_consumer):
387+
def test_revoke_callback_calls_batch_finished_callback(self):
390388
self.object.output_connector = mock.MagicMock()
391389
self.object.batch_finished_callback = mock.MagicMock()
392390
mock_partitions = [mock.MagicMock()]
393-
self.object._revoke_callback(mock_consumer, mock_partitions)
391+
self.object._revoke_callback(mock_partitions)
394392
self.object.batch_finished_callback.assert_called()
395393

394+
def test_revoke_callback_logs_error_if_consumer_closed(self, caplog):
395+
with mock.patch.object(self.object, "_consumer") as mock_consumer:
396+
mock_consumer.memberid = mock.MagicMock()
397+
mock_consumer.memberid.side_effect = RuntimeError("Consumer is closed")
398+
mock_partitions = [mock.MagicMock()]
399+
self.object._revoke_callback(mock_partitions)
400+
assert re.search(r"ERROR.*Consumer is closed", caplog.text)
401+
396402
def test_health_returns_true_if_no_error(self):
397403
self.object._admin = mock.MagicMock()
398404
metadata = mock.MagicMock()

0 commit comments

Comments (0)