
Commit a8d0b73

feat: kafka trace consume [backport 2.7] (#8757)
Backported because it contains a fix for #8752

Adds tracing and DSM support for https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.consume

## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included in the PR
- [x] Risks are described (performance impact, potential for breakage, maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified `@DataDog/apm-tees`.
- [x] If change touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`.

## Reviewer Checklist

- [x] Title is accurate
- [x] All changes are related to the pull request's stated goal
- [x] Description motivates each change
- [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes
- [x] Testing strategy adequately addresses listed risks
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] Release note makes sense to a user of the library
- [x] Author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment
- [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

Co-authored-by: Munir Abdinur <[email protected]>
1 parent 94ded6f commit a8d0b73
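From a user's point of view, the effect of this change can be pictured with a minimal sketch (the broker address, topic, group id, and payload below are placeholders, not values from this PR): once the kafka integration is patched, `Consumer.consume()` is traced the same way `Consumer.poll()` already was, and each `consume()` call produces a single `kafka.consume` span even when it returns several messages.

```python
# Minimal sketch; broker address, topic, group id, and payload are placeholders.
import confluent_kafka
from ddtrace import patch

patch(kafka=True)  # wraps Producer.produce, Consumer.poll, and now Consumer.consume

producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("example-topic", b"payload", key="example-key")
producer.flush()

consumer = confluent_kafka.Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "example-group",
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["example-topic"])

# consume() returns a list of messages; one kafka.consume span is emitted per call,
# with distributed-tracing context extracted from the first message's headers.
messages = consumer.consume(num_messages=10, timeout=5.0)
for message in messages:
    if message.error() is None:
        print(message.value())

consumer.close()
```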

6 files changed: +383 -70 lines changed

ddtrace/contrib/kafka/patch.py

Lines changed: 77 additions & 57 deletions
@@ -1,4 +1,5 @@
 import os
+import sys
 
 import confluent_kafka
 
@@ -115,8 +116,11 @@ def patch():
     for producer in (TracedProducer, TracedSerializingProducer):
         trace_utils.wrap(producer, "produce", traced_produce)
     for consumer in (TracedConsumer, TracedDeserializingConsumer):
-        trace_utils.wrap(consumer, "poll", traced_poll)
+        trace_utils.wrap(consumer, "poll", traced_poll_or_consume)
         trace_utils.wrap(consumer, "commit", traced_commit)
+
+    # Consume is not implemented in deserializing consumers
+    trace_utils.wrap(TracedConsumer, "consume", traced_poll_or_consume)
     Pin().onto(confluent_kafka.Producer)
     Pin().onto(confluent_kafka.Consumer)
     Pin().onto(confluent_kafka.SerializingProducer)
@@ -136,6 +140,10 @@ def unpatch():
         if trace_utils.iswrapped(consumer.commit):
             trace_utils.unwrap(consumer, "commit")
 
+    # Consume is not implemented in deserializing consumers
+    if trace_utils.iswrapped(TracedConsumer.consume):
+        trace_utils.unwrap(TracedConsumer, "consume")
+
     confluent_kafka.Producer = _Producer
     confluent_kafka.Consumer = _Consumer
     if _SerializingProducer is not None:
@@ -194,7 +202,7 @@ def traced_produce(func, instance, args, kwargs):
         return func(*args, **kwargs)
 
 
-def traced_poll(func, instance, args, kwargs):
+def traced_poll_or_consume(func, instance, args, kwargs):
     pin = Pin.get_from(instance)
     if not pin or not pin.enabled():
         return func(*args, **kwargs)
@@ -204,67 +212,79 @@ def traced_poll(func, instance, args, kwargs):
     start_ns = time_ns()
     # wrap in a try catch and raise exception after span is started
     err = None
+    result = None
     try:
-        message = func(*args, **kwargs)
+        result = func(*args, **kwargs)
+        return result
     except Exception as e:
         err = e
+        raise err
+    finally:
+        if isinstance(result, confluent_kafka.Message):
+            # poll returns a single message
+            _instrument_message([result], pin, start_ns, instance, err)
+        elif isinstance(result, list):
+            # consume returns a list of messages,
+            _instrument_message(result, pin, start_ns, instance, err)
+        elif config.kafka.trace_empty_poll_enabled:
+            _instrument_message([None], pin, start_ns, instance, err)
+
+
+def _instrument_message(messages, pin, start_ns, instance, err):
     ctx = None
-    if message is not None and config.kafka.distributed_tracing_enabled and message.headers():
-        ctx = Propagator.extract(dict(message.headers()))
-    if message is not None or config.kafka.trace_empty_poll_enabled:
-        with pin.tracer.start_span(
-            name=schematize_messaging_operation(kafkax.CONSUME, provider="kafka", direction=SpanDirection.PROCESSING),
-            service=trace_utils.ext_service(pin, config.kafka),
-            span_type=SpanTypes.WORKER,
-            child_of=ctx if ctx is not None else pin.tracer.context_provider.active(),
-            activate=True,
-        ) as span:
-            # reset span start time to before function call
-            span.start_ns = start_ns
-
-            span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
-            span.set_tag_str(COMPONENT, config.kafka.integration_name)
-            span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
-            span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(message is not None))
-            span.set_tag_str(kafkax.GROUP_ID, instance._group_id)
+    # First message is used to extract context and enrich datadog spans
+    # This approach aligns with the opentelemetry confluent kafka semantics
+    first_message = messages[0]
+    if first_message and config.kafka.distributed_tracing_enabled and first_message.headers():
+        ctx = Propagator.extract(dict(first_message.headers()))
+    with pin.tracer.start_span(
+        name=schematize_messaging_operation(kafkax.CONSUME, provider="kafka", direction=SpanDirection.PROCESSING),
+        service=trace_utils.ext_service(pin, config.kafka),
+        span_type=SpanTypes.WORKER,
+        child_of=ctx if ctx is not None else pin.tracer.context_provider.active(),
+        activate=True,
+    ) as span:
+        # reset span start time to before function call
+        span.start_ns = start_ns
+
+        for message in messages:
             if message is not None:
-                core.set_item("kafka_topic", message.topic())
-                core.dispatch("kafka.consume.start", (instance, message, span))
-
-                message_key = message.key() or ""
-                message_offset = message.offset() or -1
-                span.set_tag_str(kafkax.TOPIC, message.topic())
-
-                # If this is a deserializing consumer, do not set the key as a tag since we
-                # do not have the serialization function
-                if (
-                    (_DeserializingConsumer is not None and not isinstance(instance, _DeserializingConsumer))
-                    or isinstance(message_key, str)
-                    or isinstance(message_key, bytes)
-                ):
-                    span.set_tag_str(kafkax.MESSAGE_KEY, message_key)
-                span.set_tag(kafkax.PARTITION, message.partition())
-                is_tombstone = False
-                try:
-                    is_tombstone = len(message) == 0
-                except TypeError:  # https://github.com/confluentinc/confluent-kafka-python/issues/1192
-                    pass
-                span.set_tag_str(kafkax.TOMBSTONE, str(is_tombstone))
-                span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
-            span.set_tag(SPAN_MEASURED_KEY)
-            rate = config.kafka.get_analytics_sample_rate()
-            if rate is not None:
-                span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)
-
-            # raise exception if one was encountered
-            if err is not None:
-                raise err
-            return message
-    else:
+                core.set_item("kafka_topic", first_message.topic())
+                core.dispatch("kafka.consume.start", (instance, first_message, span))
+
+        span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
+        span.set_tag_str(COMPONENT, config.kafka.integration_name)
+        span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
+        span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(first_message is not None))
+        span.set_tag_str(kafkax.GROUP_ID, instance._group_id)
+        if messages[0] is not None:
+            message_key = messages[0].key() or ""
+            message_offset = messages[0].offset() or -1
+            span.set_tag_str(kafkax.TOPIC, messages[0].topic())
+
+            # If this is a deserializing consumer, do not set the key as a tag since we
+            # do not have the serialization function
+            if (
+                (_DeserializingConsumer is not None and not isinstance(instance, _DeserializingConsumer))
+                or isinstance(message_key, str)
+                or isinstance(message_key, bytes)
+            ):
+                span.set_tag_str(kafkax.MESSAGE_KEY, message_key)
+            span.set_tag(kafkax.PARTITION, messages[0].partition())
+            is_tombstone = False
+            try:
+                is_tombstone = len(messages[0]) == 0
+            except TypeError:  # https://github.com/confluentinc/confluent-kafka-python/issues/1192
+                pass
            span.set_tag_str(kafkax.TOMBSTONE, str(is_tombstone))
+            span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
+            span.set_tag(SPAN_MEASURED_KEY)
+            rate = config.kafka.get_analytics_sample_rate()
+            if rate is not None:
+                span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)
+
         if err is not None:
-            raise err
-        else:
-            return message
+            span.set_exc_info(*sys.exc_info())
 
 
 def traced_commit(func, instance, args, kwargs):
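The renamed `traced_poll_or_consume` wrapper has to branch on the return type because the two client calls differ in shape. A short illustration of those shapes, assuming a consumer already subscribed to a topic (broker address, topic, and group id are placeholders):

```python
# Illustration of the two return shapes the wrapper normalizes;
# broker address, topic, and group id are placeholders.
import confluent_kafka

consumer = confluent_kafka.Consumer(
    {"bootstrap.servers": "localhost:9092", "group.id": "example-group"}
)
consumer.subscribe(["example-topic"])

# poll() returns a single Message, or None on an empty poll; the wrapper
# forwards it as a one-element list, or as [None] when empty polls are
# traced (config.kafka.trace_empty_poll_enabled).
single_message = consumer.poll(timeout=1.0)

# consume() returns a list of Message objects (possibly empty); the wrapper
# hands the whole batch to _instrument_message, which starts one span and
# uses the first message for header extraction and span tags.
batch = consumer.consume(num_messages=5, timeout=1.0)

consumer.close()
```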
Lines changed: 4 additions & 0 deletions (new release note file)
@@ -0,0 +1,4 @@
+---
+features:
+  - |
+    kafka: Adds tracing and DSM support for ``confluent_kafka.Consumer.consume()``. Previously only ``confluent_kafka.Consumer.poll`` was instrumented.

tests/contrib/kafka/test_kafka.py

Lines changed: 47 additions & 13 deletions
@@ -90,10 +90,16 @@ def dummy_tracer():
 
 
 @pytest.fixture
-def tracer():
+def should_filter_empty_polls():
+    yield True
+
+
+@pytest.fixture
+def tracer(should_filter_empty_polls):
     patch()
     t = Tracer()
-    t.configure(settings={"FILTERS": [KafkaConsumerPollFilter()]})
+    if should_filter_empty_polls:
+        t.configure(settings={"FILTERS": [KafkaConsumerPollFilter()]})
     # disable backoff because it makes these tests less reliable
     t._writer._send_payload_with_backoff = t._writer._send_payload
     try:
@@ -266,6 +272,42 @@ def test_commit(producer, consumer, kafka_topic):
         consumer.commit(message)
 
 
+@pytest.mark.snapshot(ignores=["metrics.kafka.message_offset"])
+def test_commit_with_consume_single_message(producer, consumer, kafka_topic):
+    with override_config("kafka", dict(trace_empty_poll_enabled=False)):
+        producer.produce(kafka_topic, PAYLOAD, key=KEY)
+        producer.flush()
+        # One message is consumed and one span is generated.
+        messages = consumer.consume(num_messages=1)
+        assert len(messages) == 1
+        consumer.commit(messages[0])
+
+
+@pytest.mark.snapshot(ignores=["metrics.kafka.message_offset"])
+def test_commit_with_consume_with_multiple_messages(producer, consumer, kafka_topic):
+    with override_config("kafka", dict(trace_empty_poll_enabled=False)):
+        producer.produce(kafka_topic, PAYLOAD, key=KEY)
+        producer.produce(kafka_topic, PAYLOAD, key=KEY)
+        producer.flush()
+        # Two messages are consumed but only ONE span is generated
+        messages = consumer.consume(num_messages=2)
+        assert len(messages) == 2
+
+
+@pytest.mark.snapshot(ignores=["metrics.kafka.message_offset", "meta.error.stack"])
+@pytest.mark.parametrize("should_filter_empty_polls", [False])
+def test_commit_with_consume_with_error(producer, consumer, kafka_topic):
+    producer.produce(kafka_topic, PAYLOAD, key=KEY)
+    producer.flush()
+    # Raises an exception by consuming messages after the consumer has been closed
+    with pytest.raises(TypeError):
+        # Empty poll spans are filtered out by the KafkaConsumerPollFilter. We need to disable
+        # it to test error spans.
+        # Allowing empty poll spans could introduce flakiness in the test.
+        with override_config("kafka", dict(trace_empty_poll_enabled=True)):
+            consumer.consume(num_messages=1, invalid_args="invalid_args")
+
+
 @pytest.mark.snapshot(ignores=["metrics.kafka.message_offset"])
 def test_commit_with_offset(producer, consumer, kafka_topic):
     with override_config("kafka", dict(trace_empty_poll_enabled=False)):
@@ -415,20 +457,10 @@ def _generate_in_subprocess(random_topic):
     import ddtrace
     from ddtrace.contrib.kafka.patch import patch
     from ddtrace.contrib.kafka.patch import unpatch
-    from ddtrace.filters import TraceFilter
+    from tests.contrib.kafka.test_kafka import KafkaConsumerPollFilter
 
     PAYLOAD = bytes("hueh hueh hueh", encoding="utf-8")
 
-    class KafkaConsumerPollFilter(TraceFilter):
-        def process_trace(self, trace):
-            # Filter out all poll spans that have no received message
-            return (
-                None
-                if trace[0].name in {"kafka.consume", "kafka.process"}
-                and trace[0].get_tag("kafka.received_message") == "False"
-                else trace
-            )
-
     ddtrace.tracer.configure(settings={"FILTERS": [KafkaConsumerPollFilter()]})
     # disable backoff because it makes these tests less reliable
     ddtrace.tracer._writer._send_payload_with_backoff = ddtrace.tracer._writer._send_payload
@@ -733,6 +765,7 @@ def test_tracing_context_is_propagated_when_enabled(ddtrace_run_python_code_in_subprocess):
 from tests.contrib.kafka.test_kafka import kafka_topic
 from tests.contrib.kafka.test_kafka import producer
 from tests.contrib.kafka.test_kafka import tracer
+from tests.contrib.kafka.test_kafka import should_filter_empty_polls
 from tests.utils import DummyTracer
 
 def test(consumer, producer, kafka_topic):
@@ -923,6 +956,7 @@ def test_does_not_trace_empty_poll_when_disabled(ddtrace_run_python_code_in_subprocess):
 from tests.contrib.kafka.test_kafka import kafka_topic
 from tests.contrib.kafka.test_kafka import producer
 from tests.contrib.kafka.test_kafka import tracer
+from tests.contrib.kafka.test_kafka import should_filter_empty_polls
 from tests.utils import DummyTracer
 
 def test(consumer, producer, kafka_topic):
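For reference, the `KafkaConsumerPollFilter` that the subprocess helper now imports is the same filter shown in the removed lines above; it now lives at module level in `tests/contrib/kafka/test_kafka.py`. A self-contained sketch, reconstructed from this diff, of the filter and how the tests register it:

```python
# Reconstructed from the removed lines above; not new behavior in this PR.
import ddtrace
from ddtrace.filters import TraceFilter


class KafkaConsumerPollFilter(TraceFilter):
    def process_trace(self, trace):
        # Filter out all poll spans that have no received message
        return (
            None
            if trace[0].name in {"kafka.consume", "kafka.process"}
            and trace[0].get_tag("kafka.received_message") == "False"
            else trace
        )


# Registering the filter drops empty-poll traces before they are flushed.
ddtrace.tracer.configure(settings={"FILTERS": [KafkaConsumerPollFilter()]})
```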
Lines changed: 74 additions & 0 deletions (new trace snapshot for test_commit_with_consume_single_message)
@@ -0,0 +1,74 @@
1+
[[
2+
{
3+
"name": "kafka.consume",
4+
"service": "kafka",
5+
"resource": "kafka.consume",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "worker",
10+
"error": 0,
11+
"meta": {
12+
"_dd.base_service": "",
13+
"_dd.p.dm": "-0",
14+
"_dd.p.tid": "65dcd1fd00000000",
15+
"component": "kafka",
16+
"kafka.group_id": "test_group",
17+
"kafka.message_key": "test_key",
18+
"kafka.received_message": "True",
19+
"kafka.tombstone": "False",
20+
"kafka.topic": "test_commit_with_consume_single_message",
21+
"language": "python",
22+
"messaging.system": "kafka",
23+
"pathway.hash": "7964333589438960939",
24+
"runtime-id": "ff074b2cc3b34b63bbdabbfb5bafd0a4",
25+
"span.kind": "consumer"
26+
},
27+
"metrics": {
28+
"_dd.measured": 1,
29+
"_dd.top_level": 1,
30+
"_dd.tracer_kr": 1.0,
31+
"_sampling_priority_v1": 1,
32+
"kafka.message_offset": -1,
33+
"kafka.partition": 0,
34+
"process_id": 96733
35+
},
36+
"duration": 3198787000,
37+
"start": 1708970490483150000
38+
}],
39+
[
40+
{
41+
"name": "kafka.produce",
42+
"service": "kafka",
43+
"resource": "kafka.produce",
44+
"trace_id": 1,
45+
"span_id": 1,
46+
"parent_id": 0,
47+
"type": "worker",
48+
"error": 0,
49+
"meta": {
50+
"_dd.base_service": "",
51+
"_dd.p.dm": "-0",
52+
"_dd.p.tid": "65dcd1f900000000",
53+
"component": "kafka",
54+
"kafka.message_key": "test_key",
55+
"kafka.tombstone": "False",
56+
"kafka.topic": "test_commit_with_consume_single_message",
57+
"language": "python",
58+
"messaging.kafka.bootstrap.servers": "localhost:29092",
59+
"messaging.system": "kafka",
60+
"pathway.hash": "8904226842384519559",
61+
"runtime-id": "ff074b2cc3b34b63bbdabbfb5bafd0a4",
62+
"span.kind": "producer"
63+
},
64+
"metrics": {
65+
"_dd.measured": 1,
66+
"_dd.top_level": 1,
67+
"_dd.tracer_kr": 1.0,
68+
"_sampling_priority_v1": 1,
69+
"kafka.partition": -1,
70+
"process_id": 96733
71+
},
72+
"duration": 356000,
73+
"start": 1708970489477615000
74+
}]]
