Commit 7c1b921

chore(kafka): use core api for data streams monitoring [backport 2.0] (#7163)
Backport 5480ca8 from #6890 to 2.0.

This commit swaps out all remaining DSM code in the kafka integration to use the core API. It also adds two tests and fixes the commit wrapper, both of which were required to make the transition safely.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included in the PR.
- [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc.).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)).

## Reviewer Checklist

- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment.
- [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting).
- [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`.
- [x] This PR doesn't touch any of that.

Co-authored-by: Teague Bick <[email protected]>
1 parent ad31d75 commit 7c1b921
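
At a high level, the change replaces DSM-specific branches inside the integration with named events on the core API: the integration always dispatches, and the data streams module subscribes only when it is enabled. A condensed sketch of that pattern, simplified from the diffs below (tracing details and the consume/commit events omitted):

```python
# Condensed sketch of the pattern adopted by this commit; simplified from the
# diffs below, not a drop-in copy of either module.
from ddtrace import config
from ddtrace.internal import core


def traced_produce(instance, args, kwargs):
    # The kafka integration always announces the event...
    core.dispatch("kafka.produce.start", [instance, args, kwargs])
    # ...and then carries on with its normal tracing work.


def dsm_kafka_message_produce(instance, args, kwargs):
    # ...while the data streams module does its bookkeeping here
    # (pathway injection, delivery-callback wrapping, produce tracking).
    pass


if config._data_streams_enabled:
    # Handlers are only registered when DSM is on, so the dispatch above is
    # effectively a no-op for everyone else.
    core.on("kafka.produce.start", dsm_kafka_message_produce)
```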

10 files changed: +283 additions, -74 deletions


ddtrace/contrib/kafka/patch.py

Lines changed: 5 additions & 57 deletions
@@ -1,7 +1,4 @@
-import time
-
 import confluent_kafka
-from confluent_kafka import TopicPartition
 
 from ddtrace import config
 from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY
@@ -20,7 +17,6 @@
 from ddtrace.internal.schema.span_attribute_schema import SpanDirection
 from ddtrace.internal.utils import ArgumentError
 from ddtrace.internal.utils import get_argument_value
-from ddtrace.internal.utils import set_argument_value
 from ddtrace.internal.utils.formats import asbool
 from ddtrace.pin import Pin
 
@@ -121,37 +117,7 @@ def traced_produce(func, instance, args, kwargs):
     value = None
     message_key = kwargs.get("key", "")
     partition = kwargs.get("partition", -1)
-    if config._data_streams_enabled:
-        # inject data streams context
-        core.dispatch("kafka.produce.start", [instance, args, kwargs])
-
-    on_delivery_kwarg = "on_delivery"
-    on_delivery_arg = 5
-    on_delivery = None
-    try:
-        on_delivery = get_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg)
-    except ArgumentError:
-        on_delivery_kwarg = "callback"
-        on_delivery_arg = 4
-        try:
-            on_delivery = get_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg)
-        except ArgumentError:
-            on_delivery = None
-
-    def wrapped_callback(err, msg):
-        if err is None:
-            if pin.tracer.data_streams_processor:
-                pin.tracer.data_streams_processor.track_kafka_produce(
-                    msg.topic(), msg.partition(), msg.offset() or -1, time.time()
-                )
-        if on_delivery is not None:
-            on_delivery(err, msg)
-
-    try:
-        args, kwargs = set_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg, wrapped_callback)
-    except ArgumentError:
-        # we set the callback even if it's not set by the client, to track produce calls correctly.
-        kwargs[on_delivery_kwarg] = wrapped_callback
+    core.dispatch("kafka.produce.start", [instance, args, kwargs])
 
     with pin.tracer.trace(
         schematize_messaging_operation(kafkax.PRODUCE, provider="kafka", direction=SpanDirection.OUTBOUND),
@@ -191,15 +157,8 @@ def traced_poll(func, instance, args, kwargs):
         span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(message is not None))
         span.set_tag_str(kafkax.GROUP_ID, instance._group_id)
         if message is not None:
-            if config._data_streams_enabled:
-                core.set_item("kafka_topic", message.topic())
-                core.dispatch("kafka.consume.start", [instance, message])
-                if instance._auto_commit:
-                    # it's not exactly true, but if auto commit is enabled, we consider that a message is acknowledged
-                    # when it's read.
-                    pin.tracer.data_streams_processor.track_kafka_commit(
-                        instance._group_id, message.topic(), message.partition(), message.offset() or -1, time.time()
-                    )
+            core.set_item("kafka_topic", message.topic())
+            core.dispatch("kafka.consume.start", [instance, message])
 
             message_key = message.key() or ""
             message_offset = message.offset() or -1
@@ -220,17 +179,6 @@ def traced_commit(func, instance, args, kwargs):
     if not pin or not pin.enabled():
        return func(*args, **kwargs)
 
-    if config._data_streams_enabled:
-        message = get_argument_value(args, kwargs, 0, "message", True)
-        # message and offset are mutually exclusive. Only one parameter can be passed.
-        if message is not None:
-            offsets = [TopicPartition(message.topic(), message.partition(), offset=message.offset())]
-        else:
-            offsets = get_argument_value(args, kwargs, 1, "offsets", True)
-
-        if offsets:
-            for offset in offsets:
-                pin.tracer.data_streams_processor.track_kafka_commit(
-                    instance._group_id, offset.topic, offset.partition, offset.offset or -1, time.time()
-                )
+    core.dispatch("kafka.commit.start", [instance, args, kwargs])
+
     return func(*args, **kwargs)
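
One behavioural detail worth calling out: the delivery-callback handling that used to live here moves into the DSM module (ddtrace/internal/datastreams/kafka.py below), and it wraps rather than replaces any on_delivery/callback the application passed in. A self-contained sketch of that idiom, with invented names:

```python
# Self-contained sketch (invented names) of the callback-wrapping idiom used by
# the DSM produce handler below, so user-supplied delivery callbacks keep firing.
def wrap_delivery_callback(on_delivery, record_produce):
    def wrapped_callback(err, msg):
        if err is None:
            record_produce(msg)      # DSM bookkeeping runs first on success...
        if on_delivery is not None:
            on_delivery(err, msg)    # ...then the user's callback still runs.

    return wrapped_callback


# Tiny demonstration with stand-in values:
events = []
cb = wrap_delivery_callback(
    on_delivery=lambda err, msg: events.append(("user", msg)),
    record_produce=lambda msg: events.append(("dsm", msg)),
)
cb(None, "fake-message")
assert events == [("dsm", "fake-message"), ("user", "fake-message")]
```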
ddtrace/internal/datastreams/__init__.py

Lines changed: 14 additions & 5 deletions
@@ -1,14 +1,23 @@
-from . import kafka  # noqa:F401
+from ddtrace import config
+from ddtrace.internal import agent
 
+from ...internal.utils.importlib import require_modules
 
+
+required_modules = ["confluent_kafka"]
 _processor = None
 
+if config._data_streams_enabled:
+    with require_modules(required_modules) as missing_modules:
+        if not missing_modules:
+            from . import kafka  # noqa:F401
 
-def data_streams_processor():
-    from . import processor
 
+def data_streams_processor():
     global _processor
-    if not _processor:
-        _processor = processor.DataStreamsProcessor()
+    if config._data_streams_enabled and not _processor:
+        from . import processor
+
+        _processor = processor.DataStreamsProcessor(agent.get_trace_url())
 
     return _processor
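
A short usage sketch of the accessor above: the processor is built lazily, shared across calls, and pointed at the agent URL; when data streams monitoring is disabled it is never constructed, so the accessor returns None.

```python
# Usage sketch based on the accessor above (requires ddtrace to be installed).
from ddtrace import config
from ddtrace.internal.datastreams import data_streams_processor

if config._data_streams_enabled:
    processor = data_streams_processor()          # first call builds DataStreamsProcessor(agent.get_trace_url())
    assert processor is data_streams_processor()  # later calls return the same instance
else:
    assert data_streams_processor() is None       # nothing is constructed when DSM is off
```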

ddtrace/internal/datastreams/kafka.py

Lines changed: 65 additions & 0 deletions
@@ -1,6 +1,16 @@
+import time
+
+from confluent_kafka import TopicPartition
+
 from ddtrace import config
 from ddtrace.internal import core
 from ddtrace.internal.datastreams.processor import PROPAGATION_KEY
+from ddtrace.internal.utils import ArgumentError
+from ddtrace.internal.utils import get_argument_value
+from ddtrace.internal.utils import set_argument_value
+
+
+INT_TYPES = (int,)
 
 
 def dsm_kafka_message_produce(instance, args, kwargs):
@@ -13,6 +23,29 @@ def dsm_kafka_message_produce(instance, args, kwargs):
     headers[PROPAGATION_KEY] = encoded_pathway
     kwargs["headers"] = headers
 
+    on_delivery_kwarg = "on_delivery"
+    on_delivery_arg = 5
+    on_delivery = None
+    try:
+        on_delivery = get_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg)
+    except ArgumentError:
+        on_delivery_kwarg = "callback"
+        on_delivery_arg = 4
+        on_delivery = get_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg, optional=True)
+
+    def wrapped_callback(err, msg):
+        if err is None:
+            reported_offset = msg.offset() if isinstance(msg.offset(), INT_TYPES) else -1
+            processor().track_kafka_produce(msg.topic(), msg.partition(), reported_offset, time.time())
+        if on_delivery is not None:
+            on_delivery(err, msg)
+
+    try:
+        args, kwargs = set_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg, wrapped_callback)
+    except ArgumentError:
+        # we set the callback even if it's not set by the client, to track produce calls correctly.
+        kwargs[on_delivery_kwarg] = wrapped_callback
+
 
 def dsm_kafka_message_consume(instance, message):
     from . import data_streams_processor as processor
@@ -24,7 +57,39 @@ def dsm_kafka_message_consume(instance, message):
     ctx = processor().decode_pathway(headers.get(PROPAGATION_KEY, None))
     ctx.set_checkpoint(["direction:in", "group:" + group, "topic:" + topic, "type:kafka"])
 
+    if instance._auto_commit:
+        # it's not exactly true, but if auto commit is enabled, we consider that a message is acknowledged
+        # when it's read.
+        reported_offset = message.offset() if isinstance(message.offset(), INT_TYPES) else -1
+        processor().track_kafka_commit(
+            instance._group_id, message.topic(), message.partition(), reported_offset, time.time()
+        )
+
+
+def dsm_kafka_message_commit(instance, args, kwargs):
+    from . import data_streams_processor as processor
+
+    message = get_argument_value(args, kwargs, 0, "message", optional=True)
+
+    offsets = []
+    if message is not None:
+        # We need to add one to message offsets to make them mean the same thing as offsets
+        # passed in by the offsets keyword
+        reported_offset = message.offset() + 1 if isinstance(message.offset(), INT_TYPES) else -1
+        offsets = [TopicPartition(message.topic(), message.partition(), reported_offset)]
+    else:
+        offsets = get_argument_value(args, kwargs, 1, "offsets", True) or []
+
+    for offset in offsets:
+        # When offsets is passed in as an arg, its an exact value for the next expected message.
+        # When message is passed in Kafka reports msg.offset() + 1. We add +1 above to message
+        # offsets to make them mean the same thing as passed in offsets, then subtract 1 universally
+        # here from both
+        reported_offset = offset.offset - 1 if isinstance(offset.offset, INT_TYPES) else -1
+        processor().track_kafka_commit(instance._group_id, offset.topic, offset.partition, reported_offset, time.time())
+
 
 if config._data_streams_enabled:
     core.on("kafka.produce.start", dsm_kafka_message_produce)
     core.on("kafka.consume.start", dsm_kafka_message_consume)
+    core.on("kafka.commit.start", dsm_kafka_message_commit)
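
The +1/-1 arithmetic in dsm_kafka_message_commit is easiest to follow with concrete numbers: commit(message=msg) passes the offset of the record just consumed, while commit(offsets=[...]) passes the next offset the consumer expects, so the handler first converts message offsets to the next-offset convention and then subtracts one from both paths before reporting. A minimal, self-contained sketch with invented offsets:

```python
# Minimal sketch (invented offsets) of the normalization performed above so that
# both commit styles report the same offset to track_kafka_commit().
def offset_from_message(message_offset):
    # commit(message=msg): Kafka stores msg.offset() + 1, so convert to the
    # "next expected offset" convention used by the offsets= parameter.
    return message_offset + 1


def offset_reported_to_dsm(next_expected_offset):
    # Both paths then subtract one before calling track_kafka_commit().
    return next_expected_offset - 1


# Committing the record at offset 41 and committing "next offset 42" are equivalent:
assert offset_reported_to_dsm(offset_from_message(41)) == offset_reported_to_dsm(42) == 41
```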

ddtrace/internal/datastreams/processor.py

Lines changed: 15 additions & 0 deletions
@@ -1,6 +1,7 @@
 # coding: utf-8
 import base64
 from collections import defaultdict
+from functools import partial
 import gzip
 import os
 import struct
@@ -20,6 +21,7 @@
 
 import ddtrace
 from ddtrace import config
+from ddtrace.internal.atexit import register_on_exit_signal
 from ddtrace.internal.utils.retry import fibonacci_backoff_with_jitter
 
 from .._encoding import packb
@@ -68,6 +70,7 @@ def gzip_compress(payload):
 
 PROPAGATION_KEY = "dd-pathway-ctx"
 PROPAGATION_KEY_BASE_64 = "dd-pathway-ctx-base64"
+SHUTDOWN_TIMEOUT = 5
 
 """
 PathwayAggrKey uniquely identifies a pathway to aggregate stats on.
@@ -135,6 +138,7 @@ def __init__(self, agent_url, interval=None, timeout=1.0, retry_attempts=3):
             initial_wait=0.618 * self.interval / (1.618 ** retry_attempts) / 2,
         )(self._flush_stats)
 
+        register_on_exit_signal(partial(_atexit, obj=self))
         self.start()
 
     def on_checkpoint_creation(
@@ -442,3 +446,14 @@ def set_checkpoint(self, tags, now_sec=None, edge_start_sec_override=None, pathw
         self.processor.on_checkpoint_creation(
             hash_value, parent_hash, tags, now_sec, edge_latency_sec, pathway_latency_sec
         )
+
+
+def _atexit(obj=None):
+    try:
+        # Data streams tries to flush data on shutdown.
+        # Adding a try except here to ensure we don't crash the application if the agent is killed before
+        # the application for example.
+        obj.shutdown(SHUTDOWN_TIMEOUT)
+    except Exception as e:
+        if config._data_streams_enabled:
+            log.warning("Failed to shutdown data streams processor: %s", repr(e))
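
The register_on_exit_signal(partial(_atexit, obj=self)) call makes the processor flush itself at process exit, taking over the job that the tracer.py change below removes from Tracer.shutdown(). A rough sketch of the same pattern, with the standard library's atexit standing in for ddtrace's internal exit-signal helper:

```python
# Rough sketch of the exit-flush pattern above; the standard library's atexit is
# a stand-in for ddtrace.internal.atexit.register_on_exit_signal.
import atexit
from functools import partial

SHUTDOWN_TIMEOUT = 5


class DemoProcessor:
    def shutdown(self, timeout):
        print(f"flushing buffered DSM stats (timeout={timeout}s)")


def _atexit(obj=None):
    try:
        # Never let a failed flush (e.g. the agent already stopped) crash the app.
        obj.shutdown(SHUTDOWN_TIMEOUT)
    except Exception as e:
        print("Failed to shutdown data streams processor: %r" % (e,))


processor = DemoProcessor()
atexit.register(partial(_atexit, obj=processor))
```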

ddtrace/tracer.py

Lines changed: 0 additions & 8 deletions
@@ -1007,14 +1007,6 @@ def shutdown(self, timeout=None):
         """
         with self._shutdown_lock:
             # Thread safety: Ensures tracer is shutdown synchronously
-            try:
-                # Data streams tries to flush data on shutdown.
-                # Adding a try except here to ensure we don't crash the application if the agent is killed before
-                # the application for example.
-                self.data_streams_processor.shutdown(timeout)
-            except Exception as e:
-                if config._data_streams_enabled:
-                    log.warning("Failed to shutdown data streams processor: %s", repr(e))
             span_processors = self._span_processors
             deferred_processors = self._deferred_processors
             self._span_processors = []

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -154,6 +154,8 @@ exclude-modules = '''
  | ddtrace.appsec._iast._ast.aspects
  | ddtrace.appsec._iast._taint_utils
  | ddtrace.appsec._iast.taint_sinks.sql_injection
+  # DSM specific contribs
+  | ddtrace.internal.datastreams.kafka
 )
 '''

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+fixes:
+  - |
+    DSM: fix off-by-one metric issue and error where statistics weren't calculated when the core API was used.
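
For users of this backport, data streams monitoring remains opt-in. A hedged enablement sketch (DD_DATA_STREAMS_ENABLED is assumed to be the environment toggle behind config._data_streams_enabled and must be set before ddtrace is imported):

```python
# Hedged enablement sketch: DD_DATA_STREAMS_ENABLED is assumed to be the toggle
# behind config._data_streams_enabled; it must be set before ddtrace is imported.
import os

os.environ.setdefault("DD_DATA_STREAMS_ENABLED", "true")

from ddtrace import config  # noqa: E402

print(config._data_streams_enabled)  # expected: True when the variable is honored
```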
