Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `opentelemetry-instrumentation-aiokafka` Wrap `AIOKafkaConsumer.getone()` instead of `AIOKafkaConsumer.__anext__`
([#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874))
- `opentelemetry-instrumentation-confluent-kafka` Fix to allow `topic` to be extracted from `kwargs` in `produce()`
([#2901])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2901)

## Version 1.27.0/0.48b0 (2024-08-28)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,9 @@ def wrap_produce(func, instance, tracer, args, kwargs):
headers = []
kwargs["headers"] = headers

topic = KafkaPropertiesExtractor.extract_produce_topic(args)
topic = KafkaPropertiesExtractor.extract_produce_topic(
args, kwargs
)
_enrich_span(
span,
topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ def _extract_argument(key, position, default_value, args, kwargs):
return kwargs.get(key, default_value)

@staticmethod
def extract_produce_topic(args):
def extract_produce_topic(args, kwargs):
"""extract topic from `produce` method arguments in Producer class"""
if len(args) > 0:
return args[0]
return "unknown"
return kwargs.get("topic") or (args[0] if args else "unknown")

@staticmethod
def extract_produce_headers(args, kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,15 @@ def _compare_spans(self, spans, expected_spans):
expected_attribute_value, span.attributes[attribute_key]
)

def _assert_topic(self, expected_topic: str) -> None:
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
span = span_list[0]
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
expected_topic,
)

def test_producer_poll(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
message_queue = []
Expand All @@ -299,6 +308,7 @@ def test_producer_poll(self) -> None:
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.poll()
self.assertIsNotNone(msg)
self._assert_topic("topic-1")

def test_producer_flush(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
Expand All @@ -315,3 +325,4 @@ def test_producer_flush(self) -> None:
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.flush()
self.assertIsNotNone(msg)
self._assert_topic("topic-1")