11# (c) Copyright IBM Corp. 2025
22
3+
34try :
5+ import contextvars
46 from typing import Any , Callable , Dict , List , Optional , Tuple
57
68 import confluent_kafka # noqa: F401
79 import wrapt
810 from confluent_kafka import Consumer , Producer
11+ from opentelemetry import context , trace
912 from opentelemetry .trace import SpanKind
1013
1114 from instana .log import logger
1215 from instana .propagators .format import Format
16+ from instana .singletons import get_tracer
1317 from instana .util .traceutils import (
1418 get_tracer_tuple ,
1519 tracing_is_off ,
1620 )
21+ from instana .span .span import InstanaSpan
22+
23+ consumer_token : Dict [str , Any ] = {}
24+ consumer_span = contextvars .ContextVar ("confluent_kafka_consumer_span" )
1725
1826 # As confluent_kafka is a wrapper around the C-developed librdkafka
1927 # (provided automatically via binary wheels), we have to create new classes
@@ -105,78 +113,142 @@ def create_span(
105113 headers : Optional [List [Tuple [str , bytes ]]] = [],
106114 exception : Optional [str ] = None ,
107115 ) -> None :
108- tracer , parent_span , _ = get_tracer_tuple ()
109- parent_context = (
110- parent_span .get_span_context ()
111- if parent_span
112- else tracer .extract (
113- Format .KAFKA_HEADERS ,
114- headers ,
115- disable_w3c_trace_context = True ,
116+ try :
117+ span = consumer_span .get (None )
118+ if span is not None :
119+ close_consumer_span (span )
120+
121+ tracer , parent_span , _ = get_tracer_tuple ()
122+
123+ if not tracer :
124+ tracer = get_tracer ()
125+ is_suppressed = False
126+
127+ if topic :
128+ is_suppressed = tracer .exporter ._HostAgent__is_endpoint_ignored (
129+ "kafka" ,
130+ span_type ,
131+ topic ,
132+ )
133+
134+ if not is_suppressed and headers :
135+ for header_name , header_value in headers :
136+ if header_name == "x_instana_l_s" and header_value == b"0" :
137+ is_suppressed = True
138+ break
139+
140+ if is_suppressed :
141+ return
142+
143+ parent_context = (
144+ parent_span .get_span_context ()
145+ if parent_span
146+ else (
147+ tracer .extract (
148+ Format .KAFKA_HEADERS ,
149+ headers ,
150+ disable_w3c_trace_context = True ,
151+ )
152+ if tracer .exporter .options .kafka_trace_correlation
153+ else None
154+ )
155+ )
156+ span = tracer .start_span (
157+ "kafka-consumer" , span_context = parent_context , kind = SpanKind .CONSUMER
116158 )
117- )
118- with tracer .start_as_current_span (
119- "kafka-consumer" , span_context = parent_context , kind = SpanKind .CONSUMER
120- ) as span :
121159 if topic :
122160 span .set_attribute ("kafka.service" , topic )
123161 span .set_attribute ("kafka.access" , span_type )
124-
125162 if exception :
126163 span .record_exception (exception )
164+ span .end ()
165+
166+ save_consumer_span_into_context (span )
167+ except Exception as e :
168+ logger .debug (f"Error while creating kafka-consumer span: { e } " )
169+
170+ def save_consumer_span_into_context (span : "InstanaSpan" ) -> None :
171+ ctx = trace .set_span_in_context (span )
172+ token = context .attach (ctx )
173+ consumer_token ["token" ] = token
174+ consumer_span .set (span )
175+
176+ def close_consumer_span (span : "InstanaSpan" ) -> None :
177+ if span .is_recording ():
178+ span .end ()
179+ consumer_span .set (None )
180+ if "token" in consumer_token :
181+ context .detach (consumer_token .pop ("token" , None ))
182+
183+ def clear_context () -> None :
184+ context .attach (trace .set_span_in_context (None ))
185+ consumer_token .clear ()
186+ consumer_span .set (None )
127187
128188 def trace_kafka_consume (
129189 wrapped : Callable [..., InstanaConfluentKafkaConsumer .consume ],
130190 instance : InstanaConfluentKafkaConsumer ,
131191 args : Tuple [int , str , Tuple [Any , ...]],
132192 kwargs : Dict [str , Any ],
133193 ) -> List [confluent_kafka .Message ]:
134- if tracing_is_off ():
135- return wrapped (* args , ** kwargs )
136-
137194 res = None
138195 exception = None
139196
140197 try :
141198 res = wrapped (* args , ** kwargs )
199+ for message in res :
200+ create_span ("consume" , message .topic (), message .headers ())
201+ return res
142202 except Exception as exc :
143203 exception = exc
144- finally :
145- if res :
146- for message in res :
147- create_span ("consume" , message .topic (), message .headers ())
148- else :
149- create_span ("consume" , exception = exception )
204+ create_span ("consume" , exception = exception )
205+
206+ def trace_kafka_close (
207+ wrapped : Callable [..., InstanaConfluentKafkaConsumer .close ],
208+ instance : InstanaConfluentKafkaConsumer ,
209+ args : Tuple [Any , ...],
210+ kwargs : Dict [str , Any ],
211+ ) -> None :
212+ try :
213+ # Close any existing consumer span before closing the consumer
214+ span = consumer_span .get (None )
215+ if span is not None :
216+ close_consumer_span (span )
217+
218+ # Execute the actual close operation
219+ result = wrapped (* args , ** kwargs )
150220
151- return res
221+ logger .debug ("Kafka consumer closed and spans cleaned up" )
222+ return result
223+
224+ except Exception as exc :
225+ logger .debug (f"Error while closing Kafka consumer: { exc } " )
226+ # Still try to clean up the span even if close fails
227+ span = consumer_span .get (None )
228+ if span is not None :
229+ close_consumer_span (span )
230+ raise
152231
153232 def trace_kafka_poll (
154233 wrapped : Callable [..., InstanaConfluentKafkaConsumer .poll ],
155234 instance : InstanaConfluentKafkaConsumer ,
156235 args : Tuple [int , str , Tuple [Any , ...]],
157236 kwargs : Dict [str , Any ],
158237 ) -> Optional [confluent_kafka .Message ]:
159- if tracing_is_off ():
160- return wrapped (* args , ** kwargs )
161-
162238 res = None
163239 exception = None
164240
165241 try :
166242 res = wrapped (* args , ** kwargs )
243+ create_span ("poll" , res .topic (), res .headers ())
244+ return res
167245 except Exception as exc :
168246 exception = exc
169- finally :
170- if res :
171- create_span ("poll" , res .topic (), res .headers ())
172- else :
173- create_span (
174- "poll" ,
175- next (iter (instance .list_topics ().topics )),
176- exception = exception ,
177- )
178-
179- return res
247+ create_span (
248+ "poll" ,
249+ next (iter (instance .list_topics ().topics )),
250+ exception = exception ,
251+ )
180252
181253 # Apply the monkey patch
182254 confluent_kafka .Producer = InstanaConfluentKafkaProducer
@@ -189,6 +261,9 @@ def trace_kafka_poll(
189261 InstanaConfluentKafkaConsumer , "consume" , trace_kafka_consume
190262 )
191263 wrapt .wrap_function_wrapper (InstanaConfluentKafkaConsumer , "poll" , trace_kafka_poll )
264+ wrapt .wrap_function_wrapper (
265+ InstanaConfluentKafkaConsumer , "close" , trace_kafka_close
266+ )
192267
193268 logger .debug ("Instrumenting Kafka (confluent_kafka)" )
194269except ImportError :
0 commit comments