diff --git a/CHANGELOG.md b/CHANGELOG.md index 775043035f..f83913b4e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -265,6 +265,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2355](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2355)) - AwsLambdaInstrumentor sets `cloud.account.id` span attribute ([#2367](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2367)) +- Create span only after record is received while polling + ([#1678](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1678)) ### Added diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 3d1cc79c93..f7510ee247 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -377,27 +377,29 @@ def wrap_produce(func, instance, tracer, args, kwargs): @staticmethod def wrap_poll(func, instance, tracer, args, kwargs): - if instance._current_consume_span: - _end_current_consume_span(instance) - - with tracer.start_as_current_span( - "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER - ): - record = func(*args, **kwargs) - if record: - _create_new_consume_span(instance, tracer, [record]) + record = func(*args, **kwargs) + if record: + ctx = propagate.extract(record.headers(), getter=_kafka_getter) + span_name = _get_span_name("recv", record.topic()) + with tracer.start_as_current_span( + span_name, + end_on_exit=True, + kind=trace.SpanKind.CONSUMER, + context=ctx, + ) as current_consume_span: _enrich_span( - instance._current_consume_span, + current_consume_span, record.topic(), record.partition(), record.offset(), operation=MessagingOperationValues.PROCESS, ) - instance._current_context_token = context.attach( - trace.set_span_in_context(instance._current_consume_span) - ) - - return record + propagate.inject( + record.headers(), + setter=_kafka_setter, + ) + return record + return None @staticmethod def wrap_consume(func, instance, tracer, args, kwargs):