Skip to content

Commit 91e44b2

Browse files
kmeinerzkmeinerz1ekneg54
authored
Fix kafka input (#811)
* fixed assign callback * added changelog entry * fix revoke callback --------- Co-authored-by: kmeinerz1 <kaithomas.meinerz@bwi.de> Co-authored-by: Jörg Zimmermann <joerg.zimmermann@bwi.de>
1 parent 74cc0df commit 91e44b2

File tree

3 files changed

+14
-12
lines changed

3 files changed

+14
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
- Fixed endless loading in logprep test config
1414
- prevent the auto rule tester from loading rules directly defined inside the config, since they break the auto rule tester and can't have tests anyways
1515
- Fixed typo and broken link in documentation
16+
- Fixed assign_callback error in confluentkafka input
1617

1718
## 16.1.0
1819
### Deprecations

logprep/connector/confluent_kafka/input.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from functools import cached_property, partial
3434
from socket import getfqdn
3535
from types import MappingProxyType
36-
from typing import Optional, Tuple, Union
36+
from typing import Any, Optional, Tuple, Union
3737

3838
import msgspec
3939
from attrs import define, field, validators
@@ -500,7 +500,7 @@ def batch_finished_callback(self) -> None:
500500
except KafkaException as error:
501501
raise InputWarning(self, f"{error}, {self._last_valid_record}") from error
502502

503-
def _assign_callback(self, topic_partitions: list[TopicPartition]) -> None:
503+
def _assign_callback(self, _, topic_partitions: list[TopicPartition]) -> None:
504504
for topic_partition in topic_partitions:
505505
offset, partition = topic_partition.offset, topic_partition.partition
506506
member_id = self._get_memberid()
@@ -517,7 +517,7 @@ def _assign_callback(self, topic_partitions: list[TopicPartition]) -> None:
517517
self.metrics.committed_offsets.add_with_labels(offset, labels)
518518
self.metrics.current_offsets.add_with_labels(offset, labels)
519519

520-
def _revoke_callback(self, topic_partitions: list[TopicPartition]) -> None:
520+
def _revoke_callback(self, _: Any, topic_partitions: list[TopicPartition]) -> None:
521521

522522
for topic_partition in topic_partitions:
523523
self.metrics.number_of_warnings += 1

tests/unit/connector/test_confluent_kafka_input.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,10 @@ def test_commit_callback_counts_commit_success(self):
247247
def test_commit_callback_sets_committed_offsets(self):
248248
mock_add = mock.MagicMock()
249249
self.object.metrics.committed_offsets.add_with_labels = mock_add
250-
topic_partion = mock.MagicMock()
251-
topic_partion.partition = 99
252-
topic_partion.offset = 666
253-
self.object._commit_callback(None, [topic_partion])
250+
topic_partition = mock.MagicMock()
251+
topic_partition.partition = 99
252+
topic_partition.offset = 666
253+
self.object._commit_callback(None, [topic_partition])
254254
call_args = 666, {"description": "topic: test_input_raw - partition: 99"}
255255
mock_add.assert_called_with(*call_args)
256256

@@ -361,13 +361,14 @@ def test_commit_callback_sets_offset_to_0_for_special_offsets(self, _):
361361
}
362362
self.object.metrics.committed_offsets.add_with_labels.assert_called_with(0, expected_labels)
363363

364-
def test_assign_callback_sets_offsets_and_logs_info(self):
364+
@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
365+
def test_assign_callback_sets_offsets_and_logs_info(self, mock_consumer):
365366
self.object.metrics.committed_offsets.add_with_labels = mock.MagicMock()
366367
self.object.metrics.current_offsets.add_with_labels = mock.MagicMock()
367368
mock_partitions = [mock.MagicMock()]
368369
mock_partitions[0].offset = OFFSET_BEGINNING
369370
with mock.patch("logging.Logger.info") as mock_info:
370-
self.object._assign_callback(mock_partitions)
371+
self.object._assign_callback(mock_consumer, mock_partitions)
371372
expected_labels = {
372373
"description": f"topic: test_input_raw - partition: {mock_partitions[0].partition}"
373374
}
@@ -380,23 +381,23 @@ def test_revoke_callback_logs_warning_and_counts(self):
380381
self.object.output_connector = mock.MagicMock()
381382
mock_partitions = [mock.MagicMock()]
382383
with mock.patch("logging.Logger.warning") as mock_warning:
383-
self.object._revoke_callback(mock_partitions)
384+
self.object._revoke_callback(None, mock_partitions)
384385
mock_warning.assert_called()
385386
assert self.object.metrics.number_of_warnings == 1
386387

387388
def test_revoke_callback_calls_batch_finished_callback(self):
388389
self.object.output_connector = mock.MagicMock()
389390
self.object.batch_finished_callback = mock.MagicMock()
390391
mock_partitions = [mock.MagicMock()]
391-
self.object._revoke_callback(mock_partitions)
392+
self.object._revoke_callback(None, mock_partitions)
392393
self.object.batch_finished_callback.assert_called()
393394

394395
def test_revoke_callback_logs_error_if_consumer_closed(self, caplog):
395396
with mock.patch.object(self.object, "_consumer") as mock_consumer:
396397
mock_consumer.memberid = mock.MagicMock()
397398
mock_consumer.memberid.side_effect = RuntimeError("Consumer is closed")
398399
mock_partitions = [mock.MagicMock()]
399-
self.object._revoke_callback(mock_partitions)
400+
self.object._revoke_callback(None, mock_partitions)
400401
assert re.search(r"ERROR.*Consumer is closed", caplog.text)
401402

402403
def test_health_returns_true_if_no_error(self):

0 commit comments

Comments
 (0)