Commit 685e2ee

fix(confluent-kafka): adapt python tracer to trace-test-suite
Signed-off-by: Cagri Yonca <[email protected]>
1 parent d47eccf commit 685e2ee
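
Not part of the commit, but useful context for review: with the wrappers in the diff below in place, a plain confluent_kafka consumer loop is traced without any application changes. The sketch is hypothetical; the broker address, group id, and topic name are illustrative, and it assumes the Instana Python tracer is loaded before this code runs so that confluent_kafka.Consumer resolves to the patched class.

# Hypothetical consumer loop; configuration values are illustrative only.
from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "demo-group",
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["demo-topic"])

try:
    while True:
        # poll() is wrapped by trace_kafka_poll: a "kafka-consumer" span is
        # created, ended, and kept attached as the current context until the
        # next poll()/consume() or close() cleans it up.
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        print(msg.topic(), msg.value())
finally:
    # close() is wrapped by trace_kafka_close: any span still stored in the
    # consumer_span ContextVar is closed and the context token is detached.
    consumer.close()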

2 files changed: +225 -76 lines changed
src/instana/instrumentation/kafka/confluent_kafka_python.py

Lines changed: 118 additions & 37 deletions
@@ -1,19 +1,27 @@
 # (c) Copyright IBM Corp. 2025
 
+
 try:
+    import contextvars
     from typing import Any, Callable, Dict, List, Optional, Tuple
 
     import confluent_kafka  # noqa: F401
     import wrapt
     from confluent_kafka import Consumer, Producer
+    from opentelemetry import context, trace
     from opentelemetry.trace import SpanKind
 
     from instana.log import logger
     from instana.propagators.format import Format
+    from instana.singletons import get_tracer
     from instana.util.traceutils import (
         get_tracer_tuple,
         tracing_is_off,
     )
+    from instana.span.span import InstanaSpan
+
+    consumer_token = None
+    consumer_span = contextvars.ContextVar("confluent_kafka_consumer_span")
 
     # As confluent_kafka is a wrapper around the C-developed librdkafka
     # (provided automatically via binary wheels), we have to create new classes
@@ -47,6 +55,9 @@ def poll(
         ) -> Optional[confluent_kafka.Message]:
             return super().poll(timeout)
 
+        def close(self) -> None:
+            return super().close()
+
     def trace_kafka_produce(
         wrapped: Callable[..., InstanaConfluentKafkaProducer.produce],
         instance: InstanaConfluentKafkaProducer,
@@ -105,78 +116,145 @@ def create_span(
         headers: Optional[List[Tuple[str, bytes]]] = [],
         exception: Optional[str] = None,
     ) -> None:
-        tracer, parent_span, _ = get_tracer_tuple()
-        parent_context = (
-            parent_span.get_span_context()
-            if parent_span
-            else tracer.extract(
-                Format.KAFKA_HEADERS,
-                headers,
-                disable_w3c_trace_context=True,
+        try:
+            span = consumer_span.get(None)
+            if span is not None:
+                close_consumer_span(span)
+
+            tracer, parent_span, _ = get_tracer_tuple()
+
+            if not tracer:
+                tracer = get_tracer()
+            is_suppressed = False
+
+            if topic:
+                is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
+                    "kafka",
+                    span_type,
+                    topic,
+                )
+
+            if not is_suppressed and headers:
+                for header_name, header_value in headers:
+                    if header_name == "x_instana_l_s" and header_value == b"0":
+                        is_suppressed = True
+                        break
+
+            if is_suppressed:
+                return
+
+            parent_context = (
+                parent_span.get_span_context()
+                if parent_span
+                else (
+                    tracer.extract(
+                        Format.KAFKA_HEADERS,
+                        headers,
+                        disable_w3c_trace_context=True,
+                    )
+                    if tracer.exporter.options.kafka_trace_correlation
+                    else None
+                )
+            )
+            span = tracer.start_span(
+                "kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
             )
-        )
-        with tracer.start_as_current_span(
-            "kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
-        ) as span:
             if topic:
                 span.set_attribute("kafka.service", topic)
             span.set_attribute("kafka.access", span_type)
-
             if exception:
                 span.record_exception(exception)
+            span.end()
+
+            save_consumer_span_into_context(span)
+        except Exception as e:
+            logger.debug(
+                f"Error while creating kafka-consumer span: {e}"
+            )  # pragma: no cover
+
+    def save_consumer_span_into_context(span: "InstanaSpan") -> None:
+        global consumer_token
+        ctx = trace.set_span_in_context(span)
+        consumer_token = context.attach(ctx)
+        consumer_span.set(span)
+
+    def close_consumer_span(span: "InstanaSpan") -> None:
+        global consumer_token
+        if span.is_recording():
+            span.end()
+        consumer_span.set(None)
+        if consumer_token is not None:
+            context.detach(consumer_token)
+            consumer_token = None
+
+    def clear_context() -> None:
+        global consumer_token
+        context.attach(trace.set_span_in_context(None))
+        consumer_token = None
+        consumer_span.set(None)
 
     def trace_kafka_consume(
         wrapped: Callable[..., InstanaConfluentKafkaConsumer.consume],
         instance: InstanaConfluentKafkaConsumer,
         args: Tuple[int, str, Tuple[Any, ...]],
         kwargs: Dict[str, Any],
     ) -> List[confluent_kafka.Message]:
-        if tracing_is_off():
-            return wrapped(*args, **kwargs)
-
         res = None
         exception = None
 
         try:
             res = wrapped(*args, **kwargs)
+            for message in res:
+                create_span("consume", message.topic(), message.headers())
+            return res
         except Exception as exc:
             exception = exc
-        finally:
-            if res:
-                for message in res:
-                    create_span("consume", message.topic(), message.headers())
-            else:
-                create_span("consume", exception=exception)
+            create_span("consume", exception=exception)
 
-        return res
+    def trace_kafka_close(
+        wrapped: Callable[..., InstanaConfluentKafkaConsumer.close],
+        instance: InstanaConfluentKafkaConsumer,
+        args: Tuple[Any, ...],
+        kwargs: Dict[str, Any],
+    ) -> None:
+        try:
+            # Close any existing consumer span before closing the consumer
+            span = consumer_span.get(None)
+            if span is not None:
+                close_consumer_span(span)
+
+            # Execute the actual close operation
+            res = wrapped(*args, **kwargs)
+
+            logger.debug("Kafka consumer closed and spans cleaned up")
+            return res
+
+        except Exception:
+            # Still try to clean up the span even if close fails
+            span = consumer_span.get(None)
+            if span is not None:
+                close_consumer_span(span)
 
     def trace_kafka_poll(
         wrapped: Callable[..., InstanaConfluentKafkaConsumer.poll],
         instance: InstanaConfluentKafkaConsumer,
         args: Tuple[int, str, Tuple[Any, ...]],
         kwargs: Dict[str, Any],
     ) -> Optional[confluent_kafka.Message]:
-        if tracing_is_off():
-            return wrapped(*args, **kwargs)
-
         res = None
         exception = None
 
         try:
             res = wrapped(*args, **kwargs)
+            create_span("poll", res.topic(), res.headers())
+            return res
         except Exception as exc:
             exception = exc
-        finally:
-            if res:
-                create_span("poll", res.topic(), res.headers())
-            else:
-                create_span(
-                    "poll",
-                    next(iter(instance.list_topics().topics)),
-                    exception=exception,
-                )
-
-        return res
+            create_span(
+                "poll",
+                next(iter(instance.list_topics().topics)),
+                exception=exception,
+            )
 
     # Apply the monkey patch
     confluent_kafka.Producer = InstanaConfluentKafkaProducer
@@ -189,6 +267,9 @@ def trace_kafka_poll(
         InstanaConfluentKafkaConsumer, "consume", trace_kafka_consume
     )
     wrapt.wrap_function_wrapper(InstanaConfluentKafkaConsumer, "poll", trace_kafka_poll)
+    wrapt.wrap_function_wrapper(
+        InstanaConfluentKafkaConsumer, "close", trace_kafka_close
+    )
 
     logger.debug("Instrumenting Kafka (confluent_kafka)")
 except ImportError:
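
Reviewer note, not part of the change itself: create_span() now skips span creation when the topic is configured as an ignored Kafka endpoint or when the incoming message carries Instana's suppression header. A hedged sketch of the header case, using the plain confluent_kafka producer API (broker address and topic are illustrative):

# Illustrative only: a message whose headers include ("x_instana_l_s", b"0")
# is treated as suppressed upstream, so the patched consumer creates no
# kafka-consumer span for it.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce(
    "demo-topic",
    value=b"payload",
    headers=[("x_instana_l_s", b"0")],
)
producer.flush()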

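Likewise for orientation only: the new save_consumer_span_into_context() and close_consumer_span() helpers build on the standard OpenTelemetry context API. A minimal, self-contained sketch of that attach/detach pattern follows; the span is simply whatever the API tracer returns and the span name is made up, only the token handling matters.

# Minimal sketch of the OpenTelemetry pattern behind the new helpers:
# attach() makes a span the current one and returns a token, detach()
# restores the previous context, mirroring consumer_token in the diff.
from opentelemetry import context, trace

span = trace.get_tracer(__name__).start_span("kafka-consumer-demo")
span.end()  # create_span() also ends the span before saving it

token = context.attach(trace.set_span_in_context(span))  # save_consumer_span_into_context()
assert trace.get_current_span() is span                  # later spans are parented to it

context.detach(token)  # close_consumer_span() detaches and resets the ContextVar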