@@ -103,11 +103,11 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
103103import wrapt
104104from confluent_kafka import Consumer , Producer
105105
106- from opentelemetry import context , propagate , trace
106+ from opentelemetry import propagate , trace
107107from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
108108from opentelemetry .instrumentation .utils import unwrap
109109from opentelemetry .semconv .trace import MessagingOperationValues
110- from opentelemetry .trace import Link , SpanKind , Tracer
110+ from opentelemetry .trace import Tracer
111111
112112from .package import _instruments
113113from .utils import (
@@ -325,39 +325,27 @@ def wrap_produce(func, instance, tracer, args, kwargs):
325325
326326 @staticmethod
327327 def wrap_poll (func , instance , tracer , args , kwargs ):
328- if instance ._current_consume_span :
329- context .detach (instance ._current_context_token )
330- instance ._current_context_token = None
331- instance ._current_consume_span .end ()
332- instance ._current_consume_span = None
333-
334- with tracer .start_as_current_span (
335- "recv" , end_on_exit = True , kind = trace .SpanKind .CONSUMER
336- ):
337- record = func (* args , ** kwargs )
338- if record :
339- links = []
340- ctx = propagate .extract (record .headers (), getter = _kafka_getter )
341- if ctx :
342- for item in ctx .values ():
343- if hasattr (item , "get_span_context" ):
344- links .append (Link (context = item .get_span_context ()))
345-
346- instance ._current_consume_span = tracer .start_span (
347- name = f"{ record .topic ()} process" ,
348- links = links ,
349- kind = SpanKind .CONSUMER ,
350- )
351-
328+ record = func (* args , ** kwargs )
329+ if record :
330+ ctx = propagate .extract (record .headers (), getter = _kafka_getter )
331+ span_name = _get_span_name ("recv" , record .topic ())
332+ with tracer .start_as_current_span (
333+ span_name ,
334+ end_on_exit = True ,
335+ kind = trace .SpanKind .CONSUMER ,
336+ context = ctx ,
337+ ) as current_consume_span :
352338 _enrich_span (
353- instance . _current_consume_span ,
339+ current_consume_span ,
354340 record .topic (),
355341 record .partition (),
356342 record .offset (),
357343 operation = MessagingOperationValues .PROCESS ,
358344 )
359- instance ._current_context_token = context .attach (
360- trace .set_span_in_context (instance ._current_consume_span )
361- )
362345
363- return record
346+ propagate .inject (
347+ record .headers (),
348+ setter = _kafka_setter ,
349+ )
350+ return record
351+ return None
0 commit comments