diff --git a/CHANGELOG.md b/CHANGELOG.md index 390cdcd897..1f68c9dd6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3113](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3113)) - `opentelemetry-instrumentation-grpc` Fix error when using gprc versions <= 1.50.0 with unix sockets. ([[#3393](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3393)]) +- `opentelemetry-instrumentation-aiokafka` Fix send_and_wait method no headers kwargs error. + ([[#3332](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3332)]) ## Version 1.31.0/0.52b0 (2025-03-12) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index cae0d97717..12eb5df636 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -68,6 +68,15 @@ def _extract_send_headers(args: Tuple[Any], kwargs: Dict[str, Any]): return _extract_argument("headers", 5, None, args, kwargs) +def _move_headers_to_kwargs( + args: Tuple[Any], kwargs: Dict[str, Any] +) -> Tuple[Tuple[Any], Dict[str, Any]]: + """Move headers from args to kwargs""" + if len(args) > 5: + kwargs["headers"] = args[5] + return args[:5], kwargs + + async def _extract_send_partition( instance: aiokafka.AIOKafkaProducer, args: Tuple[Any], @@ -260,6 +269,7 @@ async def _traced_send( args: Tuple[Any], kwargs: Dict[str, Any], ) -> None: + args, kwargs = _move_headers_to_kwargs(args, kwargs) headers = _extract_send_headers(args, kwargs) if headers is None: headers = [] diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py index 8211566239..259747f0d7 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py @@ -318,3 +318,43 @@ def _compare_spans( self.assertEqual( expected_span["attributes"], dict(span.attributes) ) + + async def test_send_and_wait(self) -> None: + AIOKafkaInstrumentor().uninstrument() + AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider) + + producer = await self.producer_factory() + add_message_mock: mock.AsyncMock = ( + producer._message_accumulator.add_message + ) + add_message_mock.side_effect = [mock.AsyncMock()(), mock.AsyncMock()()] + + tracer = self.tracer_provider.get_tracer(__name__) + with tracer.start_as_current_span("test_span") as span: + await producer.send_and_wait("topic_1", b"value_1") + + add_message_mock.assert_awaited_with( + TopicPartition(topic="topic_1", partition=1), + None, + b"value_1", + 40.0, + timestamp_ms=None, + headers=[("traceparent", mock.ANY)], + ) + assert ( + add_message_mock.call_args_list[0] + .kwargs["headers"][0][1] + .startswith( + f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode() + ) + ) + + await producer.send_and_wait("topic_2", b"value_2") + add_message_mock.assert_awaited_with( + TopicPartition(topic="topic_2", partition=1), + None, + b"value_2", + 40.0, + timestamp_ms=None, + headers=[("traceparent", mock.ANY)], + )