Skip to content

Commit 69553f5

Browse files
chore(tracing): improve debug logging for v05 encoding errors [backport 1.20] (#7173)
Backport 85d5ec5 from #7099 to 1.20. There is an edge case were the string table used by the v0.5 encoder swaps entries. This corrupts trace payloads. In this PR I will add debug logs that will help us reproduce the behavior that triggered incident-24455. ## RISK This PR maybe increase memory usage when v0.5 is enabled. With this change we will hold a reference to a all encoded spans until the encoded bytes are flushed and we will log trace payloads that can have a size of up to 50MB when we traces are misencoded. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [x] This PR doesn't touch any of that. --------- Co-authored-by: Munir Abdinur <[email protected]>
1 parent 9e3d8d6 commit 69553f5

File tree

6 files changed

+274
-1
lines changed

6 files changed

+274
-1
lines changed

ddtrace/internal/_encoding.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ class BufferFull(Exception):
1616
class BufferItemTooLarge(Exception):
1717
pass
1818

19+
class EncodingValidationError(Exception):
20+
pass
21+
1922
class BufferedEncoder(object):
2023
max_size: int
2124
max_item_size: int

ddtrace/internal/_encoding.pyx

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ class BufferItemTooLarge(Exception):
6767
pass
6868

6969

70+
class EncodingValidationError(Exception):
71+
pass
72+
73+
7074
cdef inline const char * string_to_buff(str s):
7175
IF PY_MAJOR_VERSION >= 3:
7276
return PyUnicode_AsUTF8(s)
@@ -268,6 +272,12 @@ cdef class MsgpackStringTable(StringTable):
268272
self._sp_len = self.pk.length
269273
self._sp_id = self._next_id
270274

275+
@property
276+
def table_values(self):
277+
return "_sp_len:{}\n_sp_id:{}\npk_buf:{}\npk_buff_size:{}\nmax_size:{}\ntable:{}".format(
278+
self._sp_len, self._sp_id, self.get_bytes(), self.pk.buf_size, self.max_size, self._table
279+
)
280+
271281
cdef rollback(self):
272282
if self._sp_len > 0:
273283
self.pk.length = self._sp_len
@@ -711,9 +721,11 @@ cdef class MsgpackEncoderV03(MsgpackEncoderBase):
711721

712722
cdef class MsgpackEncoderV05(MsgpackEncoderBase):
713723
cdef MsgpackStringTable _st
724+
cdef dict _encoded_spans
714725

715726
def __cinit__(self, size_t max_size, size_t max_item_size):
716727
self._st = MsgpackStringTable(max_size)
728+
self._encoded_spans = {}
717729

718730
cpdef flush(self):
719731
with self._lock:
@@ -722,9 +734,16 @@ cdef class MsgpackEncoderV05(MsgpackEncoderBase):
722734
PyLong_FromLong(<long> self.get_buffer()),
723735
<Py_ssize_t> super(MsgpackEncoderV05, self).size,
724736
)
725-
return self._st.flush()
737+
v05bytes = self._st.flush()
738+
self._verify_encoding(v05bytes)
739+
return v05bytes
726740
finally:
727741
self._reset_buffer()
742+
self._encoded_spans = {}
743+
744+
@property
745+
def stable(self):
746+
return self._st
728747

729748
@property
730749
def size(self):
@@ -737,6 +756,8 @@ cdef class MsgpackEncoderV05(MsgpackEncoderBase):
737756
try:
738757
self._st.savepoint()
739758
super(MsgpackEncoderV05, self).put(trace)
759+
for span in trace:
760+
self._encoded_spans[span.span_id] = span
740761
except Exception:
741762
self._st.rollback()
742763
raise
@@ -831,6 +852,80 @@ cdef class MsgpackEncoderV05(MsgpackEncoderBase):
831852

832853
return 0
833854

855+
cpdef _verify_encoding(self, encoded_bytes):
856+
import msgpack
857+
858+
from ddtrace import config
859+
860+
if not config._trace_writer_log_err_payload:
861+
return
862+
863+
unpacked = msgpack.unpackb(encoded_bytes, raw=True, strict_map_key=False)
864+
if not unpacked or not unpacked[0]:
865+
return unpacked
866+
867+
table, packed_traces = unpacked
868+
for trace in packed_traces:
869+
for span in trace:
870+
og_span = self._encoded_spans.get(span[4]) # correlate encoded span to a span in span_dict
871+
# structure of v0.5 encoded spans
872+
# 0: Service (uint32)
873+
# 1: Name (uint32)
874+
# 2: Resource (uint32)
875+
# 3: TraceID (uint64)
876+
# 4: SpanID (uint64)
877+
# 5: ParentID (uint64)
878+
# 6: Start (int64)
879+
# 7: Duration (int64)
880+
# 8: Error (int32)
881+
# 9: Meta (map[uint32]uint32)
882+
# 10: Metrics (map[uint32]float64)
883+
# 11: Type (uint32)
884+
try:
885+
assert og_span.service == (table[span[0]].decode() or None), "misencoded service: {}".format(
886+
table[span[0]]
887+
)
888+
assert og_span.name == (table[span[1]].decode() or None), "misencoded name: {}".format(
889+
table[span[1]]
890+
)
891+
assert og_span.resource == (table[span[2]].decode() or None), "misencoded resource: {}".format(
892+
table[span[2]]
893+
)
894+
assert og_span._trace_id_64bits == (span[3] or None), "misencoded trace id: {}".format(span[3])
895+
assert og_span.span_id == (span[4] or None), "misencoded span id: {}".format(span[4])
896+
assert og_span.parent_id == (span[5] or None), "misencoded parent id: {}".format(span[5])
897+
assert og_span.start_ns == span[6], "misencoded start: {}".format(span[6])
898+
assert og_span.duration_ns == (span[7] or None), "misencoded duration: {}".format(span[7])
899+
assert og_span.error == span[8], "misencoded error type: {}".format(span[8])
900+
901+
for k, v in span[9].items():
902+
k = table[k].decode()
903+
v = table[v].decode()
904+
if "dropped string of length" in k or "dropped string of length" in v:
905+
continue
906+
assert og_span._meta[k] == v, "misencoded tag: k={} v={}".format(k, v)
907+
908+
for k, v in span[10].items():
909+
k = table[k].decode()
910+
if "dropped string of length" in k:
911+
continue
912+
assert og_span._metrics[k] == v, "misencoded metric: k={} v={}".format(k, v)
913+
914+
assert og_span.span_type == (table[span[11]].decode() or None), "misencoded span type: {}".format(
915+
table[span[11]]
916+
)
917+
except Exception as e:
918+
eve = EncodingValidationError(
919+
str(e) + "\nDecoded Span does not match encoded span: {}".format(og_span._pprint())
920+
)
921+
setattr(
922+
eve,
923+
"_debug_message",
924+
"Malformed String table values\ntable:{}\ntraces:{}\nencoded_bytes:{}\nspans:{}".format(
925+
table, packed_traces, encoded_bytes, self._encoded_spans
926+
),
927+
)
928+
raise eve
834929

835930
cdef class Packer(object):
836931
"""Slightly modified version of the v0.6.2 msgpack Packer

ddtrace/internal/writer/writer.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from ...internal.utils.time import StopWatch
3131
from .._encoding import BufferFull
3232
from .._encoding import BufferItemTooLarge
33+
from .._encoding import EncodingValidationError
3334
from ..agent import get_connection
3435
from ..constants import _HTTPLIB_NO_TRACE_REQUEST
3536
from ..encoding import JSONEncoderV2
@@ -395,6 +396,11 @@ def _flush_queue_with_client(self, client, raise_exc=False):
395396
encoded = client.encoder.encode()
396397
if encoded is None:
397398
return
399+
except EncodingValidationError as e:
400+
log.error("Encoding Error (or span was modified after finish): %s", str(e))
401+
if hasattr(e, "_debug_message"):
402+
log.debug(e._debug_message)
403+
return
398404
except Exception:
399405
log.error("failed to encode trace with encoder %r", client.encoder, exc_info=True)
400406
self._metrics_dist("encoder.dropped.traces", n_traces)

tests/integration/test_integration.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,31 @@ def encode_traces(self, traces):
529529
)
530530

531531

532+
@skip_if_testagent
533+
@pytest.mark.subprocess(
534+
env={
535+
"_DD_TRACE_WRITER_LOG_ERROR_PAYLOADS": "true",
536+
"DD_TRACE_API_VERSION": "v0.5",
537+
"DD_TRACE_WRITER_INTERVAL_SECONDS": "1000",
538+
}
539+
)
540+
def test_trace_with_invalid_encoding_for_v05_payload():
541+
import mock
542+
543+
from ddtrace import tracer
544+
from tests.utils import AnyStr
545+
from tests.utils import AnyStringWithText
546+
547+
with mock.patch("ddtrace.internal.writer.writer.log") as log:
548+
span = tracer.trace("name")
549+
span.finish()
550+
span.name = "new_name"
551+
tracer.flush()
552+
553+
log.error.assert_has_calls([mock.call("Encoding Error (or span was modified after finish): %s", AnyStr())])
554+
log.debug.assert_has_calls([mock.call(AnyStringWithText("Malformed String table values"))])
555+
556+
532557
def test_trace_with_failing_encoder_generates_error_log():
533558
class ExceptionBadEncoder(BadEncoder):
534559
def encode(self):

tests/tracer/test_encoders.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# -*- coding: utf-8 -*-
22
import contextlib
33
import json
4+
import os
45
import random
56
import string
67
import threading
@@ -21,6 +22,7 @@
2122
from ddtrace.ext.ci import CI_APP_TEST_ORIGIN
2223
from ddtrace.internal._encoding import BufferFull
2324
from ddtrace.internal._encoding import BufferItemTooLarge
25+
from ddtrace.internal._encoding import EncodingValidationError
2426
from ddtrace.internal._encoding import ListStringTable
2527
from ddtrace.internal._encoding import MsgpackStringTable
2628
from ddtrace.internal.compat import msgpack_type
@@ -33,6 +35,7 @@
3335
from ddtrace.internal.encoding import _EncoderBase
3436
from ddtrace.span import Span
3537
from tests.utils import DummyTracer
38+
from tests.utils import override_global_config
3639

3740

3841
_ORIGIN_KEY = ORIGIN_KEY.encode()
@@ -700,3 +703,139 @@ def test_json_encoder_traces_bytes():
700703
assert u"\ufffdspan.a" == span_a["name"], span_a["name"]
701704
assert u"\x80span.b" == span_b["name"]
702705
assert u"\ufffdspan.b" == span_c["name"]
706+
707+
708+
@pytest.mark.skipif(
709+
os.getenv("PYTHONOPTIMIZE", "").lower() in ("1", "t", "true") or six.PY2,
710+
reason="Python optimize removes assertions from cython code. PY2 byte string formatting fails assertions",
711+
)
712+
def test_verifying_v05_payloads():
713+
string_table_size = 4 * (1 << 12)
714+
encoder = MsgpackEncoderV05(string_table_size, string_table_size)
715+
716+
# Ensure EncodingValidationError is not raised when trace fields are encoded as expected
717+
with override_global_config({"_trace_writer_log_err_payload": True}):
718+
traces = [[Span("name", "service", "resource", "type") for _ in range(5)] for _ in range(100)]
719+
for trace in traces:
720+
for s in trace:
721+
s._meta = {
722+
"app": "ac_query",
723+
"language": "python",
724+
"key_for_long_string": "very_long_string_that_will_be_dropped" * int(string_table_size * 0.1),
725+
}
726+
s._metrics = {
727+
"zqSCqAYiBgjmqYKoBiohcCKwCagB": 7,
728+
"_sampling_priority_v1": 0,
729+
"very_long_string_that_will_be_dropped" * int(string_table_size * 0.1): 1,
730+
}
731+
732+
encoded = []
733+
for trace in traces:
734+
try:
735+
encoder.put(trace)
736+
encoded.append(trace)
737+
except BufferFull:
738+
pass
739+
assert 0 < len(encoded) < len(traces), "Ensures BufferFull is raised and only a subset of traces are encoded"
740+
assert encoder.encode()
741+
742+
# Ensure EncodingValidationError is raised when the encoded span name does not match the span name
743+
with override_global_config({"_trace_writer_log_err_payload": True}):
744+
with pytest.raises(EncodingValidationError) as e:
745+
og_span = Span("name", "service", "resource", "type")
746+
encoder.put([og_span])
747+
og_span.name = "new_name"
748+
encoder.encode()
749+
assert "misencoded name: b'name'" in e.value.args[0]
750+
751+
# Ensure EncodingValidationError is raised when the encoded service does not match the span service
752+
with override_global_config({"_trace_writer_log_err_payload": True}):
753+
with pytest.raises(EncodingValidationError) as e:
754+
og_span = Span("name", "service", "resource", "type")
755+
encoder.put([og_span])
756+
og_span.service = "new_service"
757+
encoder.encode()
758+
assert "misencoded service: b'service'" in e.value.args[0]
759+
760+
# Ensure EncodingValidationError is raised when the encoded resource does not match the span resource
761+
with override_global_config({"_trace_writer_log_err_payload": True}):
762+
with pytest.raises(EncodingValidationError) as e:
763+
og_span = Span("name", "service", "resource", "type")
764+
encoder.put([og_span])
765+
og_span.resource = "new_resource"
766+
encoder.encode()
767+
assert "misencoded resource: b'resource'" in e.value.args[0]
768+
769+
# Ensure EncodingValidationError is raised when the encoded duration does not match
770+
with override_global_config({"_trace_writer_log_err_payload": True}):
771+
with pytest.raises(EncodingValidationError) as e:
772+
og_span = Span("name", "service", "resource", "type")
773+
encoder.put([og_span])
774+
og_span.duration_ns = 55
775+
encoder.encode()
776+
assert "misencoded duration: 0" in e.value.args[0]
777+
778+
# Ensure EncodingValidationError is raised when the encoded start does not match
779+
with override_global_config({"_trace_writer_log_err_payload": True}):
780+
with pytest.raises(EncodingValidationError) as e:
781+
og_span = Span("name", "service", "resource", "type", start=10)
782+
encoder.put([og_span])
783+
og_span.start_ns = 100000001
784+
encoder.encode()
785+
assert "misencoded start: 10" in e.value.args[0]
786+
787+
# Ensure EncodingValidationError is raised when the encoded parent_id does not match
788+
with override_global_config({"_trace_writer_log_err_payload": True}):
789+
with pytest.raises(EncodingValidationError) as e:
790+
og_span = Span("name", "service", "resource", "type", parent_id=1)
791+
encoder.put([og_span])
792+
og_span.parent_id = 2
793+
encoder.encode()
794+
assert "misencoded parent id: 1" in e.value.args[0]
795+
796+
# Ensure EncodingValidationError is raised when the encoded trace id does not match
797+
with override_global_config({"_trace_writer_log_err_payload": True}):
798+
with pytest.raises(EncodingValidationError) as e:
799+
og_span = Span("name", "service", "resource", "type", trace_id=1)
800+
encoder.put([og_span])
801+
og_span.trace_id = 2
802+
encoder.encode()
803+
assert "misencoded trace id: 1" in e.value.args[0]
804+
805+
# Ensure EncodingValidationError is raised when the encoded span id does not match
806+
with override_global_config({"_trace_writer_log_err_payload": True}):
807+
with pytest.raises(EncodingValidationError) as e:
808+
og_span = Span("name", "service", "resource", "type", span_id=1)
809+
encoder.put([og_span])
810+
og_span.span_id = 2
811+
encoder.encode()
812+
assert "misencoded span id: 1" in e.value.args[0]
813+
814+
# Ensure EncodingValidationError is raised when the encoded tags do not match the span tag's
815+
with override_global_config({"_trace_writer_log_err_payload": True}):
816+
with pytest.raises(EncodingValidationError) as e:
817+
og_span = Span("name", "service", "resource", "type")
818+
og_span._meta["hi"] = "tag"
819+
encoder.put([og_span])
820+
og_span._meta["hi"] = "new tag"
821+
encoder.encode()
822+
assert "misencoded tag: k=hi v=tag" in e.value.args[0]
823+
824+
# Ensure EncodingValidationError is raised when the encoded metrics do not match the metrics set on the span
825+
with override_global_config({"_trace_writer_log_err_payload": True}):
826+
with pytest.raises(EncodingValidationError) as e:
827+
og_span = Span("name", "service", "resource", "type")
828+
og_span._metrics["hi"] = 1
829+
encoder.put([og_span])
830+
og_span._metrics["hi"] = 2
831+
encoder.encode()
832+
assert "misencoded metric: k=hi v=1" in e.value.args[0]
833+
834+
# Ensure EncodingValidationError is raised when the encoded span type does not match the span type on the span
835+
with override_global_config({"_trace_writer_log_err_payload": True}):
836+
with pytest.raises(EncodingValidationError) as e:
837+
og_span = Span("name", "service", "resource", "type")
838+
encoder.put([og_span])
839+
og_span.span_type = "new_span_type"
840+
encoder.encode()
841+
assert "misencoded span type: b'type'" in e.value.args[0]

tests/utils.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,6 +1098,11 @@ def __eq__(self, other):
10981098
return isinstance(other, str)
10991099

11001100

1101+
class AnyStringWithText(str):
1102+
def __eq__(self, other):
1103+
return self in other
1104+
1105+
11011106
class AnyInt(object):
11021107
def __eq__(self, other):
11031108
return isinstance(other, int)

0 commit comments

Comments
 (0)