Skip to content

Commit a608c43

Browse files
committed
Create span only after record is received while polling
1 parent 24eadcf commit a608c43

File tree

2 files changed

+19
-15
lines changed
  • instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka

2 files changed

+19
-15
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2929
([#2355](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2355))
3030
- AwsLambdaInstrumentor sets `cloud.account.id` span attribute
3131
([#2367](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2367))
32+
- Create span only after record is received while polling
33+
([#1678](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1678))
3234

3335
## Version 1.23.0/0.44b0 (2024-02-23)
3436

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -356,27 +356,29 @@ def wrap_produce(func, instance, tracer, args, kwargs):
356356

357357
@staticmethod
358358
def wrap_poll(func, instance, tracer, args, kwargs):
359-
if instance._current_consume_span:
360-
_end_current_consume_span(instance)
361-
362-
with tracer.start_as_current_span(
363-
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
364-
):
365-
record = func(*args, **kwargs)
366-
if record:
367-
_create_new_consume_span(instance, tracer, [record])
359+
record = func(*args, **kwargs)
360+
if record:
361+
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
362+
span_name = _get_span_name("recv", record.topic())
363+
with tracer.start_as_current_span(
364+
span_name,
365+
end_on_exit=True,
366+
kind=trace.SpanKind.CONSUMER,
367+
context=ctx,
368+
) as current_consume_span:
368369
_enrich_span(
369-
instance._current_consume_span,
370+
current_consume_span,
370371
record.topic(),
371372
record.partition(),
372373
record.offset(),
373374
operation=MessagingOperationValues.PROCESS,
374375
)
375-
instance._current_context_token = context.attach(
376-
trace.set_span_in_context(instance._current_consume_span)
377-
)
378-
379-
return record
376+
propagate.inject(
377+
record.headers(),
378+
setter=_kafka_setter,
379+
)
380+
return record
381+
return None
380382

381383
@staticmethod
382384
def wrap_consume(func, instance, tracer, args, kwargs):

0 commit comments

Comments
 (0)