
Commit 55c12af

avara1986 and wconti27 authored
fix(kafka): message.key span tag for kafka serialization classes [backport 1.20] (#7922)
Backport 0e3a293 from #7725 to 1.20.

## Description

Fixes #7674. Sets the `message.key` span tag for a `SerializingProducer` by serializing the key to bytes. Does not set the tag for a `DeserializingConsumer` unless the key is a `str` or `bytes`, since we do not have the serialization function.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included in the PR.
- [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc.).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)).

## Reviewer Checklist

- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment.
- [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting).
- [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`.
- [x] This PR doesn't touch any of that.

---------

Co-authored-by: William Conti <[email protected]>
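For context, a minimal repro sketch of the failure mode described above (not part of the change itself): with a `SerializingProducer`, message keys may be arbitrary Python objects, and the old tagging path called `ensure_text()` on them. The broker address and topic below are placeholders.

```python
import json

from confluent_kafka import SerializingProducer

# Placeholder broker/topic; serializers follow confluent-kafka's
# fn(obj, SerializationContext) -> bytes signature.
producer = SerializingProducer(
    {
        "bootstrap.servers": "localhost:9092",
        "key.serializer": lambda key, ctx: json.dumps(key).encode("utf-8"),
        "value.serializer": lambda val, ctx: json.dumps(val).encode("utf-8"),
    }
)

# The key is a dict, not str/bytes: previously ddtrace raised a TypeError
# while setting the message.key span tag; with this fix the key is run
# through the producer's own key serializer before tagging.
producer.produce("example-topic", key={"name": "keykey"}, value={"val": "x"})
producer.flush()
```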
1 parent 40c0526 · commit 55c12af

3 files changed: +111 -4 lines changed

ddtrace/contrib/kafka/patch.py

Lines changed: 45 additions & 4 deletions
@@ -9,15 +9,16 @@
 from ddtrace.ext import SpanTypes
 from ddtrace.ext import kafka as kafkax
 from ddtrace.internal import core
-from ddtrace.internal.compat import ensure_text
 from ddtrace.internal.constants import COMPONENT
 from ddtrace.internal.constants import MESSAGING_SYSTEM
+from ddtrace.internal.logger import get_logger
 from ddtrace.internal.schema import schematize_messaging_operation
 from ddtrace.internal.schema import schematize_service_name
 from ddtrace.internal.schema.span_attribute_schema import SpanDirection
 from ddtrace.internal.utils import ArgumentError
 from ddtrace.internal.utils import get_argument_value
 from ddtrace.internal.utils.formats import asbool
+from ddtrace.internal.utils.version import parse_version
 from ddtrace.pin import Pin


@@ -29,6 +30,9 @@
 )


+log = get_logger(__name__)
+
+
 config._add(
     "kafka",
     dict(
@@ -42,6 +46,13 @@ def get_version():
     return getattr(confluent_kafka, "__version__", "")


+KAFKA_VERSION_TUPLE = parse_version(get_version())
+
+
+_SerializationContext = confluent_kafka.serialization.SerializationContext if KAFKA_VERSION_TUPLE >= (1, 4, 0) else None
+_MessageField = confluent_kafka.serialization.MessageField if KAFKA_VERSION_TUPLE >= (1, 4, 0) else None
+
+
 class TracedProducer(confluent_kafka.Producer):
     def __init__(self, config, *args, **kwargs):
         super(TracedProducer, self).__init__(config, *args, **kwargs)
@@ -118,7 +129,7 @@ def traced_produce(func, instance, args, kwargs):
     message_key = kwargs.get("key", "") or ""
     partition = kwargs.get("partition", -1)
     core.dispatch("kafka.produce.start", [instance, args, kwargs])
-
+    headers = get_argument_value(args, kwargs, 6, "headers", optional=True) or {}
     with pin.tracer.trace(
         schematize_messaging_operation(kafkax.PRODUCE, provider="kafka", direction=SpanDirection.OUTBOUND),
         service=trace_utils.ext_service(pin, config.kafka),
@@ -128,7 +139,14 @@ def traced_produce(func, instance, args, kwargs):
         span.set_tag_str(COMPONENT, config.kafka.integration_name)
         span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
         span.set_tag_str(kafkax.TOPIC, topic)
-        span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key, errors="replace"))
+
+        if _SerializingProducer is not None and isinstance(instance, _SerializingProducer):
+            serialized_key = serialize_key(instance, topic, message_key, headers)
+            if serialized_key is not None:
+                span.set_tag_str(kafkax.MESSAGE_KEY, serialized_key)
+        else:
+            span.set_tag_str(kafkax.MESSAGE_KEY, message_key)
+
         span.set_tag(kafkax.PARTITION, partition)
         span.set_tag_str(kafkax.TOMBSTONE, str(value is None))
         span.set_tag(SPAN_MEASURED_KEY)
@@ -163,7 +181,15 @@ def traced_poll(func, instance, args, kwargs):
             message_key = message.key() or ""
             message_offset = message.offset() or -1
             span.set_tag_str(kafkax.TOPIC, message.topic())
-            span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key, errors="replace"))
+
+            # If this is a deserializing consumer, do not set the key as a tag since we
+            # do not have the serialization function
+            if (
+                (_DeserializingConsumer is not None and not isinstance(instance, _DeserializingConsumer))
+                or isinstance(message_key, str)
+                or isinstance(message_key, bytes)
+            ):
+                span.set_tag_str(kafkax.MESSAGE_KEY, message_key)
             span.set_tag(kafkax.PARTITION, message.partition())
             span.set_tag_str(kafkax.TOMBSTONE, str(len(message) == 0))
             span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
@@ -182,3 +208,18 @@ def traced_commit(func, instance, args, kwargs):
     core.dispatch("kafka.commit.start", [instance, args, kwargs])

     return func(*args, **kwargs)
+
+
+def serialize_key(instance, topic, key, headers):
+    if _SerializationContext is not None and _MessageField is not None:
+        ctx = _SerializationContext(topic, _MessageField.KEY, headers)
+        if hasattr(instance, "_key_serializer") and instance._key_serializer is not None:
+            try:
+                key = instance._key_serializer(key, ctx)
+                return key
+            except Exception:
+                log.debug("Failed to set Kafka Consumer key tag: %s", str(key))
+                return None
+    else:
+        log.warning("Failed to set Kafka Consumer key tag, no method available to serialize key: %s", str(key))
+        return None
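For reference, a standalone sketch of the step `serialize_key()` performs, assuming confluent-kafka >= 1.4.0 (the same guard the patch applies before touching `SerializationContext` and `MessageField`). `example_key_serializer` is an illustrative stand-in for the producer's configured `_key_serializer`; the patch additionally forwards the message headers into the context.

```python
from confluent_kafka.serialization import MessageField, SerializationContext

# Illustrative stand-in for the producer's configured key serializer.
def example_key_serializer(key, ctx):
    # ctx.topic and ctx.field identify what is being encoded.
    return str(key).encode("utf-8")

# serialize_key() builds an equivalent context before invoking the serializer.
ctx = SerializationContext("example-topic", MessageField.KEY)
assert example_key_serializer({"name": "keykey"}, ctx) == b"{'name': 'keykey'}"
```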
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+---
+fixes:
+  - |
+    kafka: Resolves ``TypeError`` raised by serializing producers and deserializing consumers when the ``message.key`` tag is set on spans.

tests/contrib/kafka/test_kafka.py

Lines changed: 62 additions & 0 deletions
@@ -1,3 +1,4 @@
+import json
 import logging
 import os
 import time
@@ -590,3 +591,64 @@ def _read_single_message(consumer):
     consumer.commit(asynchronous=False)
     assert consumer.committed([TopicPartition(kafka_topic, 0)])[0].offset == 2
     assert list(buckets.values())[0].latest_commit_offsets[ConsumerPartitionKey("test_group", kafka_topic, 0)] == 1
+
+
+def test_tracing_with_serialization_works(dummy_tracer, kafka_topic):
+    def json_serializer(msg, s_obj):
+        return json.dumps(msg).encode("utf-8")
+
+    conf = {
+        "bootstrap.servers": BOOTSTRAP_SERVERS,
+        "key.serializer": json_serializer,
+        "value.serializer": json_serializer,
+    }
+    try:
+        _producer = confluent_kafka.SerializingProducer(conf)
+    except KafkaException:
+        pytest.xfail("No such configuration property: 'key.serializer'")
+
+    def json_deserializer(as_bytes, ctx):
+        try:
+            return json.loads(as_bytes)
+        except json.decoder.JSONDecodeError:
+            return as_bytes
+
+    conf = {
+        "bootstrap.servers": BOOTSTRAP_SERVERS,
+        "group.id": GROUP_ID,
+        "auto.offset.reset": "earliest",
+        "key.deserializer": json_deserializer,
+        "value.deserializer": json_deserializer,
+    }
+
+    _consumer = confluent_kafka.DeserializingConsumer(conf)
+    tp = TopicPartition(kafka_topic, 0)
+    tp.offset = 0  # we want to read the first message
+    _consumer.commit(offsets=[tp])
+    _consumer.subscribe([kafka_topic])
+
+    Pin.override(_producer, tracer=dummy_tracer)
+    Pin.override(_consumer, tracer=dummy_tracer)
+
+    test_string = "serializing_test"
+    PAYLOAD = {"val": test_string}
+
+    _producer.produce(kafka_topic, key={"name": "keykey"}, value=PAYLOAD)
+    _producer.flush()
+
+    message = None
+    while message is None or message.value() != PAYLOAD:
+        message = _consumer.poll(1.0)
+
+    # message comes back with expected test string
+    assert message.value() == PAYLOAD
+
+    traces = dummy_tracer.pop_traces()
+    produce_span = traces[0][0]
+    consume_span = traces[len(traces) - 1][0]
+
+    assert produce_span.get_tag("kafka.message_key") is not None
+
+    # consumer span will not have tag set since we can't serialize the deserialized key from the original type to
+    # a string
+    assert consume_span.get_tag("kafka.message_key") is None
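The consumer-side assertions restate the rule from `traced_poll`: a `DeserializingConsumer` hands back `message.key()` already deserialized, and without the original serialization function there is no faithful way to re-encode it, so the tag is only set when the key is already `str` or `bytes`. A minimal sketch of that rule (the helper name is illustrative, not the integration's API):

```python
def key_tag_for_consumer(message_key, is_deserializing_consumer):
    # Plain consumers always get the tag; deserializing consumers only
    # when the key is already text or bytes.
    if not is_deserializing_consumer or isinstance(message_key, (str, bytes)):
        return message_key
    return None  # omit the tag rather than guess at a serialization

assert key_tag_for_consumer(b"keykey", True) == b"keykey"
assert key_tag_for_consumer({"name": "keykey"}, True) is None  # tag omitted
```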
