Skip to content

Commit d345d35

Browse files
fix(kafka): avoid overriding methods in Producers and Consumers [backport 2.0] (#7007)
1 parent d4e5161 commit d345d35

File tree

7 files changed

+252
-15
lines changed

7 files changed

+252
-15
lines changed

ddtrace/contrib/kafka/patch.py

Lines changed: 10 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -55,9 +55,6 @@ def __init__(self, config, *args, **kwargs):
5555
else config.get("metadata.broker.list")
5656
)
5757

58-
def produce(self, topic, value=None, *args, **kwargs):
59-
super(TracedProducer, self).produce(topic, value, *args, **kwargs)
60-
6158
# in older versions of confluent_kafka, bool(Producer()) evaluates to False,
6259
# which makes the Pin functionality ignore it.
6360
def __bool__(self):
@@ -72,12 +69,6 @@ def __init__(self, config, *args, **kwargs):
7269
self._group_id = config.get("group.id", "")
7370
self._auto_commit = asbool(config.get("enable.auto.commit", True))
7471

75-
def poll(self, timeout=None):
76-
return super(TracedConsumer, self).poll(timeout)
77-
78-
def commit(self, message=None, *args, **kwargs):
79-
return super(TracedConsumer, self).commit(message, args, kwargs)
80-
8172

8273
def patch():
8374
if getattr(confluent_kafka, "_datadog_patch", False):
@@ -230,12 +221,16 @@ def traced_commit(func, instance, args, kwargs):
230221
return func(*args, **kwargs)
231222

232223
if config._data_streams_enabled:
233-
message = get_argument_value(args, kwargs, 0, "message")
234-
offsets = kwargs.get("offsets", [])
224+
message = get_argument_value(args, kwargs, 0, "message", True)
225+
# message and offset are mutually exclusive. Only one parameter can be passed.
235226
if message is not None:
236227
offsets = [TopicPartition(message.topic(), message.partition(), offset=message.offset())]
237-
for offset in offsets:
238-
pin.tracer.data_streams_processor.track_kafka_commit(
239-
instance._group_id, offset.topic, offset.partition, offset.offset or -1, time.time()
240-
)
228+
else:
229+
offsets = get_argument_value(args, kwargs, 1, "offsets", True)
230+
231+
if offsets:
232+
for offset in offsets:
233+
pin.tracer.data_streams_processor.track_kafka_commit(
234+
instance._group_id, offset.topic, offset.partition, offset.offset or -1, time.time()
235+
)
241236
return func(*args, **kwargs)

ddtrace/internal/utils/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ def get_argument_value(
1919
kwargs, # type: Dict[str, Any]
2020
pos, # type: int
2121
kw, # type: str
22+
optional=False, # type: bool
2223
):
2324
# type: (...) -> Optional[Any]
2425
"""
@@ -41,6 +42,8 @@ def get_argument_value(
4142
try:
4243
return args[pos]
4344
except IndexError:
45+
if optional:
46+
return None
4447
raise ArgumentError("%s (at position %d)" % (kw, pos))
4548

4649

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
kafka: Fixes ``ValueError`` raised when ``Consumer.commit(offsets=...)`` is called.

tests/contrib/kafka/test_kafka.py

Lines changed: 31 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44

55
import confluent_kafka
66
from confluent_kafka import KafkaException
7+
from confluent_kafka import TopicPartition
78
from confluent_kafka import admin as kafka_admin
89
import mock
910
import pytest
@@ -190,6 +191,36 @@ def test_message(producer, consumer, tombstone, kafka_topic):
190191
message = consumer.poll(1.0)
191192

192193

194+
@pytest.mark.snapshot(ignores=["metrics.kafka.message_offset"])
195+
def test_commit(producer, consumer, kafka_topic):
196+
producer.produce(kafka_topic, PAYLOAD, key=KEY)
197+
producer.flush()
198+
message = None
199+
while message is None:
200+
message = consumer.poll(1.0)
201+
consumer.commit(message)
202+
203+
204+
@pytest.mark.snapshot(ignores=["metrics.kafka.message_offset"])
205+
def test_commit_with_offset(producer, consumer, kafka_topic):
206+
producer.produce(kafka_topic, PAYLOAD, key=KEY)
207+
producer.flush()
208+
message = None
209+
while message is None:
210+
message = consumer.poll(1.0)
211+
consumer.commit(offsets=[TopicPartition(kafka_topic)])
212+
213+
214+
@pytest.mark.snapshot(ignores=["metrics.kafka.message_offset"])
215+
def test_commit_with_only_async_arg(producer, consumer, kafka_topic):
216+
producer.produce(kafka_topic, PAYLOAD, key=KEY)
217+
producer.flush()
218+
message = None
219+
while message is None:
220+
message = consumer.poll(1.0)
221+
consumer.commit(asynchronous=False)
222+
223+
193224
@pytest.mark.snapshot(
194225
token="tests.contrib.kafka.test_kafka.test_service_override", ignores=["metrics.kafka.message_offset"]
195226
)
Lines changed: 68 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,68 @@
1+
[[
2+
{
3+
"name": "kafka.consume",
4+
"service": "kafka",
5+
"resource": "kafka.consume",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "worker",
10+
"meta": {
11+
"_dd.base_service": "",
12+
"_dd.p.dm": "-0",
13+
"component": "kafka",
14+
"kafka.group_id": "test_group",
15+
"kafka.message_key": "test_key",
16+
"kafka.received_message": "True",
17+
"kafka.tombstone": "False",
18+
"kafka.topic": "test_commit",
19+
"language": "python",
20+
"messaging.system": "kafka",
21+
"runtime-id": "4cf20234df69485781c43482565d9d82",
22+
"span.kind": "consumer"
23+
},
24+
"metrics": {
25+
"_dd.measured": 1,
26+
"_dd.top_level": 1,
27+
"_dd.tracer_kr": 1.0,
28+
"_sampling_priority_v1": 1,
29+
"kafka.message_offset": -1,
30+
"kafka.partition": 0,
31+
"process_id": 222
32+
},
33+
"duration": 187877000,
34+
"start": 1695310282564811000
35+
}],
36+
[
37+
{
38+
"name": "kafka.produce",
39+
"service": "kafka",
40+
"resource": "kafka.produce",
41+
"trace_id": 1,
42+
"span_id": 1,
43+
"parent_id": 0,
44+
"type": "worker",
45+
"meta": {
46+
"_dd.base_service": "",
47+
"_dd.p.dm": "-0",
48+
"component": "kafka",
49+
"kafka.message_key": "test_key",
50+
"kafka.tombstone": "False",
51+
"kafka.topic": "test_commit",
52+
"language": "python",
53+
"messaging.kafka.bootstrap.servers": "localhost:29092",
54+
"messaging.system": "kafka",
55+
"runtime-id": "4cf20234df69485781c43482565d9d82",
56+
"span.kind": "producer"
57+
},
58+
"metrics": {
59+
"_dd.measured": 1,
60+
"_dd.top_level": 1,
61+
"_dd.tracer_kr": 1.0,
62+
"_sampling_priority_v1": 1,
63+
"kafka.partition": -1,
64+
"process_id": 222
65+
},
66+
"duration": 135000,
67+
"start": 1695310277502282000
68+
}]]
Lines changed: 68 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,68 @@
1+
[[
2+
{
3+
"name": "kafka.consume",
4+
"service": "kafka",
5+
"resource": "kafka.consume",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "worker",
10+
"meta": {
11+
"_dd.base_service": "",
12+
"_dd.p.dm": "-0",
13+
"component": "kafka",
14+
"kafka.group_id": "test_group",
15+
"kafka.message_key": "test_key",
16+
"kafka.received_message": "True",
17+
"kafka.tombstone": "False",
18+
"kafka.topic": "test_commit_with_offset",
19+
"language": "python",
20+
"messaging.system": "kafka",
21+
"runtime-id": "9a7c9425ae8745c1bb8d35d2514f8958",
22+
"span.kind": "consumer"
23+
},
24+
"metrics": {
25+
"_dd.measured": 1,
26+
"_dd.top_level": 1,
27+
"_dd.tracer_kr": 1.0,
28+
"_sampling_priority_v1": 1,
29+
"kafka.message_offset": -1,
30+
"kafka.partition": 0,
31+
"process_id": 6340
32+
},
33+
"duration": 307084000,
34+
"start": 1695311385900323000
35+
}],
36+
[
37+
{
38+
"name": "kafka.produce",
39+
"service": "kafka",
40+
"resource": "kafka.produce",
41+
"trace_id": 1,
42+
"span_id": 1,
43+
"parent_id": 0,
44+
"type": "worker",
45+
"meta": {
46+
"_dd.base_service": "",
47+
"_dd.p.dm": "-0",
48+
"component": "kafka",
49+
"kafka.message_key": "test_key",
50+
"kafka.tombstone": "False",
51+
"kafka.topic": "test_commit_with_offset",
52+
"language": "python",
53+
"messaging.kafka.bootstrap.servers": "localhost:29092",
54+
"messaging.system": "kafka",
55+
"runtime-id": "9a7c9425ae8745c1bb8d35d2514f8958",
56+
"span.kind": "producer"
57+
},
58+
"metrics": {
59+
"_dd.measured": 1,
60+
"_dd.top_level": 1,
61+
"_dd.tracer_kr": 1.0,
62+
"_sampling_priority_v1": 1,
63+
"kafka.partition": -1,
64+
"process_id": 6340
65+
},
66+
"duration": 99000,
67+
"start": 1695311382892915000
68+
}]]
Lines changed: 68 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,68 @@
1+
[[
2+
{
3+
"name": "kafka.consume",
4+
"service": "kafka",
5+
"resource": "kafka.consume",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "worker",
10+
"meta": {
11+
"_dd.base_service": "",
12+
"_dd.p.dm": "-0",
13+
"component": "kafka",
14+
"kafka.group_id": "test_group",
15+
"kafka.message_key": "test_key",
16+
"kafka.received_message": "True",
17+
"kafka.tombstone": "False",
18+
"kafka.topic": "test_commit_with_only_async_arg",
19+
"language": "python",
20+
"messaging.system": "kafka",
21+
"runtime-id": "15fa67b4ad1e498aa489305814d88a75",
22+
"span.kind": "consumer"
23+
},
24+
"metrics": {
25+
"_dd.measured": 1,
26+
"_dd.top_level": 1,
27+
"_dd.tracer_kr": 1.0,
28+
"_sampling_priority_v1": 1,
29+
"kafka.message_offset": -1,
30+
"kafka.partition": 0,
31+
"process_id": 35657
32+
},
33+
"duration": 343409000,
34+
"start": 1695323048560969000
35+
}],
36+
[
37+
{
38+
"name": "kafka.produce",
39+
"service": "kafka",
40+
"resource": "kafka.produce",
41+
"trace_id": 1,
42+
"span_id": 1,
43+
"parent_id": 0,
44+
"type": "worker",
45+
"meta": {
46+
"_dd.base_service": "",
47+
"_dd.p.dm": "-0",
48+
"component": "kafka",
49+
"kafka.message_key": "test_key",
50+
"kafka.tombstone": "False",
51+
"kafka.topic": "test_commit_with_only_async_arg",
52+
"language": "python",
53+
"messaging.kafka.bootstrap.servers": "localhost:29092",
54+
"messaging.system": "kafka",
55+
"runtime-id": "15fa67b4ad1e498aa489305814d88a75",
56+
"span.kind": "producer"
57+
},
58+
"metrics": {
59+
"_dd.measured": 1,
60+
"_dd.top_level": 1,
61+
"_dd.tracer_kr": 1.0,
62+
"_sampling_priority_v1": 1,
63+
"kafka.partition": -1,
64+
"process_id": 35657
65+
},
66+
"duration": 280000,
67+
"start": 1695323045564363000
68+
}]]

0 commit comments

Comments (0)