44# pylint: disable=wrong-import-order
55# pylint: disable=attribute-defined-outside-init
66import builtins
7+ import json
78import os
89import re
910import socket
1011from copy import deepcopy
12+ from pathlib import Path
1113from unittest import mock
1214
1315import pytest
2224from logprep .connector .confluent_kafka .input import logger
2325from logprep .factory import Factory
2426from logprep .factory_error import InvalidConfigurationError
27+ from logprep .util .helper import get_dotted_field_value
2528from tests .unit .connector .base import BaseInputTestCase
26- from tests .unit .connector .test_confluent_kafka_common import (
27- CommonConfluentKafkaTestCase ,
28- )
2929
3030KAFKA_STATS_JSON_PATH = "tests/testdata/kafka_stats_return_value.json"
3131
3232
33- class TestConfluentKafkaInput (BaseInputTestCase , CommonConfluentKafkaTestCase ):
33+ @mock .patch ("confluent_kafka.Consumer" , new = mock .MagicMock ())
34+ @mock .patch ("confluent_kafka.Producer" , new = mock .MagicMock ())
35+ class TestConfluentKafkaInput (BaseInputTestCase ):
3436 CONFIG = {
3537 "type" : "confluentkafka_input" ,
3638 "kafka_config" : {"bootstrap.servers" : "testserver:9092" , "group.id" : "testgroup" },
@@ -61,14 +63,64 @@ class TestConfluentKafkaInput(BaseInputTestCase, CommonConfluentKafkaTestCase):
6163 "logprep_number_of_errors" ,
6264 ]
6365
66+ def setup_method (self ):
67+ super ().setup_method ()
68+ self .object ._consumer = mock .MagicMock ()
69+ self .object ._admin = mock .MagicMock ()
70+
71+ def test_client_id_is_set_to_hostname (self ):
72+ self .object .setup ()
73+ assert self .object ._kafka_config .get ("client.id" ) == socket .getfqdn ()
74+
75+ def test_create_fails_for_unknown_option (self ):
76+ kafka_config = deepcopy (self .CONFIG )
77+ kafka_config .update ({"unknown_option" : "bad value" })
78+ with pytest .raises (TypeError , match = r"unexpected keyword argument" ):
79+ _ = Factory .create ({"test connector" : kafka_config })
80+
81+ def test_error_callback_logs_error (self ):
82+ self .object .metrics .number_of_errors = 0
83+ with mock .patch ("logging.Logger.error" ) as mock_error :
84+ test_error = Exception ("test error" )
85+ self .object ._error_callback (test_error )
86+ mock_error .assert_called ()
87+ mock_error .assert_called_with ("%s: %s" , self .object .describe (), test_error )
88+ assert self .object .metrics .number_of_errors == 1
89+
90+ def test_stats_callback_sets_metric_objetc_attributes (self ):
91+ librdkafka_metrics = tuple (
92+ filter (lambda x : x .startswith ("librdkafka" ), self .expected_metrics )
93+ )
94+ for metric in librdkafka_metrics :
95+ setattr (self .object .metrics , metric , 0 )
96+
97+ json_string = Path (KAFKA_STATS_JSON_PATH ).read_text ("utf8" )
98+ self .object ._stats_callback (json_string )
99+ stats_dict = json .loads (json_string )
100+ for metric in librdkafka_metrics :
101+ metric_name = metric .replace ("librdkafka_" , "" ).replace ("cgrp_" , "cgrp." )
102+ metric_value = get_dotted_field_value (stats_dict , metric_name )
103+ assert getattr (self .object .metrics , metric ) == metric_value , metric
104+
105+ def test_stats_set_age_metric_explicitly (self ):
106+ self .object .metrics .librdkafka_age = 0
107+ json_string = Path (KAFKA_STATS_JSON_PATH ).read_text ("utf8" )
108+ self .object ._stats_callback (json_string )
109+ assert self .object .metrics .librdkafka_age == 1337
110+
111+ def test_kafka_config_is_immutable (self ):
112+ self .object .setup ()
113+ with pytest .raises (TypeError ):
114+ self .object ._config .kafka_config ["client.id" ] = "test"
115+
64116 def test_get_next_returns_none_if_no_records (self ):
65- with mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer :
117+ with mock .patch .object (self .object , "_consumer" ) as mock_consumer :
66118 mock_consumer .poll .return_value = None
67119 event = self .object .get_next (1 )
68120 assert event is None
69121
70122 def test_get_next_raises_critical_input_exception_for_invalid_confluent_kafka_record (self ):
71- with mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer :
123+ with mock .patch .object (self .object , "_consumer" ) as mock_consumer :
72124 mock_record = mock .MagicMock ()
73125 mock_record .error = mock .MagicMock (
74126 return_value = KafkaError (
@@ -96,12 +148,12 @@ def test_get_next_raises_critical_input_exception_for_invalid_confluent_kafka_re
96148 def test_shut_down_calls_consumer_close (
97149 self ,
98150 ):
99- with mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer :
151+ with mock .patch .object (self .object , "_consumer" ) as mock_consumer :
100152
101153 # When the first "with" block ends, it tries to delete the mocked _consumer from self.object.
102154 # But self.object.shut_down() removes all attributes from self.object, so this causes AttributeError.
103155 # To prevent that, we mock builtins.hasattr so shut_down doesn’t try to delete attributes in this test.
104- with mock .patch .object (builtins , "hasattr" , autospec = True , return_value = False ):
156+ with mock .patch .object (builtins , "hasattr" , return_value = False ):
105157 self .object .shut_down ()
106158
107159 mock_consumer .close .assert_called ()
@@ -110,7 +162,7 @@ def test_batch_finished_callback_calls_store_offsets(self):
110162 message = "test message"
111163
112164 with (
113- mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer ,
165+ mock .patch .object (self .object , "_consumer" ) as mock_consumer ,
114166 mock .patch .object (self .object , "_last_valid_record" , new = message ),
115167 ):
116168 self .object .batch_finished_callback ()
@@ -120,15 +172,15 @@ def test_batch_finished_callback_calls_store_offsets(self):
120172
121173 def test_batch_finished_callback_does_not_call_store_offsets (self ):
122174 with (
123- mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer ,
175+ mock .patch .object (self .object , "_consumer" ) as mock_consumer ,
124176 mock .patch .object (self .object , "_last_valid_record" , new = None ),
125177 ):
126178 self .object .batch_finished_callback ()
127179 mock_consumer .store_offsets .assert_not_called ()
128180
129181 def test_batch_finished_callback_raises_input_warning_on_kafka_exception (self ):
130182 with (
131- mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer ,
183+ mock .patch .object (self .object , "_consumer" ) as mock_consumer ,
132184 mock .patch .object (self .object , "_last_valid_record" , new = {0 : "message" }),
133185 ):
134186 return_sequence = [KafkaException ("test error" ), None ]
@@ -142,7 +194,7 @@ def raise_generator(return_sequence):
142194 self .object .batch_finished_callback ()
143195
144196 def test_get_next_raises_critical_input_error_if_not_a_dict (self ):
145- with mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer :
197+ with mock .patch .object (self .object , "_consumer" ) as mock_consumer :
146198 mock_record = mock .MagicMock ()
147199 mock_record .error .return_value = None
148200 mock_record .value .return_value = b'[{"element":"in list"}]'
@@ -153,7 +205,7 @@ def test_get_next_raises_critical_input_error_if_not_a_dict(self):
153205 self .object .get_next (1 )
154206
155207 def test_get_next_raises_critical_input_error_if_invalid_json (self ):
156- with mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer :
208+ with mock .patch .object (self .object , "_consumer" ) as mock_consumer :
157209 mock_record = mock .MagicMock ()
158210 mock_record .error .return_value = None
159211 mock_record .value .return_value = b"I'm not valid json"
@@ -164,7 +216,7 @@ def test_get_next_raises_critical_input_error_if_invalid_json(self):
164216 self .object .get_next (1 )
165217
166218 def test_get_event_returns_event_and_raw_event (self ):
167- with mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer :
219+ with mock .patch .object (self .object , "_consumer" ) as mock_consumer :
168220 mock_record = mock .MagicMock ()
169221 mock_record .error .return_value = None
170222 mock_record .value .return_value = b'{"element":"in list"}'
@@ -262,8 +314,8 @@ def test_commit_callback_sets_committed_offsets(self):
262314 call_args = 666 , {"description" : "topic: test_input_raw - partition: 99" }
263315 mock_add .assert_called_with (* call_args )
264316
265- @ mock . patch ( "logprep.connector.confluent_kafka.input.Consumer" )
266- def test_default_config_is_injected ( self , mock_consumer ):
317+ def test_default_config_is_injected ( self ):
318+ kafka_input = Factory . create ({ "kafka_input" : self . CONFIG })
267319 injected_config = {
268320 "enable.auto.offset.store" : "false" ,
269321 "enable.auto.commit" : "true" ,
@@ -275,12 +327,14 @@ def test_default_config_is_injected(self, mock_consumer):
275327 "group.id" : "testgroup" ,
276328 "group.instance.id" : f"{ socket .getfqdn ().strip ('.' )} -PipelineNone-pid{ os .getpid ()} " ,
277329 "logger" : logger ,
278- "on_commit" : self . object ._commit_callback ,
279- "stats_cb" : self . object ._stats_callback ,
280- "error_cb" : self . object ._error_callback ,
330+ "on_commit" : kafka_input ._commit_callback ,
331+ "stats_cb" : kafka_input ._stats_callback ,
332+ "error_cb" : kafka_input ._error_callback ,
281333 }
282- _ = self .object ._consumer
283- mock_consumer .assert_called_with (injected_config )
334+
335+ with mock .patch ("logprep.connector.confluent_kafka.input.Consumer" ) as mock_consumer :
336+ _ = kafka_input ._consumer
337+ mock_consumer .assert_called_with (injected_config )
284338
285339 @mock .patch ("logprep.connector.confluent_kafka.input.Consumer" )
286340 def test_auto_offset_store_and_auto_commit_are_managed_by_connector (self , mock_consumer ):
@@ -412,30 +466,30 @@ def test_revoke_callback_logs_error_if_consumer_closed(self, caplog):
412466 assert re .search (r"ERROR.*Consumer is closed" , caplog .text )
413467
414468 def test_health_returns_true_if_no_error (self ):
415- with mock .patch .object (self .object , "_admin" , autospec = True ) as mock_admin :
469+ with mock .patch .object (self .object , "_admin" ) as mock_admin :
416470 metadata = mock .MagicMock ()
417471 metadata .topics = [self .object ._config .topic ]
418472 mock_admin .list_topics .return_value = metadata
419473
420474 assert self .object .health ()
421475
422476 def test_health_returns_false_if_topic_not_present (self ):
423- with mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer :
477+ with mock .patch .object (self .object , "_consumer" ) as mock_consumer :
424478 metadata = mock .MagicMock ()
425479 metadata .topics = ["not_the_topic" ]
426480 mock_consumer .list_topics .return_value = metadata
427481
428482 assert not self .object .health ()
429483
430484 def test_health_returns_false_on_kafka_exception (self ):
431- with mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer :
485+ with mock .patch .object (self .object , "_consumer" ) as mock_consumer :
432486 mock_consumer .list_topics .side_effect = KafkaException ("test error" )
433487
434488 assert not self .object .health ()
435489
436490 def test_health_logs_error_on_kafka_exception (self ):
437491 with (
438- mock .patch .object (self .object , "_consumer" , autospec = True ) as mock_consumer ,
492+ mock .patch .object (self .object , "_consumer" ) as mock_consumer ,
439493 mock .patch ("logging.Logger.error" ) as mock_error ,
440494 ):
441495 mock_consumer .list_topics .side_effect = KafkaException ("test error" )
@@ -445,13 +499,14 @@ def test_health_logs_error_on_kafka_exception(self):
445499 mock_error .assert_called ()
446500
447501 def test_health_counts_metrics_on_kafka_exception (self ):
448- self .object .metrics .number_of_errors = 0
502+ kafka_input = Factory .create ({"kafka_input" : self .CONFIG })
503+ kafka_input .metrics .number_of_errors = 0
449504
450- with mock .patch .object (self . object , "_consumer" , autospec = True ) as mock_consumer :
505+ with mock .patch .object (kafka_input , "_consumer" ) as mock_consumer :
451506 mock_consumer .list_topics .side_effect = KafkaException ("test error" )
452507
453- assert not self . object .health ()
454- assert self . object .metrics .number_of_errors == 1
508+ assert not kafka_input .health ()
509+ assert kafka_input .metrics .number_of_errors == 1
455510
456511 @pytest .mark .parametrize (
457512 ["kafka_config_update" , "expected_admin_client_config" ],
0 commit comments