1515
1616from unittest import IsolatedAsyncioTestCase , mock
1717
18+ import aiokafka
19+
1820from opentelemetry .instrumentation .aiokafka .utils import (
1921 AIOKafkaContextGetter ,
2022 AIOKafkaContextSetter ,
2325 _create_consumer_span ,
2426 _extract_send_partition ,
2527 _get_span_name ,
28+ _wrap_getmany ,
2629 _wrap_getone ,
2730 _wrap_send ,
2831)
@@ -174,7 +177,7 @@ async def wrap_send_helper(
174177 @mock .patch (
175178 "opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group"
176179 )
177- async def test_wrap_next (
180+ async def test_wrap_getone (
178181 self ,
179182 extract_consumer_group : mock .MagicMock ,
180183 extract_client_id : mock .MagicMock ,
@@ -184,12 +187,12 @@ async def test_wrap_next(
184187 ) -> None :
185188 tracer = mock .MagicMock ()
186189 consume_hook = mock .AsyncMock ()
187- original_next_callback = mock .AsyncMock ()
190+ original_getone_callback = mock .AsyncMock ()
188191 kafka_consumer = mock .MagicMock ()
189192
190- wrapped_next = _wrap_getone (tracer , consume_hook )
191- record = await wrapped_next (
192- original_next_callback , kafka_consumer , self .args , self .kwargs
193+ wrapped_getone = _wrap_getone (tracer , consume_hook )
194+ record = await wrapped_getone (
195+ original_getone_callback , kafka_consumer , self .args , self .kwargs
193196 )
194197
195198 extract_bootstrap_servers .assert_called_once_with (
@@ -203,10 +206,10 @@ async def test_wrap_next(
203206 extract_consumer_group .assert_called_once_with (kafka_consumer )
204207 consumer_group = extract_consumer_group .return_value
205208
206- original_next_callback .assert_awaited_once_with (
209+ original_getone_callback .assert_awaited_once_with (
207210 * self .args , ** self .kwargs
208211 )
209- self .assertEqual (record , original_next_callback .return_value )
212+ self .assertEqual (record , original_getone_callback .return_value )
210213
211214 extract .assert_called_once_with (
212215 record .headers , getter = _aiokafka_getter
@@ -225,6 +228,85 @@ async def test_wrap_next(
225228 self .kwargs ,
226229 )
227230
231+ @mock .patch ("opentelemetry.propagate.extract" )
232+ @mock .patch (
233+ "opentelemetry.instrumentation.aiokafka.utils._create_consumer_span"
234+ )
235+ @mock .patch (
236+ "opentelemetry.instrumentation.aiokafka.utils._enrich_getmany_topic_span"
237+ )
238+ @mock .patch (
239+ "opentelemetry.instrumentation.aiokafka.utils._enrich_getmany_poll_span"
240+ )
241+ @mock .patch (
242+ "opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
243+ )
244+ @mock .patch (
245+ "opentelemetry.instrumentation.aiokafka.utils._extract_client_id"
246+ )
247+ @mock .patch (
248+ "opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group"
249+ )
250+ async def test_wrap_getmany (
251+ self ,
252+ extract_consumer_group : mock .MagicMock ,
253+ extract_client_id : mock .MagicMock ,
254+ extract_bootstrap_servers : mock .MagicMock ,
255+ enrich_getmany_poll_span : mock .MagicMock ,
256+ enrich_getmany_topic_span : mock .MagicMock ,
257+ _create_consumer_span : mock .MagicMock ,
258+ extract : mock .MagicMock ,
259+ ) -> None :
260+ tracer = mock .MagicMock ()
261+ consume_hook = mock .AsyncMock ()
262+ record_mock = mock .MagicMock ()
263+ original_getmany_callback = mock .AsyncMock (
264+ return_value = {
265+ aiokafka .TopicPartition (topic = "topic_1" , partition = 0 ): [
266+ record_mock
267+ ]
268+ }
269+ )
270+ kafka_consumer = mock .MagicMock ()
271+
272+ wrapped_getmany = _wrap_getmany (tracer , consume_hook )
273+ records = await wrapped_getmany (
274+ original_getmany_callback , kafka_consumer , self .args , self .kwargs
275+ )
276+
277+ extract_bootstrap_servers .assert_called_once_with (
278+ kafka_consumer ._client
279+ )
280+ bootstrap_servers = extract_bootstrap_servers .return_value
281+
282+ extract_client_id .assert_called_once_with (kafka_consumer ._client )
283+ client_id = extract_client_id .return_value
284+
285+ extract_consumer_group .assert_called_once_with (kafka_consumer )
286+ consumer_group = extract_consumer_group .return_value
287+
288+ original_getmany_callback .assert_awaited_once_with (
289+ * self .args , ** self .kwargs
290+ )
291+ self .assertEqual (records , original_getmany_callback .return_value )
292+
293+ extract .assert_called_once_with (
294+ record_mock .headers , getter = _aiokafka_getter
295+ )
296+ context = extract .return_value
297+
298+ _create_consumer_span .assert_called_once_with (
299+ tracer ,
300+ consume_hook ,
301+ record_mock ,
302+ context ,
303+ bootstrap_servers ,
304+ client_id ,
305+ consumer_group ,
306+ self .args ,
307+ self .kwargs ,
308+ )
309+
228310 @mock .patch ("opentelemetry.trace.set_span_in_context" )
229311 @mock .patch ("opentelemetry.context.attach" )
230312 @mock .patch (
0 commit comments