Skip to content

Commit 381c5f5

Browse files
committed
Create span only after record is received while polling
1 parent 24eadcf commit 381c5f5

File tree

1 file changed

+17
-15
lines changed
  • instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka

1 file changed

+17
-15
lines changed

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -356,27 +356,29 @@ def wrap_produce(func, instance, tracer, args, kwargs):
356356

357357
@staticmethod
358358
def wrap_poll(func, instance, tracer, args, kwargs):
359-
if instance._current_consume_span:
360-
_end_current_consume_span(instance)
361-
362-
with tracer.start_as_current_span(
363-
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
364-
):
365-
record = func(*args, **kwargs)
366-
if record:
367-
_create_new_consume_span(instance, tracer, [record])
359+
record = func(*args, **kwargs)
360+
if record:
361+
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
362+
span_name = _get_span_name("recv", record.topic())
363+
with tracer.start_as_current_span(
364+
span_name,
365+
end_on_exit=True,
366+
kind=trace.SpanKind.CONSUMER,
367+
context=ctx,
368+
) as current_consume_span:
368369
_enrich_span(
369-
instance._current_consume_span,
370+
current_consume_span,
370371
record.topic(),
371372
record.partition(),
372373
record.offset(),
373374
operation=MessagingOperationValues.PROCESS,
374375
)
375-
instance._current_context_token = context.attach(
376-
trace.set_span_in_context(instance._current_consume_span)
377-
)
378-
379-
return record
376+
propagate.inject(
377+
record.headers(),
378+
setter=_kafka_setter,
379+
)
380+
return record
381+
return None
380382

381383
@staticmethod
382384
def wrap_consume(func, instance, tracer, args, kwargs):

0 commit comments

Comments
 (0)