Skip to content

Commit f42fea6

Browse files
Authored by github-actions[bot], wconti27, and emmettbutler
fix(kafka): fix bug when setting message.key span tag for kafka serialization classes [backport 2.3] (#7793)
Backport 0e3a293 from #7725 to 2.3. ## Description Fixes #7674 . Sets `SerializedProducer` span tag for `message.key` by serializing key to bytes. Does not set the tag for a `DeserializedConsumer` unless the key is a `str` or `bytes` since we do not have the serialization function ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. 
- [x] This PR doesn't touch any of that. Co-authored-by: William Conti <[email protected]> Co-authored-by: Emmett Butler <[email protected]>
1 parent: 8bfbf79 · commit: f42fea6

File tree

3 files changed

+108
-3
lines changed

3 files changed

+108
-3
lines changed

ddtrace/contrib/kafka/patch.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,16 @@
99
from ddtrace.ext import SpanTypes
1010
from ddtrace.ext import kafka as kafkax
1111
from ddtrace.internal import core
12-
from ddtrace.internal.compat import ensure_text
1312
from ddtrace.internal.constants import COMPONENT
1413
from ddtrace.internal.constants import MESSAGING_SYSTEM
14+
from ddtrace.internal.logger import get_logger
1515
from ddtrace.internal.schema import schematize_messaging_operation
1616
from ddtrace.internal.schema import schematize_service_name
1717
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
1818
from ddtrace.internal.utils import ArgumentError
1919
from ddtrace.internal.utils import get_argument_value
2020
from ddtrace.internal.utils.formats import asbool
21+
from ddtrace.internal.utils.version import parse_version
2122
from ddtrace.pin import Pin
2223

2324

@@ -29,6 +30,9 @@
2930
)
3031

3132

33+
log = get_logger(__name__)
34+
35+
3236
config._add(
3337
"kafka",
3438
dict(
@@ -42,6 +46,13 @@ def get_version():
4246
return getattr(confluent_kafka, "__version__", "")
4347

4448

49+
KAFKA_VERSION_TUPLE = parse_version(get_version())
50+
51+
52+
_SerializationContext = confluent_kafka.serialization.SerializationContext if KAFKA_VERSION_TUPLE >= (1, 4, 0) else None
53+
_MessageField = confluent_kafka.serialization.MessageField if KAFKA_VERSION_TUPLE >= (1, 4, 0) else None
54+
55+
4556
class TracedProducerMixin:
4657
def __init__(self, config, *args, **kwargs):
4758
super(TracedProducerMixin, self).__init__(config, *args, **kwargs)
@@ -139,6 +150,7 @@ def traced_produce(func, instance, args, kwargs):
139150
value = None
140151
message_key = kwargs.get("key", "") or ""
141152
partition = kwargs.get("partition", -1)
153+
headers = get_argument_value(args, kwargs, 6, "headers", optional=True) or {}
142154
with pin.tracer.trace(
143155
schematize_messaging_operation(kafkax.PRODUCE, provider="kafka", direction=SpanDirection.OUTBOUND),
144156
service=trace_utils.ext_service(pin, config.kafka),
@@ -149,7 +161,14 @@ def traced_produce(func, instance, args, kwargs):
149161
span.set_tag_str(COMPONENT, config.kafka.integration_name)
150162
span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
151163
span.set_tag_str(kafkax.TOPIC, topic)
152-
span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key, errors="replace"))
164+
165+
if _SerializingProducer is not None and isinstance(instance, _SerializingProducer):
166+
serialized_key = serialize_key(instance, topic, message_key, headers)
167+
if serialized_key is not None:
168+
span.set_tag_str(kafkax.MESSAGE_KEY, serialized_key)
169+
else:
170+
span.set_tag_str(kafkax.MESSAGE_KEY, message_key)
171+
153172
span.set_tag(kafkax.PARTITION, partition)
154173
span.set_tag_str(kafkax.TOMBSTONE, str(value is None))
155174
span.set_tag(SPAN_MEASURED_KEY)
@@ -184,7 +203,15 @@ def traced_poll(func, instance, args, kwargs):
184203
message_key = message.key() or ""
185204
message_offset = message.offset() or -1
186205
span.set_tag_str(kafkax.TOPIC, message.topic())
187-
span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key, errors="replace"))
206+
207+
# If this is a deserializing consumer, do not set the key as a tag since we
208+
# do not have the serialization function
209+
if (
210+
(_DeserializingConsumer is not None and not isinstance(instance, _DeserializingConsumer))
211+
or isinstance(message_key, str)
212+
or isinstance(message_key, bytes)
213+
):
214+
span.set_tag_str(kafkax.MESSAGE_KEY, message_key)
188215
span.set_tag(kafkax.PARTITION, message.partition())
189216
span.set_tag_str(kafkax.TOMBSTONE, str(len(message) == 0))
190217
span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
@@ -203,3 +230,18 @@ def traced_commit(func, instance, args, kwargs):
203230
core.dispatch("kafka.commit.start", [instance, args, kwargs])
204231

205232
return func(*args, **kwargs)
233+
234+
235+
def serialize_key(instance, topic, key, headers):
236+
if _SerializationContext is not None and _MessageField is not None:
237+
ctx = _SerializationContext(topic, _MessageField.KEY, headers)
238+
if hasattr(instance, "_key_serializer") and instance._key_serializer is not None:
239+
try:
240+
key = instance._key_serializer(key, ctx)
241+
return key
242+
except Exception:
243+
log.debug("Failed to set Kafka Consumer key tag: %s", str(key))
244+
return None
245+
else:
246+
log.warning("Failed to set Kafka Consumer key tag, no method available to serialize key: %s", str(key))
247+
return None
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
kafka: Resolves ``TypeError`` raised by serializing producers and deserializing consumers when the ``message.key`` tag is set on spans.

tests/contrib/kafka/test_kafka.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import logging
23
import os
34
import time
@@ -740,3 +741,61 @@ def test_span_has_dsm_payload_hash(dummy_tracer, consumer, producer, kafka_topic
740741

741742
assert consume_span.name == "kafka.consume"
742743
assert consume_span.get_tag("pathway.hash") is not None
744+
745+
746+
def test_tracing_with_serialization_works(dummy_tracer, kafka_topic):
747+
def json_serializer(msg, s_obj):
748+
return json.dumps(msg).encode("utf-8")
749+
750+
conf = {
751+
"bootstrap.servers": BOOTSTRAP_SERVERS,
752+
"key.serializer": json_serializer,
753+
"value.serializer": json_serializer,
754+
}
755+
_producer = confluent_kafka.SerializingProducer(conf)
756+
757+
def json_deserializer(as_bytes, ctx):
758+
try:
759+
return json.loads(as_bytes)
760+
except json.decoder.JSONDecodeError:
761+
return as_bytes
762+
763+
conf = {
764+
"bootstrap.servers": BOOTSTRAP_SERVERS,
765+
"group.id": GROUP_ID,
766+
"auto.offset.reset": "earliest",
767+
"key.deserializer": json_deserializer,
768+
"value.deserializer": json_deserializer,
769+
}
770+
771+
_consumer = confluent_kafka.DeserializingConsumer(conf)
772+
tp = TopicPartition(kafka_topic, 0)
773+
tp.offset = 0 # we want to read the first message
774+
_consumer.commit(offsets=[tp])
775+
_consumer.subscribe([kafka_topic])
776+
777+
Pin.override(_producer, tracer=dummy_tracer)
778+
Pin.override(_consumer, tracer=dummy_tracer)
779+
780+
test_string = "serializing_test"
781+
PAYLOAD = {"val": test_string}
782+
783+
_producer.produce(kafka_topic, key={"name": "keykey"}, value=PAYLOAD)
784+
_producer.flush()
785+
786+
message = None
787+
while message is None or message.value() != PAYLOAD:
788+
message = _consumer.poll(1.0)
789+
790+
# message comes back with expected test string
791+
assert message.value() == PAYLOAD
792+
793+
traces = dummy_tracer.pop_traces()
794+
produce_span = traces[0][0]
795+
consume_span = traces[len(traces) - 1][0]
796+
797+
assert produce_span.get_tag("kafka.message_key") is not None
798+
799+
# consumer span will not have tag set since we can't serialize the deserialized key from the original type to
800+
# a string
801+
assert consume_span.get_tag("kafka.message_key") is None

Comments (0)