2828)
2929from opentelemetry .trace import SpanKind
3030
31+ SEND_RETURN_VALUE = None
32+
33+
34+ async def original_send (
35+ topic ,
36+ value = None ,
37+ key = None ,
38+ partition = None ,
39+ timestamp_ms = None ,
40+ headers = None ,
41+ ):
42+ return SEND_RETURN_VALUE
43+
3144
3245class TestUtils (IsolatedAsyncioTestCase ):
3346 def setUp (self ) -> None :
@@ -109,6 +122,36 @@ async def test_wrap_send_with_topic_as_kwarg(
109122 extract_bootstrap_servers ,
110123 )
111124
125+ @mock .patch (
126+ "opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
127+ )
128+ @mock .patch (
129+ "opentelemetry.instrumentation.aiokafka.utils._extract_send_partition"
130+ )
131+ @mock .patch (
132+ "opentelemetry.instrumentation.aiokafka.utils._enrich_send_span"
133+ )
134+ @mock .patch ("opentelemetry.trace.set_span_in_context" )
135+ @mock .patch ("opentelemetry.propagate.inject" )
136+ async def test_wrap_send_with_headers_as_args (
137+ self ,
138+ inject : mock .MagicMock ,
139+ set_span_in_context : mock .MagicMock ,
140+ enrich_span : mock .MagicMock ,
141+ extract_send_partition : mock .AsyncMock ,
142+ extract_bootstrap_servers : mock .MagicMock ,
143+ ) -> None :
144+ # like send_and_wait
145+ self .args = [self .topic_name , None , None , None , None , None ]
146+ self .kwargs = {}
147+ await self .wrap_send_helper (
148+ inject ,
149+ set_span_in_context ,
150+ enrich_span ,
151+ extract_send_partition ,
152+ extract_bootstrap_servers ,
153+ )
154+
112155 async def wrap_send_helper (
113156 self ,
114157 inject : mock .MagicMock ,
@@ -120,6 +163,8 @@ async def wrap_send_helper(
120163 tracer = mock .MagicMock ()
121164 produce_hook = mock .AsyncMock ()
122165 original_send_callback = mock .AsyncMock ()
166+ original_send_callback .side_effect = original_send
167+ original_send_callback .return_value = SEND_RETURN_VALUE
123168 kafka_producer = mock .MagicMock ()
124169 expected_span_name = _get_span_name ("send" , self .topic_name )
125170
@@ -131,9 +176,7 @@ async def wrap_send_helper(
131176 extract_bootstrap_servers .assert_called_once_with (
132177 kafka_producer .client
133178 )
134- extract_send_partition .assert_awaited_once_with (
135- kafka_producer , self .args , self .kwargs
136- )
179+ extract_send_partition .assert_awaited_once ()
137180 tracer .start_as_current_span .assert_called_once_with (
138181 expected_span_name , kind = SpanKind .PRODUCER
139182 )
@@ -154,11 +197,9 @@ async def wrap_send_helper(
154197 self .headers , context = context , setter = _aiokafka_setter
155198 )
156199
157- produce_hook .assert_awaited_once_with ( span , self . args , self . kwargs )
200+ produce_hook .assert_awaited_once ( )
158201
159- original_send_callback .assert_awaited_once_with (
160- * self .args , ** self .kwargs
161- )
202+ original_send_callback .assert_awaited_once ()
162203 self .assertEqual (retval , original_send_callback .return_value )
163204
164205 @mock .patch ("opentelemetry.propagate.extract" )
0 commit comments