Skip to content

Commit 6c2a3da

Browse files
committed
Create span only after record is received while polling
1 parent ffbbb4d commit 6c2a3da

File tree

1 file changed

+16
-29
lines changed
  • instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka

1 file changed

+16
-29
lines changed

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -325,39 +325,26 @@ def wrap_produce(func, instance, tracer, args, kwargs):
325325

326326
@staticmethod
327327
def wrap_poll(func, instance, tracer, args, kwargs):
328-
if instance._current_consume_span:
329-
context.detach(instance._current_context_token)
330-
instance._current_context_token = None
331-
instance._current_consume_span.end()
332-
instance._current_consume_span = None
333-
334-
with tracer.start_as_current_span(
335-
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
336-
):
337-
record = func(*args, **kwargs)
338-
if record:
339-
links = []
340-
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
341-
if ctx:
342-
for item in ctx.values():
343-
if hasattr(item, "get_span_context"):
344-
links.append(Link(context=item.get_span_context()))
345-
346-
instance._current_consume_span = tracer.start_span(
347-
name=f"{record.topic()} process",
348-
links=links,
349-
kind=SpanKind.CONSUMER,
350-
)
351-
328+
record = func(*args, **kwargs)
329+
if record:
330+
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
331+
span_name = _get_span_name("recv", record.topic())
332+
with tracer.start_as_current_span(
333+
span_name,
334+
end_on_exit=True,
335+
kind=trace.SpanKind.CONSUMER,
336+
context=ctx,
337+
) as current_consume_span:
352338
_enrich_span(
353-
instance._current_consume_span,
339+
current_consume_span,
354340
record.topic(),
355341
record.partition(),
356342
record.offset(),
357343
operation=MessagingOperationValues.PROCESS,
358344
)
359-
instance._current_context_token = context.attach(
360-
trace.set_span_in_context(instance._current_consume_span)
361-
)
362345

363-
return record
346+
propagate.inject(
347+
record.headers(),
348+
setter=_kafka_setter,
349+
)
350+
return record

0 commit comments

Comments
 (0)