Commit 496676b

chore(kafka): use core api for data streams monitoring [backport 1.20] (#7130)
This commit swaps out all remaining DSM code in the kafka integration to use the core API. It also adds two tests and fixes to the commit wrapper, both of which were required to transition safely.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included in the PR.
- [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc.).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)).

## Reviewer Checklist

- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment.
- [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting).
- [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`.
- [x] This PR doesn't touch any of that.
1 parent cd73b13 commit 496676b
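For orientation, here is a minimal, self-contained sketch of the dispatch/listener pattern the commit moves to: the integration always emits named events, and the DSM module registers handlers for them only when data streams monitoring is enabled. The `on`/`dispatch` helpers below are toy stand-ins for `ddtrace.internal.core`, not its actual implementation; only the event name `"kafka.produce.start"` is taken from the diff.

```python
from collections import defaultdict

_listeners = defaultdict(list)  # event name -> list of handlers (stand-in for ddtrace.internal.core)


def on(event_name, handler):
    _listeners[event_name].append(handler)


def dispatch(event_name, args):
    for handler in _listeners[event_name]:
        handler(*args)


def traced_produce(instance, args, kwargs):
    # integration side: always emit the event, no DSM-specific branching
    dispatch("kafka.produce.start", [instance, args, kwargs])


DATA_STREAMS_ENABLED = True  # stand-in for config._data_streams_enabled

if DATA_STREAMS_ENABLED:
    # DSM side: subscribe only when the feature is on
    on("kafka.produce.start", lambda instance, args, kwargs: print("DSM produce checkpoint"))

traced_produce(object(), (), {})  # prints "DSM produce checkpoint"
```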

File tree

10 files changed: +286 −74 lines changed

ddtrace/contrib/kafka/patch.py

Lines changed: 5 additions & 57 deletions
@@ -1,7 +1,4 @@
-import time
-
 import confluent_kafka
-from confluent_kafka import TopicPartition
 
 from ddtrace import config
 from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY
@@ -20,7 +17,6 @@
 from ddtrace.internal.schema.span_attribute_schema import SpanDirection
 from ddtrace.internal.utils import ArgumentError
 from ddtrace.internal.utils import get_argument_value
-from ddtrace.internal.utils import set_argument_value
 from ddtrace.internal.utils.formats import asbool
 from ddtrace.pin import Pin
 
@@ -121,37 +117,7 @@ def traced_produce(func, instance, args, kwargs):
     value = None
     message_key = kwargs.get("key", "")
     partition = kwargs.get("partition", -1)
-    if config._data_streams_enabled:
-        # inject data streams context
-        core.dispatch("kafka.produce.start", [instance, args, kwargs])
-
-    on_delivery_kwarg = "on_delivery"
-    on_delivery_arg = 5
-    on_delivery = None
-    try:
-        on_delivery = get_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg)
-    except ArgumentError:
-        on_delivery_kwarg = "callback"
-        on_delivery_arg = 4
-        try:
-            on_delivery = get_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg)
-        except ArgumentError:
-            on_delivery = None
-
-    def wrapped_callback(err, msg):
-        if err is None:
-            if pin.tracer.data_streams_processor:
-                pin.tracer.data_streams_processor.track_kafka_produce(
-                    msg.topic(), msg.partition(), msg.offset() or -1, time.time()
-                )
-        if on_delivery is not None:
-            on_delivery(err, msg)
-
-    try:
-        args, kwargs = set_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg, wrapped_callback)
-    except ArgumentError:
-        # we set the callback even if it's not set by the client, to track produce calls correctly.
-        kwargs[on_delivery_kwarg] = wrapped_callback
+    core.dispatch("kafka.produce.start", [instance, args, kwargs])
 
     with pin.tracer.trace(
         schematize_messaging_operation(kafkax.PRODUCE, provider="kafka", direction=SpanDirection.OUTBOUND),
@@ -191,15 +157,8 @@ def traced_poll(func, instance, args, kwargs):
         span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(message is not None))
         span.set_tag_str(kafkax.GROUP_ID, instance._group_id)
         if message is not None:
-            if config._data_streams_enabled:
-                core.set_item("kafka_topic", message.topic())
-                core.dispatch("kafka.consume.start", [instance, message])
-                if instance._auto_commit:
-                    # it's not exactly true, but if auto commit is enabled, we consider that a message is acknowledged
-                    # when it's read.
-                    pin.tracer.data_streams_processor.track_kafka_commit(
-                        instance._group_id, message.topic(), message.partition(), message.offset() or -1, time.time()
-                    )
+            core.set_item("kafka_topic", message.topic())
+            core.dispatch("kafka.consume.start", [instance, message])
 
             message_key = message.key() or ""
             message_offset = message.offset() or -1
@@ -220,17 +179,6 @@ def traced_commit(func, instance, args, kwargs):
     if not pin or not pin.enabled():
         return func(*args, **kwargs)
 
-    if config._data_streams_enabled:
-        message = get_argument_value(args, kwargs, 0, "message", True)
-        # message and offset are mutually exclusive. Only one parameter can be passed.
-        if message is not None:
-            offsets = [TopicPartition(message.topic(), message.partition(), offset=message.offset())]
-        else:
-            offsets = get_argument_value(args, kwargs, 1, "offsets", True)
-
-        if offsets:
-            for offset in offsets:
-                pin.tracer.data_streams_processor.track_kafka_commit(
-                    instance._group_id, offset.topic, offset.partition, offset.offset or -1, time.time()
-                )
+    core.dispatch("kafka.commit.start", [instance, args, kwargs])
+
     return func(*args, **kwargs)
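The delivery-callback wrapping removed from `patch.py` above now lives in the DSM kafka module (its new form appears later in this diff). As a reminder of the pattern, here is a hedged sketch: `wrap_delivery_callback` and `track_produce` are illustrative names, not ddtrace APIs. The point is that the user's `on_delivery` callback is preserved and still invoked after the DSM bookkeeping.

```python
import time


def wrap_delivery_callback(user_on_delivery, track_produce):
    """Return a callback that records the ack for DSM, then defers to the user's callback."""

    def wrapped_callback(err, msg):
        if err is None:
            # record the successfully produced message for data streams monitoring
            track_produce(msg.topic(), msg.partition(), msg.offset(), time.time())
        if user_on_delivery is not None:
            # the caller's delivery callback still runs exactly as before
            user_on_delivery(err, msg)

    return wrapped_callback
```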
Lines changed: 14 additions & 5 deletions
@@ -1,14 +1,23 @@
-from . import kafka # noqa:F401
+from ddtrace import config
+from ddtrace.internal import agent
 
+from ...internal.utils.importlib import require_modules
 
+
+required_modules = ["confluent_kafka"]
 _processor = None
 
+if config._data_streams_enabled:
+    with require_modules(required_modules) as missing_modules:
+        if not missing_modules:
+            from . import kafka # noqa:F401
 
-def data_streams_processor():
-    from . import processor
 
+def data_streams_processor():
     global _processor
-    if not _processor:
-        _processor = processor.DataStreamsProcessor()
+    if config._data_streams_enabled and not _processor:
+        from . import processor
+
+        _processor = processor.DataStreamsProcessor(agent.get_trace_url())
 
     return _processor
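The accessor above now builds the processor lazily, on first use, and only when data streams monitoring is enabled, so merely importing the package stays cheap when the feature is off. A small sketch of that singleton pattern follows; `FakeProcessor` and the hard-coded agent URL are stand-ins for `processor.DataStreamsProcessor` and `agent.get_trace_url()`, not the ddtrace internals.

```python
_processor = None
DATA_STREAMS_ENABLED = True  # stand-in for config._data_streams_enabled


class FakeProcessor:  # stand-in for processor.DataStreamsProcessor
    def __init__(self, agent_url):
        self.agent_url = agent_url


def data_streams_processor():
    """Build the processor lazily, and only when the feature is enabled."""
    global _processor
    if DATA_STREAMS_ENABLED and _processor is None:
        # in ddtrace the URL comes from agent.get_trace_url(); hard-coded here
        _processor = FakeProcessor("http://localhost:8126")
    return _processor


print(data_streams_processor() is data_streams_processor())  # True: the same instance is reused
```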

ddtrace/internal/datastreams/kafka.py

Lines changed: 66 additions & 0 deletions
@@ -1,6 +1,17 @@
+import time
+
+from confluent_kafka import TopicPartition
+import six
+
 from ddtrace import config
 from ddtrace.internal import core
 from ddtrace.internal.datastreams.processor import PROPAGATION_KEY
+from ddtrace.internal.utils import ArgumentError
+from ddtrace.internal.utils import get_argument_value
+from ddtrace.internal.utils import set_argument_value
+
+
+INT_TYPES = six.integer_types
 
 
 def dsm_kafka_message_produce(instance, args, kwargs):
@@ -13,6 +24,29 @@ def dsm_kafka_message_produce(instance, args, kwargs):
     headers[PROPAGATION_KEY] = encoded_pathway
     kwargs["headers"] = headers
 
+    on_delivery_kwarg = "on_delivery"
+    on_delivery_arg = 5
+    on_delivery = None
+    try:
+        on_delivery = get_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg)
+    except ArgumentError:
+        on_delivery_kwarg = "callback"
+        on_delivery_arg = 4
+        on_delivery = get_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg, optional=True)
+
+    def wrapped_callback(err, msg):
+        if err is None:
+            reported_offset = msg.offset() if isinstance(msg.offset(), INT_TYPES) else -1
+            processor().track_kafka_produce(msg.topic(), msg.partition(), reported_offset, time.time())
+        if on_delivery is not None:
+            on_delivery(err, msg)
+
+    try:
+        args, kwargs = set_argument_value(args, kwargs, on_delivery_arg, on_delivery_kwarg, wrapped_callback)
+    except ArgumentError:
+        # we set the callback even if it's not set by the client, to track produce calls correctly.
+        kwargs[on_delivery_kwarg] = wrapped_callback
+
 
 def dsm_kafka_message_consume(instance, message):
     from . import data_streams_processor as processor
@@ -24,7 +58,39 @@ def dsm_kafka_message_consume(instance, message):
     ctx = processor().decode_pathway(headers.get(PROPAGATION_KEY, None))
     ctx.set_checkpoint(["direction:in", "group:" + group, "topic:" + topic, "type:kafka"])
 
+    if instance._auto_commit:
+        # it's not exactly true, but if auto commit is enabled, we consider that a message is acknowledged
+        # when it's read.
+        reported_offset = message.offset() if isinstance(message.offset(), INT_TYPES) else -1
+        processor().track_kafka_commit(
+            instance._group_id, message.topic(), message.partition(), reported_offset, time.time()
+        )
+
+
+def dsm_kafka_message_commit(instance, args, kwargs):
+    from . import data_streams_processor as processor
+
+    message = get_argument_value(args, kwargs, 0, "message", optional=True)
+
+    offsets = []
+    if message is not None:
+        # We need to add one to message offsets to make them mean the same thing as offsets
+        # passed in by the offsets keyword
+        reported_offset = message.offset() + 1 if isinstance(message.offset(), INT_TYPES) else -1
+        offsets = [TopicPartition(message.topic(), message.partition(), reported_offset)]
+    else:
+        offsets = get_argument_value(args, kwargs, 1, "offsets", True) or []
+
+    for offset in offsets:
+        # When offsets is passed in as an arg, its an exact value for the next expected message.
+        # When message is passed in Kafka reports msg.offset() + 1. We add +1 above to message
+        # offsets to make them mean the same thing as passed in offsets, then subtract 1 universally
+        # here from both
+        reported_offset = offset.offset - 1 if isinstance(offset.offset, INT_TYPES) else -1
+        processor().track_kafka_commit(instance._group_id, offset.topic, offset.partition, reported_offset, time.time())
+
 
 if config._data_streams_enabled:
     core.on("kafka.produce.start", dsm_kafka_message_produce)
     core.on("kafka.consume.start", dsm_kafka_message_consume)
+    core.on("kafka.commit.start", dsm_kafka_message_commit)

ddtrace/internal/datastreams/processor.py

Lines changed: 15 additions & 0 deletions
@@ -1,6 +1,7 @@
 # coding: utf-8
 import base64
 from collections import defaultdict
+from functools import partial
 import gzip
 import os
 import struct
@@ -20,6 +21,7 @@
 
 import ddtrace
 from ddtrace import config
+from ddtrace.internal.atexit import register_on_exit_signal
 from ddtrace.internal.utils.retry import fibonacci_backoff_with_jitter
 
 from .._encoding import packb
@@ -68,6 +70,7 @@ def gzip_compress(payload):
 
 PROPAGATION_KEY = "dd-pathway-ctx"
 PROPAGATION_KEY_BASE_64 = "dd-pathway-ctx-base64"
+SHUTDOWN_TIMEOUT = 5
 
 """
 PathwayAggrKey uniquely identifies a pathway to aggregate stats on.
@@ -135,6 +138,7 @@ def __init__(self, agent_url, interval=None, timeout=1.0, retry_attempts=3):
             initial_wait=0.618 * self.interval / (1.618 ** retry_attempts) / 2,
         )(self._flush_stats)
 
+        register_on_exit_signal(partial(_atexit, obj=self))
         self.start()
 
     def on_checkpoint_creation(
@@ -442,3 +446,14 @@ def set_checkpoint(self, tags, now_sec=None, edge_start_sec_override=None, pathw
         self.processor.on_checkpoint_creation(
             hash_value, parent_hash, tags, now_sec, edge_latency_sec, pathway_latency_sec
         )
+
+
+def _atexit(obj=None):
+    try:
+        # Data streams tries to flush data on shutdown.
+        # Adding a try except here to ensure we don't crash the application if the agent is killed before
+        # the application for example.
+        obj.shutdown(SHUTDOWN_TIMEOUT)
+    except Exception as e:
+        if config._data_streams_enabled:
+            log.warning("Failed to shutdown data streams processor: %s", repr(e))

ddtrace/tracer.py

Lines changed: 0 additions & 8 deletions
@@ -1028,14 +1028,6 @@ def shutdown(self, timeout=None):
         """
         with self._shutdown_lock:
             # Thread safety: Ensures tracer is shutdown synchronously
-            try:
-                # Data streams tries to flush data on shutdown.
-                # Adding a try except here to ensure we don't crash the application if the agent is killed before
-                # the application for example.
-                self.data_streams_processor.shutdown(timeout)
-            except Exception as e:
-                if config._data_streams_enabled:
-                    log.warning("Failed to shutdown data streams processor: %s", repr(e))
             span_processors = self._span_processors
             deferred_processors = self._deferred_processors
             self._span_processors = []

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -167,6 +167,8 @@ exclude-modules = '''
     | ddtrace.appsec.iast._ast.aspects
     | ddtrace.appsec.iast._taint_utils
     | ddtrace.appsec.iast.taint_sinks.sql_injection
+    # DSM specific contribs
+    | ddtrace.internal.datastreams.kafka
 )
 '''
 
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+fixes:
+  - |
+    DSM: fix off-by-one metric issue and error where statistics weren't calculated when the core API was used.
