Skip to content

Commit 42c281c

Browse files
fix(kafka): handle errors from ensure_text gracefully for Kafka tracing [backport 1.17] (#6514)
Backport 8536722 from #6493 to 1.17. ## Motivation This change ensures that we don't hard fail if there's string decoded by `ensure_text` isn't utf-8 encoded. We saw this in one our production applications after updating this library. While we are investigating the messages to see if there's an issue in our producer, we still would prefer that the code not hard error at this point if it's being traced. ```python Traceback (most recent call last): .... File "/pyenv/versions/.../lib/python3.8/site-packages/confluent_kafka/avro/__init__.py", line 174, in poll message = super(AvroConsumer, self).poll(timeout) File "/pyenv/versions/.../lib/python3.8/site-packages/ddtrace/contrib/kafka/patch.py", line 157, in traced_poll span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key)) File "/pyenv/versions/.../lib/python3.8/site-packages/six.py", line 951, in ensure_text return s.decode(encoding, errors) UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc8 in position 5: invalid continuation byte ``` This is a non-breaking change (can't seem to edit the labels of this PR). ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) Co-authored-by: Joe Danis <[email protected]>
1 parent b77124e commit 42c281c

File tree

2 files changed

+6
-2
lines changed

2 files changed

+6
-2
lines changed

ddtrace/contrib/kafka/patch.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def traced_produce(func, instance, args, kwargs):
116116
span.set_tag_str(COMPONENT, config.kafka.integration_name)
117117
span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
118118
span.set_tag_str(kafkax.TOPIC, topic)
119-
span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key))
119+
span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key, errors="replace"))
120120
span.set_tag(kafkax.PARTITION, partition)
121121
span.set_tag_str(kafkax.TOMBSTONE, str(value is None))
122122
span.set_tag(SPAN_MEASURED_KEY)
@@ -154,7 +154,7 @@ def traced_poll(func, instance, args, kwargs):
154154
message_key = message.key() or ""
155155
message_offset = message.offset() or -1
156156
span.set_tag_str(kafkax.TOPIC, message.topic())
157-
span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key))
157+
span.set_tag_str(kafkax.MESSAGE_KEY, ensure_text(message_key, errors="replace"))
158158
span.set_tag(kafkax.PARTITION, message.partition())
159159
span.set_tag_str(kafkax.TOMBSTONE, str(len(message) == 0))
160160
span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
kafka: Resolves ``UnicodeDecodeError`` raised when kafka messages key contain characters that are not supported by UTF-8 encoding.

0 commit comments

Comments
 (0)