Commit 29a884d

fix(kafka): trace using subclass of producer/consumer, not objectproxy [Backport #5545 to 1.12] (#5661)
Backport #5545 to 1.12. Fixes #5494.

Previously, we patched `confluent-kafka` by wrapping its `Producer`/`Consumer` classes with ObjectProxy-based `TracedProducer`/`TracedConsumer` classes. However, since `confluent-kafka`'s `Producer`/`Consumer` classes are implemented in C and have strictly defined slots, our traced classes break subclasses such as `AvroProducer`/`AvroConsumer`. This PR fixes the issue by basing our traced classes directly on `confluent-kafka`'s base `Producer`/`Consumer` classes instead of using an ObjectProxy. Local testing shows that the bug from #5494 is resolved.

Before:

```
>>> from ddtrace import patch_all
>>> patch_all()
>>> from confluent_kafka.avro import AvroConsumer
>>> AvroConsumer({"group.id": 12345}, "http://localhost:8080")
<stdin>:1: DeprecationWarning: AvroConsumer has been deprecated. Use AvroDeserializer instead.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/yun.kim/go/src/github.com/DataDog/dd-trace-py/.riot/venv_py3105_mock_pytest_pytest-mock_coverage_pytest-cov_opentracing_hypothesis6451_confluent-kafka/lib/python3.10/site-packages/confluent_kafka/avro/__init__.py", line 161, in __init__
    self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)
AttributeError: 'cimpl.Consumer' object has no attribute '_serializer'
```

After:

```
>>> from ddtrace import patch_all
>>> patch_all()
>>> from confluent_kafka.avro import AvroConsumer
>>> AvroConsumer({"group.id": 12345}, "http://localhost:8080")
<stdin>:1: DeprecationWarning: AvroConsumer has been deprecated. Use AvroDeserializer instead.
<confluent_kafka.avro.AvroConsumer object at 0x10479a680>
```

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included in the PR.
- [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc.).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines) are followed.
- [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)).
- [x] PR description includes explicit acknowledgement/acceptance of the performance implications of this PR as reported in the benchmarks PR comment.

## Reviewer Checklist

- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment.
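For readers unfamiliar with the failure mode, here is a minimal, self-contained sketch (pure Python, not dd-trace-py code; `CConsumer`, `ProxyConsumer`, and the `AvroLike*` classes are all hypothetical stand-ins) of why a proxy that forwards attribute writes breaks subclasses of a slotted C type, while a plain Python subclass does not:

```python
# Sketch only: CConsumer stands in for the C-implemented
# confluent_kafka.cimpl.Consumer, whose instances cannot grow new attributes.


class CConsumer(object):
    __slots__ = ("conf",)  # fixed slots, like a C extension type

    def __init__(self, conf):
        self.conf = conf


class ProxyConsumer(object):
    """Forwards attribute access to a wrapped CConsumer, ObjectProxy-style."""

    def __init__(self, conf):
        object.__setattr__(self, "_wrapped", CConsumer(conf))

    def __getattr__(self, name):
        return getattr(object.__getattribute__(self, "_wrapped"), name)

    def __setattr__(self, name, value):
        # every attribute write lands on the slotted C-like object
        setattr(object.__getattribute__(self, "_wrapped"), name, value)


class AvroLikeProxySubclass(ProxyConsumer):
    def __init__(self, conf):
        super(AvroLikeProxySubclass, self).__init__(conf)
        self._serializer = object()  # forwarded to CConsumer -> AttributeError


class TracedConsumer(CConsumer):  # the approach this PR takes
    pass


class AvroLikeTracedSubclass(TracedConsumer):
    def __init__(self, conf):
        super(AvroLikeTracedSubclass, self).__init__(conf)
        self._serializer = object()  # fine: subclasses of slotted types get a __dict__


AvroLikeTracedSubclass({"group.id": "g"})  # works
try:
    AvroLikeProxySubclass({"group.id": "g"})
except AttributeError as exc:
    print(exc)  # 'CConsumer' object has no attribute '_serializer'
```

This mirrors the traceback in the description: `AvroConsumer.__init__` assigns `self._serializer`, the proxy forwards the write to the slotted `cimpl.Consumer`, and the write fails.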
1 parent c296a27 commit 29a884d

File tree

4 files changed: +101, -91 lines

ddtrace/contrib/kafka/patch.py

Lines changed: 87 additions & 82 deletions

```diff
@@ -14,7 +14,6 @@
 from ddtrace.internal.utils import ArgumentError
 from ddtrace.internal.utils import get_argument_value
 from ddtrace.pin import Pin
-from ddtrace.vendor.wrapt import ObjectProxy


 _Producer = confluent_kafka.Producer
```

```diff
@@ -27,43 +26,9 @@
 )


-class TracedProducer(ObjectProxy):
-    def __init__(self, *args, **kwargs):
-        producer = _Producer(*args, **kwargs)
-        super(TracedProducer, self).__init__(producer)
-        Pin().onto(self)
-
-    def produce(self, *args, **kwargs):
-        func = self.__wrapped__.produce
-        topic = get_argument_value(args, kwargs, 0, "topic")
-        try:
-            value = get_argument_value(args, kwargs, 1, "value")
-        except ArgumentError:
-            value = None
-        message_key = kwargs.get("key", "")
-        partition = kwargs.get("partition", -1)
-
-        pin = Pin.get_from(self)
-        if not pin or not pin.enabled():
-            return func(*args, **kwargs)
-
-        with pin.tracer.trace(
-            kafkax.PRODUCE,
-            service=trace_utils.ext_service(pin, config.kafka),
-            span_type=SpanTypes.WORKER,
-        ) as span:
-            span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
-            span.set_tag_str(COMPONENT, config.kafka.integration_name)
-            span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
-            span.set_tag_str(kafkax.TOPIC, topic)
-            span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key))
-            span.set_tag(kafkax.PARTITION, partition)
-            span.set_tag_str(kafkax.TOMBSTONE, str(value is None))
-            span.set_tag(SPAN_MEASURED_KEY)
-            rate = config.kafka.get_analytics_sample_rate()
-            if rate is not None:
-                span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)
-            return func(*args, **kwargs)
+class TracedProducer(confluent_kafka.Producer):
+    def produce(self, topic, value=None, *args, **kwargs):
+        super(TracedProducer, self).produce(topic, value, *args, **kwargs)

     # in older versions of confluent_kafka, bool(Producer()) evaluates to False,
     # which makes the Pin functionality ignore it.
```

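The replacement `TracedProducer` keeps a deliberately thin `produce()` override. The removed test comment (see the tests diff below) explains why: `confluent-kafka`'s methods are implemented in C, so they cannot be replaced in place, whereas a re-declared Python-level method with a known signature is a writable attribute that `wrapt` can wrap and unwrap cleanly. A short sketch of that distinction, using `list` as the stand-in C type (illustrative, not library code):

```python
# Sketch: methods on C extension types reject reassignment, while methods on
# a Python subclass live in a writable class __dict__.

try:
    list.append = lambda self, item: None  # list is a C-implemented type
except TypeError as exc:
    print(exc)  # e.g. "cannot set 'append' attribute of immutable type 'list'"


class PyList(list):
    # a Python subclass: re-declared methods are ordinary class attributes
    def append(self, item):
        super(PyList, self).append(item)


PyList.append = lambda self, item: print("patched append:", item)
PyList().append(1)  # prints "patched append: 1"
```
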
```diff
@@ -73,60 +38,100 @@ def __bool__(self):
     __nonzero__ = __bool__


-class TracedConsumer(ObjectProxy):
-
-    __slots__ = "_group_id"
-
-    def __init__(self, *args, **kwargs):
-        consumer = _Consumer(*args, **kwargs)
-        super(TracedConsumer, self).__init__(consumer)
-        self._group_id = get_argument_value(args, kwargs, 0, "config")["group.id"]
-        Pin().onto(self)
-
-    def poll(self, *args, **kwargs):
-        func = self.__wrapped__.poll
-        pin = Pin.get_from(self)
-        if not pin or not pin.enabled():
-            return func(*args, **kwargs)
-
-        with pin.tracer.trace(
-            kafkax.CONSUME,
-            service=trace_utils.ext_service(pin, config.kafka),
-            span_type=SpanTypes.WORKER,
-        ) as span:
-            message = func(*args, **kwargs)
-            span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
-            span.set_tag_str(COMPONENT, config.kafka.integration_name)
-            span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
-            span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(message is not None))
-            span.set_tag_str(kafkax.GROUP_ID, self._group_id)
-            if message is not None:
-                message_key = message.key() or ""
-                message_offset = message.offset() or -1
-                span.set_tag_str(kafkax.TOPIC, message.topic())
-                span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key))
-                span.set_tag(kafkax.PARTITION, message.partition())
-                span.set_tag_str(kafkax.TOMBSTONE, str(len(message) == 0))
-                span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
-            span.set_tag(SPAN_MEASURED_KEY)
-            rate = config.kafka.get_analytics_sample_rate()
-            if rate is not None:
-                span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)
-            return message
+class TracedConsumer(confluent_kafka.Consumer):
+    def __init__(self, config):
+        super(TracedConsumer, self).__init__(config)
+        self._group_id = config["group.id"]
+
+    def poll(self, timeout=1):
+        return super(TracedConsumer, self).poll(timeout)


 def patch():
     if getattr(confluent_kafka, "_datadog_patch", False):
         return
     setattr(confluent_kafka, "_datadog_patch", True)

-    setattr(confluent_kafka, "Producer", TracedProducer)
-    setattr(confluent_kafka, "Consumer", TracedConsumer)
+    confluent_kafka.Producer = TracedProducer
+    confluent_kafka.Consumer = TracedConsumer
+
+    trace_utils.wrap(TracedProducer, "produce", traced_produce)
+    trace_utils.wrap(TracedConsumer, "poll", traced_poll)
+    Pin().onto(confluent_kafka.Producer)
+    Pin().onto(confluent_kafka.Consumer)


 def unpatch():
     if getattr(confluent_kafka, "_datadog_patch", False):
         setattr(confluent_kafka, "_datadog_patch", False)

-    setattr(confluent_kafka, "Producer", _Producer)
-    setattr(confluent_kafka, "Consumer", _Consumer)
+    if trace_utils.iswrapped(TracedProducer.produce):
+        trace_utils.unwrap(TracedProducer, "produce")
+    if trace_utils.iswrapped(TracedConsumer.poll):
+        trace_utils.unwrap(TracedConsumer, "poll")
+
+    confluent_kafka.Producer = _Producer
+    confluent_kafka.Consumer = _Consumer
+
+
+def traced_produce(func, instance, args, kwargs):
+    pin = Pin.get_from(instance)
+    if not pin or not pin.enabled():
+        return func(*args, **kwargs)
+
+    topic = get_argument_value(args, kwargs, 0, "topic")
+    try:
+        value = get_argument_value(args, kwargs, 1, "value")
+    except ArgumentError:
+        value = None
+    message_key = kwargs.get("key", "")
+    partition = kwargs.get("partition", -1)
+
+    with pin.tracer.trace(
+        kafkax.PRODUCE,
+        service=trace_utils.ext_service(pin, config.kafka),
+        span_type=SpanTypes.WORKER,
+    ) as span:
+        span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
+        span.set_tag_str(COMPONENT, config.kafka.integration_name)
+        span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
+        span.set_tag_str(kafkax.TOPIC, topic)
+        span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key))
+        span.set_tag(kafkax.PARTITION, partition)
+        span.set_tag_str(kafkax.TOMBSTONE, str(value is None))
+        span.set_tag(SPAN_MEASURED_KEY)
+        rate = config.kafka.get_analytics_sample_rate()
+        if rate is not None:
+            span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)
+        return func(*args, **kwargs)
+
+
+def traced_poll(func, instance, args, kwargs):
+    pin = Pin.get_from(instance)
+    if not pin or not pin.enabled():
+        return func(*args, **kwargs)
+
+    with pin.tracer.trace(
+        kafkax.CONSUME,
+        service=trace_utils.ext_service(pin, config.kafka),
+        span_type=SpanTypes.WORKER,
+    ) as span:
+        message = func(*args, **kwargs)
+        span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
+        span.set_tag_str(COMPONENT, config.kafka.integration_name)
+        span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
+        span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(message is not None))
+        span.set_tag_str(kafkax.GROUP_ID, instance._group_id)
+        if message is not None:
+            message_key = message.key() or ""
+            message_offset = message.offset() or -1
+            span.set_tag_str(kafkax.TOPIC, message.topic())
+            span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key))
+            span.set_tag(kafkax.PARTITION, message.partition())
+            span.set_tag_str(kafkax.TOMBSTONE, str(len(message) == 0))
+            span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
+        span.set_tag(SPAN_MEASURED_KEY)
+        rate = config.kafka.get_analytics_sample_rate()
+        if rate is not None:
+            span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)
+        return message
```
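
The new `traced_produce` and `traced_poll` functions follow the wrapt function-wrapper protocol: the wrapper receives `(func, instance, args, kwargs)`, where `func` is the original bound method and `instance` is the object it was called on. A rough sketch of that convention with the standalone `wrapt` package (`trace_utils.wrap` appears to be a thin layer over `wrapt.wrap_function_wrapper`; `Client`/`send` below are made-up names):

```python
# Sketch of the (func, instance, args, kwargs) wrapper protocol used above.
import wrapt


class Client(object):
    def send(self, payload):
        return len(payload)


def traced_send(func, instance, args, kwargs):
    # func is the original bound method, instance is the Client it belongs to
    print("before send on", type(instance).__name__)
    result = func(*args, **kwargs)
    print("after send ->", result)
    return result


wrapt.wrap_function_wrapper(Client, "send", traced_send)
print(Client().send(b"hello"))  # prints the before/after lines, then 5
```

Splitting the span logic into standalone wrappers (rather than baking it into the subclass methods, as the old ObjectProxy version did) is what lets `unpatch()` cleanly restore the thin `produce`/`poll` overrides via `trace_utils.unwrap`.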

docs/spelling_wordlist.txt

Lines changed: 2 additions & 0 deletions

```diff
@@ -136,6 +136,7 @@ opensearch
 opentracer
 opentracing
 otel
+ObjectProxy
 parameterized
 perf
 pid
@@ -182,6 +183,7 @@ sqlite
 stacktrace
 starlette
 stringable
+subclass
 subdomains
 submodule
 submodules
```
Lines changed: 6 additions & 0 deletions (new file)

```diff
@@ -0,0 +1,6 @@
+---
+fixes:
+  - |
+    kafka: Previously instantiating a subclass of kafka's Producer/Consumer classes would result in attribute errors
+    due to patching the Producer/Consumer classes with an ObjectProxy. This fix resolves this issue by making the
+    traced classes directly inherit from kafka's base Producer/Consumer classes.
```
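
As a quick sanity check of what the release note claims, the patched classes are genuine subclasses of the originals, so class-level checks hold without a broker. A sketch, assuming `confluent-kafka` is installed and this version of the tracer:

```python
# Sketch: after patching, confluent_kafka.Producer/Consumer are real
# subclasses of the C-implemented classes in confluent_kafka.cimpl.
from ddtrace import patch_all

patch_all()

import confluent_kafka
from confluent_kafka import cimpl

assert issubclass(confluent_kafka.Producer, cimpl.Producer)
assert issubclass(confluent_kafka.Consumer, cimpl.Consumer)
```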

tests/contrib/kafka/test_kafka_patch.py

Lines changed: 6 additions & 9 deletions

```diff
@@ -9,17 +9,14 @@ class TestKafkaPatch(PatchTestCase.Base):
     __patch_func__ = patch
     __unpatch_func__ = unpatch

-    # DEV: normally, we directly patch methods, but since confluent-kafka's methods are implemented in C and
-    # directly imported, we have to patch the Producer/Consumer classes via proxy Traced Producer/Consumer classes.
-    # Because of this, we need to create instances of each proxy class to make wrapping status assertions.
     def assert_module_patched(self, confluent_kafka):
-        self.assert_wrapped(confluent_kafka.Producer({}))
-        self.assert_wrapped(confluent_kafka.Consumer({"group.id": "group_id"}))
+        self.assert_wrapped(confluent_kafka.Producer({}).produce)
+        self.assert_wrapped(confluent_kafka.Consumer({"group.id": "group_id"}).poll)

     def assert_not_module_patched(self, confluent_kafka):
-        self.assert_not_wrapped(confluent_kafka.Producer({}))
-        self.assert_not_wrapped(confluent_kafka.Consumer({"group.id": "group_id"}))
+        self.assert_not_wrapped(confluent_kafka.Producer({}).produce)
+        self.assert_not_wrapped(confluent_kafka.Consumer({"group.id": "group_id"}).poll)

     def assert_not_module_double_patched(self, confluent_kafka):
-        self.assert_not_double_wrapped(confluent_kafka.Producer({}))
-        self.assert_not_double_wrapped(confluent_kafka.Consumer({"group.id": "group_id"}))
+        self.assert_not_double_wrapped(confluent_kafka.Producer({}).produce)
+        self.assert_not_double_wrapped(confluent_kafka.Consumer({"group.id": "group_id"}).poll)
```
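
The tests now assert wrapping on the bound methods rather than on the instances, which matches the move from proxied objects to wrapped methods. An approximate stand-in for what such helpers verify (`assert_wrapped` and friends are ddtrace test utilities; `is_wrapped`/`is_double_wrapped` below are illustrative):

```python
# Sketch: a method patched via wrapt is an ObjectProxy that exposes the
# original callable as __wrapped__.
import wrapt


def is_wrapped(func):
    return isinstance(func, wrapt.ObjectProxy) and hasattr(func, "__wrapped__")


def is_double_wrapped(func):
    # wrapped twice: unwrapping once still yields a proxy
    return is_wrapped(func) and is_wrapped(func.__wrapped__)
```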
