Commit aa3a538

Authored by emmettbutler, Yun-Kim, and mabdinur
feat(kafka): add confluent_kafka integration (#5303)
This PR adds automatic instrumentation for [confluent_kafka](https://github.com/confluentinc/confluent-kafka-python), the Python client for Apache Kafka v0.8 and above. Currently, this integration wraps the `confluent_kafka.Producer` and `confluent_kafka.Consumer` classes, specifically tracing `confluent_kafka.Producer.produce()` and `confluent_kafka.Consumer.poll()`. The integration supports `confluent_kafka>=1.7.0`.

Notes:

- The consumer `group_id` and the consumer/producer `client_id` are not publicly available attributes in the `confluent_kafka` API. While our instrumentation sets the `message_key`, `partition`, and `tombstone` attributes, `group_id` and `client_id` remain unavailable until the `confluent_kafka` API changes to publicly expose them.
- When a traced producer creates a message with `Producer.produce(topic)` but no partition is explicitly set, the resulting span has a default partition of `-1` to signify that the partition will be chosen by the built-in partitioner. `Consumer.poll()` spans with messages carry the correct partition tag, since that information is available directly from the message.
- The traced `Consumer.poll()` span has a boolean tag `kafka.received_message` indicating whether the `poll()` call returned a message. If a message was received, the span also includes message-specific tags, including `topic`, `partition`, and `tombstone`.
- The traced `Consumer.poll()` span (when a message was received) and the `Producer.produce()` span include a boolean `tombstone` tag indicating whether the message was a tombstone (an empty/null message value). A minimal usage sketch follows the commit summary below.

## Checklist
- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included in the PR.
- [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines) are followed.
- [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)).
- [x] Author is aware of the performance implications of this PR as reported in the benchmarks PR comment.

## Reviewer Checklist
- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer is aware of, and discussed the performance implications of this PR as reported in the benchmarks PR comment.

---------

Co-authored-by: Yun Kim <[email protected]>
Co-authored-by: Yun Kim <[email protected]>
Co-authored-by: Munir Abdinur <[email protected]>
1 parent e6adaaf · commit aa3a538 · 22 files changed: +823 −0 lines
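For orientation, here is a minimal usage sketch of the traced client. It is illustrative only: the broker address (`localhost:9092`), topic name, and group id are placeholder assumptions, not values taken from this PR.

```python
# Minimal sketch -- broker address, topic, and group id are placeholders.
from ddtrace import patch

patch(kafka=True)  # patch before importing confluent_kafka

import confluent_kafka

producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
# Traced as a `kafka.produce` span; with no explicit partition, the span's
# `kafka.partition` tag defaults to -1 (the built-in partitioner decides).
producer.produce("orders", value=b"order-created", key="order-123")
producer.flush()

consumer = confluent_kafka.Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "order-processors",
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["orders"])
# Traced as a `kafka.consume` span; `kafka.received_message` records whether a
# message came back, and message-specific tags are set only when one did.
message = consumer.poll(1.0)
consumer.close()
```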

.circleci/config.yml

Lines changed: 11 additions & 0 deletions
@@ -1084,6 +1084,15 @@ jobs:
           pattern: "wsgi"
           snapshot: true
 
+  kafka:
+    <<: *machine_executor
+    parallelism: 2
+    steps:
+      - run_test:
+          pattern: 'kafka'
+          snapshot: true
+          docker_services: 'kafka'
+
   kombu:
     <<: *contrib_job_small
     docker:
@@ -1184,6 +1193,7 @@ requires_tests: &requires_tests
     - vendor
     - profile
     - jinja2
+    - kafka
     - kombu
     - mako
     - mariadb
@@ -1288,6 +1298,7 @@ workflows:
       - vendor: *requires_base_venvs
       - profile: *requires_base_venvs
      - jinja2: *requires_base_venvs
+      - kafka: *requires_base_venvs
       - kombu: *requires_base_venvs
       - mako: *requires_base_venvs
       - mariadb: *requires_base_venvs

.riot/requirements/e376a11.txt

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+#
+# This file is autogenerated by pip-compile with Python 3.10
+# by the following command:
+#
+#    pip-compile --no-annotate --resolver=backtracking .riot/requirements/e376a11.in
+#
+attrs==22.2.0
+confluent-kafka==1.9.2
+coverage[toml]==6.5.0
+exceptiongroup==1.1.1
+hypothesis==6.45.0
+iniconfig==2.0.0
+mock==4.0.3
+opentracing==2.4.0
+packaging==23.0
+pluggy==1.0.0
+pytest==7.2.0
+pytest-cov==4.0.0
+pytest-mock==3.10.0
+sortedcontainers==2.4.0
+tomli==2.0.1

ddtrace/_monkey.py

Lines changed: 2 additions & 0 deletions
@@ -40,6 +40,7 @@
     "graphql": True,
     "grpc": True,
     "httpx": True,
+    "kafka": True,
     "mongoengine": True,
     "mysql": True,
     "mysqldb": True,
@@ -119,6 +120,7 @@
     "vertica": ("vertica_python",),
     "aws_lambda": ("datadog_lambda",),
     "httplib": ("httplib" if PY2 else "http.client",),
+    "kafka": ("confluent_kafka",),
 }
 
 IAST_PATCH = {
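With `"kafka": True` registered in the default patch map above, `patch_all()` enables the integration without any arguments. A brief sketch of what that implies:

```python
# Sketch: "kafka": True in the default patch map means patch_all() turns the
# integration on automatically; no broker connection happens at patch time.
from ddtrace import patch_all

patch_all()

# The "kafka" -> ("confluent_kafka",) entry maps the integration name to the
# module that is wrapped once it is imported.
import confluent_kafka
```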

ddtrace/contrib/kafka/__init__.py

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+"""
+This integration instruments the ``confluent-kafka<https://github.com/confluentinc/confluent-kafka-python>``
+library to trace event streaming.
+
+Enabling
+~~~~~~~~
+
+The kafka integration is enabled automatically when using
+:ref:`ddtrace-run <ddtracerun>` or :func:`patch_all() <ddtrace.patch_all>`.
+
+Or use :func:`patch() <ddtrace.patch>` to manually enable the integration::
+
+    from ddtrace import patch
+    patch(kafka=True)
+    import confluent_kafka
+    ...
+
+Global Configuration
+~~~~~~~~~~~~~~~~~~~~
+
+.. py:data:: ddtrace.config.kafka["service"]
+
+   The service name reported by default for your kafka spans.
+
+   This option can also be set with the ``DD_KAFKA_SERVICE`` environment
+   variable.
+
+   Default: ``"kafka"``
+
+
+To configure the kafka integration using the
+``Pin`` API::
+
+    from ddtrace import Pin
+    from ddtrace import patch
+
+    # Make sure to patch before importing confluent_kafka
+    patch(kafka=True)
+
+    import confluent_kafka
+
+    Pin.override(confluent_kafka, service="custom-service-name")
+"""
+from ...internal.utils.importlib import require_modules
+
+
+required_modules = ["confluent_kafka"]
+
+with require_modules(required_modules) as missing_modules:
+    if not missing_modules:
+        from .patch import patch
+        from .patch import unpatch
+
+        __all__ = ["patch", "unpatch"]
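Alongside the Pin example in the docstring, the documented `ddtrace.config.kafka["service"]` option can be set programmatically. A short sketch: the service name is a placeholder, and item assignment is assumed to behave as it does for other integration configs.

```python
# Sketch: set the documented service option in code rather than through the
# DD_KAFKA_SERVICE environment variable. "payments-kafka" is a placeholder.
from ddtrace import config, patch

patch(kafka=True)
config.kafka["service"] = "payments-kafka"

import confluent_kafka  # spans from this client now report the custom service
```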

ddtrace/contrib/kafka/patch.py

Lines changed: 132 additions & 0 deletions
@@ -0,0 +1,132 @@
+import confluent_kafka
+
+from ddtrace import config
+from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY
+from ddtrace.constants import SPAN_KIND
+from ddtrace.constants import SPAN_MEASURED_KEY
+from ddtrace.contrib import trace_utils
+from ddtrace.ext import SpanKind
+from ddtrace.ext import SpanTypes
+from ddtrace.ext import kafka as kafkax
+from ddtrace.internal.compat import ensure_text
+from ddtrace.internal.constants import COMPONENT
+from ddtrace.internal.constants import MESSAGING_SYSTEM
+from ddtrace.internal.utils import ArgumentError
+from ddtrace.internal.utils import get_argument_value
+from ddtrace.pin import Pin
+from ddtrace.vendor.wrapt import ObjectProxy
+
+
+_Producer = confluent_kafka.Producer
+_Consumer = confluent_kafka.Consumer
+
+
+config._add(
+    "kafka",
+    dict(_default_service="kafka"),
+)
+
+
+class TracedProducer(ObjectProxy):
+    def __init__(self, *args, **kwargs):
+        producer = _Producer(*args, **kwargs)
+        super(TracedProducer, self).__init__(producer)
+        Pin().onto(self)
+
+    def produce(self, *args, **kwargs):
+        func = self.__wrapped__.produce
+        topic = get_argument_value(args, kwargs, 0, "topic")
+        try:
+            value = get_argument_value(args, kwargs, 1, "value")
+        except ArgumentError:
+            value = None
+        message_key = kwargs.get("key", "")
+        partition = kwargs.get("partition", -1)
+
+        pin = Pin.get_from(self)
+        if not pin or not pin.enabled():
+            return func(*args, **kwargs)
+
+        with pin.tracer.trace(
+            kafkax.PRODUCE,
+            service=trace_utils.ext_service(pin, config.kafka),
+            span_type=SpanTypes.WORKER,
+        ) as span:
+            span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
+            span.set_tag_str(COMPONENT, config.kafka.integration_name)
+            span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
+            span.set_tag_str(kafkax.TOPIC, topic)
+            span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key))
+            span.set_tag(kafkax.PARTITION, partition)
+            span.set_tag_str(kafkax.TOMBSTONE, str(value is None))
+            span.set_tag(SPAN_MEASURED_KEY)
+            rate = config.kafka.get_analytics_sample_rate()
+            if rate is not None:
+                span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)
+            return func(*args, **kwargs)
+
+    # in older versions of confluent_kafka, bool(Producer()) evaluates to False,
+    # which makes the Pin functionality ignore it.
+    def __bool__(self):
+        return True
+
+    __nonzero__ = __bool__
+
+
+class TracedConsumer(ObjectProxy):
+
+    __slots__ = "_group_id"
+
+    def __init__(self, *args, **kwargs):
+        consumer = _Consumer(*args, **kwargs)
+        super(TracedConsumer, self).__init__(consumer)
+        self._group_id = get_argument_value(args, kwargs, 0, "config")["group.id"]
+        Pin().onto(self)
+
+    def poll(self, *args, **kwargs):
+        func = self.__wrapped__.poll
+        pin = Pin.get_from(self)
+        if not pin or not pin.enabled():
+            return func(*args, **kwargs)
+
+        with pin.tracer.trace(
+            kafkax.CONSUME,
+            service=trace_utils.ext_service(pin, config.kafka),
+            span_type=SpanTypes.WORKER,
+        ) as span:
+            message = func(*args, **kwargs)
+            span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
+            span.set_tag_str(COMPONENT, config.kafka.integration_name)
+            span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
+            span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(message is not None))
+            span.set_tag_str(kafkax.GROUP_ID, self._group_id)
+            if message is not None:
+                message_key = message.key() or ""
+                message_offset = message.offset() or -1
+                span.set_tag_str(kafkax.TOPIC, message.topic())
+                span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key))
+                span.set_tag(kafkax.PARTITION, message.partition())
+                span.set_tag_str(kafkax.TOMBSTONE, str(len(message) == 0))
+                span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
+            span.set_tag(SPAN_MEASURED_KEY)
+            rate = config.kafka.get_analytics_sample_rate()
+            if rate is not None:
+                span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)
+            return message
+
+
+def patch():
+    if getattr(confluent_kafka, "_datadog_patch", False):
+        return
+    setattr(confluent_kafka, "_datadog_patch", True)
+
+    setattr(confluent_kafka, "Producer", TracedProducer)
+    setattr(confluent_kafka, "Consumer", TracedConsumer)
+
+
+def unpatch():
+    if getattr(confluent_kafka, "_datadog_patch", False):
+        setattr(confluent_kafka, "_datadog_patch", False)
+
+    setattr(confluent_kafka, "Producer", _Producer)
+    setattr(confluent_kafka, "Consumer", _Consumer)
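Since `patch()` and `unpatch()` simply swap the module-level `Producer`/`Consumer` classes behind a `_datadog_patch` guard, their effect can be checked directly. A small sketch against the functions above:

```python
# Sketch: patch()/unpatch() swap confluent_kafka.Producer and .Consumer
# between the traced proxies and the saved originals.
import confluent_kafka

from ddtrace.contrib.kafka.patch import TracedProducer, patch, unpatch

patch()
assert confluent_kafka.Producer is TracedProducer
patch()  # idempotent: the _datadog_patch guard makes repeat calls no-ops

unpatch()
assert confluent_kafka.Producer is not TracedProducer
```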

ddtrace/ext/kafka.py

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+SERVICE = "kafka"
+
+TOPIC = "kafka.topic"
+PARTITION = "kafka.partition"
+MESSAGE_KEY = "kafka.message_key"
+MESSAGE_OFFSET = "kafka.message_offset"
+GROUP_ID = "kafka.group_id"
+TOMBSTONE = "kafka.tombstone"
+RECEIVED_MESSAGE = "kafka.received_message"
+
+PRODUCE = "kafka.produce"
+CONSUME = "kafka.consume"

ddtrace/internal/constants.py

Lines changed: 2 additions & 0 deletions
@@ -42,3 +42,5 @@
 {"errors": [{"title": "You've been blocked", "detail": "Sorry, you cannot access this page.
 Please contact the customer service team. Security provided by Datadog."}]}
 """
+
+MESSAGING_SYSTEM = "messaging.system"

docker-compose.yml

Lines changed: 20 additions & 0 deletions
@@ -56,6 +56,26 @@ services:
     image: redis:4.0-alpine
     ports:
       - "127.0.0.1:6379:6379"
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.3.1
+    environment:
+      - ZOOKEEPER_CLIENT_PORT=2181
+      - ZOOKEEPER_TICK_TIME=2000
+    ports:
+      - "127.0.0.1:22181:2181"
+  kafka:
+    image: confluentinc/cp-kafka:7.3.2
+    ports:
+      - "127.0.0.1:29092:29092"
+    environment:
+      - KAFKA_BROKER_ID=1
+      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+    depends_on:
+      - zookeeper
   rediscluster:
     image: grokzen/redis-cluster:6.2.0
     environment:
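The broker advertises a host-facing `PLAINTEXT_HOST` listener on `localhost:29092`, so a quick smoke test from outside the containers might look like the sketch below (topic and group id are placeholders):

```python
# Sketch: reach the compose broker from the host via localhost:29092.
from confluent_kafka import Consumer, Producer

producer = Producer({"bootstrap.servers": "localhost:29092"})
producer.produce("smoke-test", value=b"hello")
producer.flush()

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:29092",
        "group.id": "smoke-test-group",  # placeholder group id
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["smoke-test"])
message = consumer.poll(10.0)  # returns None if nothing arrives in time
consumer.close()
```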

docs/index.rst

Lines changed: 2 additions & 0 deletions
@@ -75,6 +75,8 @@ contacting support.
 +--------------------------------------------------+---------------+----------------+
 | :ref:`cherrypy`                                  | >= 11.2.0     | No             |
 +--------------------------------------------------+---------------+----------------+
+| :ref:`confluent-kafka <kafka>`                   | >= 1.7.0      | Yes            |
++--------------------------------------------------+---------------+----------------+
 | :ref:`consul`                                    | >= 0.7        | Yes [2]_       |
 +--------------------------------------------------+---------------+----------------+
 | :ref:`django`                                    | >= 1.8        | Yes            |

docs/integrations.rst

Lines changed: 6 additions & 0 deletions
@@ -236,6 +236,12 @@ Jinja2
 ^^^^^^
 .. automodule:: ddtrace.contrib.jinja2
 
+.. _kafka:
+
+Kafka
+^^^^^
+
+.. automodule:: ddtrace.contrib.kafka
 
 .. _kombu:
 