Skip to content

Commit af17063

Browse files
authored
[core] Add Payload class helper (#834)
* [core] Add Payload class * update encoder tests * Add payload tests * use byte string for python 3.x
1 parent e6f1015 commit af17063

File tree

5 files changed

+339
-11
lines changed

5 files changed

+339
-11
lines changed

ddtrace/api.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from .encoding import get_encoder, JSONEncoder
88
from .compat import httplib, PYTHON_VERSION, PYTHON_INTERPRETER, get_connection_response
99
from .internal.logger import get_logger
10+
from .payload import Payload
1011
from .utils.deprecation import deprecated
1112

1213

@@ -147,9 +148,13 @@ def _downgrade(self):
147148
def send_traces(self, traces):
148149
if not traces:
149150
return
151+
150152
start = time.time()
151-
data = self._encoder.encode_traces(traces)
152-
response = self._put(self._traces, data, len(traces))
153+
payload = Payload(encoder=self._encoder)
154+
for trace in traces:
155+
payload.add_trace(trace)
156+
157+
response = self._put(self._traces, payload.get_payload(), payload.length)
153158

154159
# the API endpoint is not available so we should downgrade the connection and re-try the call
155160
if response.status in [404, 415] and self._fallback:

ddtrace/encoding.py

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import struct
23

34
from .internal.logger import get_logger
45

@@ -45,41 +46,78 @@ def encode_traces(self, traces):
4546
:param traces: A list of traces that should be serialized
4647
"""
4748
normalized_traces = [[span.to_dict() for span in trace] for trace in traces]
48-
return self._encode(normalized_traces)
49+
return self.encode(normalized_traces)
4950

50-
def encode_services(self, services):
51+
def encode_trace(self, trace):
5152
"""
52-
Encodes a dictionary of services.
53+
Encodes a trace, expecting a list of spans. Before dump the string in a
54+
serialized format all traces are normalized, calling the ``to_dict()`` method.
55+
The traces nesting is not changed.
5356
54-
:param services: A dictionary that contains one or more services
57+
:param trace: A list of traces that should be serialized
5558
"""
56-
return self._encode(services)
59+
return self.encode([span.to_dict() for span in trace])
5760

58-
def _encode(self, obj):
61+
def encode(self, obj):
5962
"""
6063
Defines the underlying format used during traces or services encoding.
6164
This method must be implemented and should only be used by the internal functions.
6265
"""
6366
raise NotImplementedError
6467

68+
def decode(self, data):
69+
"""
70+
Defines the underlying format used during traces or services encoding.
71+
This method must be implemented and should only be used by the internal functions.
72+
"""
73+
raise NotImplementedError
74+
75+
def join_encoded(self, objs):
76+
"""Helper used to join a list of encoded objects into an encoded list of objects"""
77+
raise NotImplementedError
78+
6579

6680
class JSONEncoder(Encoder):
6781
def __init__(self):
6882
# TODO[manu]: add instructions about how users can switch to Msgpack
6983
log.debug('using JSON encoder; application performance may be degraded')
7084
self.content_type = 'application/json'
7185

72-
def _encode(self, obj):
86+
def encode(self, obj):
7387
return json.dumps(obj)
7488

89+
def decode(self, data):
90+
return json.loads(data)
91+
92+
def join_encoded(self, objs):
93+
"""Join a list of encoded objects together as a json array"""
94+
return '[' + ','.join(objs) + ']'
95+
7596

7697
class MsgpackEncoder(Encoder):
7798
def __init__(self):
7899
log.debug('using Msgpack encoder')
79100
self.content_type = 'application/msgpack'
80101

81-
def _encode(self, obj):
82-
return msgpack.packb(obj, **MSGPACK_PARAMS)
102+
def encode(self, obj):
103+
return msgpack.packb(obj)
104+
105+
def decode(self, data):
106+
return msgpack.unpackb(data)
107+
108+
def join_encoded(self, objs):
109+
"""Join a list of encoded objects together as a msgpack array"""
110+
buf = b''.join(objs)
111+
112+
# Prepend array header to buffer
113+
# https://github.com/msgpack/msgpack-python/blob/f46523b1af7ff2d408da8500ea36a4f9f2abe915/msgpack/fallback.py#L948-L955
114+
count = len(objs)
115+
if count <= 0xf:
116+
return struct.pack('B', 0x90 + count) + buf
117+
elif count <= 0xffff:
118+
return struct.pack('>BH', 0xdc, count) + buf
119+
else:
120+
return struct.pack('>BI', 0xdd, count) + buf
83121

84122

85123
def get_encoder():

ddtrace/payload.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import logging
2+
3+
from .encoding import get_encoder
4+
5+
log = logging.getLogger(__name__)
6+
7+
8+
class Payload:
9+
"""
10+
Trace agent API payload buffer class
11+
12+
This class is used to encoded and store traces to build the payload we send to
13+
the trace agent.
14+
15+
DEV: We encoded and buffer traces so that we can reliable determine the size of
16+
the payload easily so we can flush based on the payload size.
17+
"""
18+
__slots__ = ('traces', 'size', 'encoder', 'max_payload_size')
19+
20+
# Default max payload size of 5mb
21+
# DEV: Trace agent limit is 10mb, cutoff at 5mb to ensure we don't hit 10mb
22+
DEFAULT_MAX_PAYLOAD_SIZE = 5 * 1000000
23+
24+
def __init__(self, encoder=None, max_payload_size=DEFAULT_MAX_PAYLOAD_SIZE):
25+
"""
26+
Constructor for Payload
27+
28+
:param encoder: The encoded to use, default is the default encoder
29+
:type encoder: ``ddtrace.encoding.Encoder``
30+
:param max_payload_size: The max number of bytes a payload should be before
31+
being considered full (default: 5mb)
32+
"""
33+
self.max_payload_size = max_payload_size
34+
self.encoder = encoder or get_encoder()
35+
self.traces = []
36+
self.size = 0
37+
38+
def add_trace(self, trace):
39+
"""
40+
Encode and append a trace to this payload
41+
42+
:param trace: A trace to append
43+
:type trace: A list of ``ddtrace.span.Span``s
44+
"""
45+
# No trace or empty trace was given, ignore
46+
if not trace:
47+
return
48+
49+
# Encode the trace, append, and add it's length to the size
50+
encoded = self.encoder.encode_trace(trace)
51+
self.traces.append(encoded)
52+
self.size += len(encoded)
53+
54+
@property
55+
def length(self):
56+
"""
57+
Get the number of traces in this payload
58+
59+
:returns: The number of traces in the payload
60+
:rtype: int
61+
"""
62+
return len(self.traces)
63+
64+
@property
65+
def empty(self):
66+
"""
67+
Whether this payload is empty or not
68+
69+
:returns: Whether this payload is empty or not
70+
:rtype: bool
71+
"""
72+
return self.length == 0
73+
74+
@property
75+
def full(self):
76+
"""
77+
Whether this payload is at or over the max allowed payload size
78+
79+
:returns: Whether we have reached the max payload size yet or not
80+
:rtype: bool
81+
"""
82+
return self.size >= self.max_payload_size
83+
84+
def get_payload(self):
85+
"""
86+
Get the fully encoded payload
87+
88+
:returns: The fully encoded payload
89+
:rtype: str | bytes
90+
"""
91+
# DEV: `self.traces` is an array of encoded traces, `join_encoded` joins them together
92+
return self.encoder.join_encoded(self.traces)
93+
94+
def __repr__(self):
95+
"""Get the string representation of this payload"""
96+
return '{0}(length={1}, size={2}b, full={3})'.format(self.__class__.__name__, self.length, self.size, self.full)

tests/test_encoders.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,42 @@ def test_encode_traces_json(self):
3939
for j in range(2):
4040
eq_('client.testing', items[i][j]['name'])
4141

42+
def test_join_encoded_json(self):
43+
# test encoding for JSON format
44+
traces = []
45+
traces.append([
46+
Span(name='client.testing', tracer=None),
47+
Span(name='client.testing', tracer=None),
48+
])
49+
traces.append([
50+
Span(name='client.testing', tracer=None),
51+
Span(name='client.testing', tracer=None),
52+
])
53+
54+
encoder = JSONEncoder()
55+
56+
# Encode each trace on it's own
57+
encoded_traces = [
58+
encoder.encode_trace(trace)
59+
for trace in traces
60+
]
61+
62+
# Join the encoded traces together
63+
data = encoder.join_encoded(encoded_traces)
64+
65+
# Parse the resulting data
66+
items = json.loads(data)
67+
68+
# test the encoded output that should be a string
69+
# and the output must be flatten
70+
ok_(isinstance(data, string_type))
71+
eq_(len(items), 2)
72+
eq_(len(items[0]), 2)
73+
eq_(len(items[1]), 2)
74+
for i in range(2):
75+
for j in range(2):
76+
eq_('client.testing', items[i][j]['name'])
77+
4278
def test_encode_traces_msgpack(self):
4379
# test encoding for MsgPack format
4480
traces = []
@@ -64,3 +100,38 @@ def test_encode_traces_msgpack(self):
64100
for i in range(2):
65101
for j in range(2):
66102
eq_(b'client.testing', items[i][j][b'name'])
103+
104+
def test_join_encoded_msgpack(self):
105+
# test encoding for MsgPack format
106+
traces = []
107+
traces.append([
108+
Span(name='client.testing', tracer=None),
109+
Span(name='client.testing', tracer=None),
110+
])
111+
traces.append([
112+
Span(name='client.testing', tracer=None),
113+
Span(name='client.testing', tracer=None),
114+
])
115+
116+
encoder = MsgpackEncoder()
117+
118+
# Encode each individual trace on it's own
119+
encoded_traces = [
120+
encoder.encode_trace(trace)
121+
for trace in traces
122+
]
123+
# Join the encoded traces together
124+
data = encoder.join_encoded(encoded_traces)
125+
126+
# Parse the encoded data
127+
items = msgpack.unpackb(data)
128+
129+
# test the encoded output that should be a string
130+
# and the output must be flatten
131+
ok_(isinstance(data, msgpack_type))
132+
eq_(len(items), 2)
133+
eq_(len(items[0]), 2)
134+
eq_(len(items[1]), 2)
135+
for i in range(2):
136+
for j in range(2):
137+
eq_(b'client.testing', items[i][j][b'name'])

0 commit comments

Comments
 (0)