Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3113](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3113))
- `opentelemetry-instrumentation-grpc` Fix error when using gprc versions <= 1.50.0 with unix sockets.
([[#3393](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3393)])
- `opentelemetry-instrumentation-aiokafka` Fix send_and_wait method no headers kwargs error.
([[#3332](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3332)])

## Version 1.31.0/0.52b0 (2025-03-12)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ def _extract_send_headers(args: Tuple[Any], kwargs: Dict[str, Any]):
return _extract_argument("headers", 5, None, args, kwargs)


def _move_headers_to_kwargs(
args: Tuple[Any], kwargs: Dict[str, Any]
) -> Tuple[Tuple[Any], Dict[str, Any]]:
"""Move headers from args to kwargs"""
if len(args) > 5:
kwargs["headers"] = args[5]
return args[:5], kwargs


async def _extract_send_partition(
instance: aiokafka.AIOKafkaProducer,
args: Tuple[Any],
Expand Down Expand Up @@ -260,6 +269,7 @@ async def _traced_send(
args: Tuple[Any],
kwargs: Dict[str, Any],
) -> None:
args, kwargs = _move_headers_to_kwargs(args, kwargs)
headers = _extract_send_headers(args, kwargs)
if headers is None:
headers = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,43 @@ def _compare_spans(
self.assertEqual(
expected_span["attributes"], dict(span.attributes)
)

async def test_send_and_wait(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)

producer = await self.producer_factory()
add_message_mock: mock.AsyncMock = (
producer._message_accumulator.add_message
)
add_message_mock.side_effect = [mock.AsyncMock()(), mock.AsyncMock()()]

tracer = self.tracer_provider.get_tracer(__name__)
with tracer.start_as_current_span("test_span") as span:
await producer.send_and_wait("topic_1", b"value_1")

add_message_mock.assert_awaited_with(
TopicPartition(topic="topic_1", partition=1),
None,
b"value_1",
40.0,
timestamp_ms=None,
headers=[("traceparent", mock.ANY)],
)
assert (
add_message_mock.call_args_list[0]
.kwargs["headers"][0][1]
.startswith(
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
)
)

await producer.send_and_wait("topic_2", b"value_2")
add_message_mock.assert_awaited_with(
TopicPartition(topic="topic_2", partition=1),
None,
b"value_2",
40.0,
timestamp_ms=None,
headers=[("traceparent", mock.ANY)],
)