Skip to content

Commit 8518b0f

Browse files
author
Emanuele Palazzetti
authored
Merge pull request #123 from palazzem/msgpack-api
[core] provide support for trace-agent API v0.3
2 parents 020e9d7 + 40cd4d1 commit 8518b0f

File tree

5 files changed

+116
-47
lines changed

5 files changed

+116
-47
lines changed

ddtrace/api.py

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import time
44

55
# project
6-
from .encoding import get_encoder
6+
from .encoding import get_encoder, JSONEncoder
77
from .compat import httplib
88

99

@@ -14,43 +14,66 @@ class API(object):
1414
"""
1515
Send data to the trace agent using the HTTP protocol and JSON format
1616
"""
17-
def __init__(self, hostname, port, wait_response=False, headers=None, encoder=None):
17+
def __init__(self, hostname, port, headers=None, encoder=None):
1818
self.hostname = hostname
1919
self.port = port
20+
self._traces = '/v0.3/traces'
21+
self._services = '/v0.3/services'
22+
self._compatibility_mode = False
2023
self._encoder = encoder or get_encoder()
21-
self._wait_response = wait_response
2224

2325
# overwrite the Content-type with the one chosen in the Encoder
2426
self._headers = headers or {}
2527
self._headers.update({'Content-Type': self._encoder.content_type})
2628

29+
def _downgrade(self):
30+
"""
31+
Downgrades the used encoder and API level. This method must fallback to a safe
32+
encoder and API, so that it will success despite users' configurations. This action
33+
ensures that the compatibility mode is activated so that the downgrade will be
34+
executed only once.
35+
"""
36+
self._compatibility_mode = True
37+
self._traces = '/v0.2/traces'
38+
self._services = '/v0.2/services'
39+
self._encoder = JSONEncoder()
40+
self._headers.update({'Content-Type': self._encoder.content_type})
41+
2742
def send_traces(self, traces):
2843
if not traces:
2944
return
3045
start = time.time()
3146
data = self._encoder.encode_traces(traces)
32-
response = self._send_span_data(data)
47+
response = self._put(self._traces, data)
48+
49+
# the API endpoint is not available so we should downgrade the connection and re-try the call
50+
if response.status in [404, 415] and self._compatibility_mode is False:
51+
log.debug('calling the endpoint "%s" but received %s; downgrading the API', self._traces, response.status)
52+
self._downgrade()
53+
return self.send_traces(traces)
54+
3355
log.debug("reported %d spans in %.5fs", len(traces), time.time() - start)
3456
return response
3557

3658
def send_services(self, services):
3759
if not services:
3860
return
39-
log.debug("Reporting %d services", len(services))
4061
s = {}
4162
for service in services:
4263
s.update(service)
4364
data = self._encoder.encode_services(s)
44-
return self._put("/v0.2/services", data)
65+
response = self._put(self._services, data)
4566

46-
def _send_span_data(self, data):
47-
return self._put("/v0.2/traces", data)
67+
# the API endpoint is not available so we should downgrade the connection and re-try the call
68+
if response.status in [404, 415] and self._compatibility_mode is False:
69+
log.debug('calling the endpoint "%s" but received 404; downgrading the API', self._services)
70+
self._downgrade()
71+
return self.send_services(services)
72+
73+
log.debug("reported %d services", len(services))
74+
return response
4875

4976
def _put(self, endpoint, data):
5077
conn = httplib.HTTPConnection(self.hostname, self.port)
5178
conn.request("PUT", endpoint, data, self._headers)
52-
53-
# read the server response only if the
54-
# API object is configured to do so
55-
if self._wait_response:
56-
return conn.getresponse()
79+
return conn.getresponse()

ddtrace/encoding.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
1-
import os
21
import json
32
import logging
43

54

65
# check msgpack CPP implementation; if the import fails, we're using the
76
# pure Python implementation that is really slow, so the ``Encoder`` should use
8-
# a different encoding format. To enable msgpack encoding, you should set
9-
# the ``DD_MSGPACK_ENCODING=1`` environment variable otherwise, the ``JSONEncoder``
10-
# will be used as a default.
7+
# a different encoding format.
118
try:
129
import msgpack
1310
from msgpack._packer import Packer # noqa
1411
from msgpack._unpacker import unpack, unpackb, Unpacker # noqa
15-
MSGPACK_ENCODING = os.getenv('DD_MSGPACK_ENCODING') == '1' # shortcut to accept only '1'
12+
MSGPACK_ENCODING = True
1613
except ImportError:
1714
MSGPACK_ENCODING = False
1815

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def run_tests(self):
5858
packages=find_packages(exclude=['tests*']),
5959
install_requires=[
6060
"wrapt",
61+
"msgpack-python",
6162
],
6263
# plugin tox
6364
tests_require=['tox', 'flake8'],

tests/test_integration.py

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from ddtrace.api import API
1111
from ddtrace.span import Span
1212
from ddtrace.tracer import Tracer
13-
from ddtrace.encoding import JSONEncoder, MsgpackEncoder
13+
from ddtrace.encoding import JSONEncoder, MsgpackEncoder, get_encoder
1414
from tests.test_tracer import get_dummy_tracer
1515

1616

@@ -67,7 +67,7 @@ def test_worker_single_trace(self):
6767
# check arguments
6868
endpoint = self.api._put.call_args[0][0]
6969
payload = self._decode(self.api._put.call_args[0][1])
70-
eq_(endpoint, '/v0.2/traces')
70+
eq_(endpoint, '/v0.3/traces')
7171
eq_(len(payload), 1)
7272
eq_(len(payload[0]), 1)
7373
eq_(payload[0][0]['name'], 'client.testing')
@@ -84,7 +84,7 @@ def test_worker_multiple_traces(self):
8484
# check arguments
8585
endpoint = self.api._put.call_args[0][0]
8686
payload = self._decode(self.api._put.call_args[0][1])
87-
eq_(endpoint, '/v0.2/traces')
87+
eq_(endpoint, '/v0.3/traces')
8888
eq_(len(payload), 2)
8989
eq_(len(payload[0]), 1)
9090
eq_(len(payload[1]), 1)
@@ -104,7 +104,7 @@ def test_worker_single_trace_multiple_spans(self):
104104
# check arguments
105105
endpoint = self.api._put.call_args[0][0]
106106
payload = self._decode(self.api._put.call_args[0][1])
107-
eq_(endpoint, '/v0.2/traces')
107+
eq_(endpoint, '/v0.3/traces')
108108
eq_(len(payload), 1)
109109
eq_(len(payload[0]), 2)
110110
eq_(payload[0][0]['name'], 'client.testing')
@@ -122,7 +122,7 @@ def test_worker_single_service(self):
122122
# check arguments
123123
endpoint = self.api._put.call_args[0][0]
124124
payload = self._decode(self.api._put.call_args[0][1])
125-
eq_(endpoint, '/v0.2/services')
125+
eq_(endpoint, '/v0.3/services')
126126
eq_(len(payload.keys()), 1)
127127
eq_(payload['client.service'], {'app': 'django', 'app_type': 'web'})
128128

@@ -139,7 +139,7 @@ def test_worker_service_called_multiple_times(self):
139139
# check arguments
140140
endpoint = self.api._put.call_args[0][0]
141141
payload = self._decode(self.api._put.call_args[0][1])
142-
eq_(endpoint, '/v0.2/services')
142+
eq_(endpoint, '/v0.3/services')
143143
eq_(len(payload.keys()), 2)
144144
eq_(payload['backend'], {'app': 'django', 'app_type': 'web'})
145145
eq_(payload['database'], {'app': 'postgres', 'app_type': 'db'})
@@ -161,8 +161,8 @@ def setUp(self):
161161
"""
162162
# create a new API object to test the transport using synchronous calls
163163
self.tracer = get_dummy_tracer()
164-
self.api_json = API('localhost', 7777, wait_response=True, encoder=JSONEncoder())
165-
self.api_msgpack = API('localhost', 7777, wait_response=True, encoder=MsgpackEncoder())
164+
self.api_json = API('localhost', 7777, encoder=JSONEncoder())
165+
self.api_msgpack = API('localhost', 7777, encoder=MsgpackEncoder())
166166

167167
def test_send_single_trace(self):
168168
# register a single trace with a span and send them to the trace agent
@@ -278,3 +278,44 @@ def test_send_service_called_multiple_times(self):
278278
response = self.api_msgpack.send_services(services)
279279
ok_(response)
280280
eq_(response.status, 200)
281+
282+
@skipUnless(
283+
os.environ.get('TEST_DATADOG_INTEGRATION', False),
284+
'You should have a running trace agent and set TEST_DATADOG_INTEGRATION=1 env variable'
285+
)
286+
class TestAPIDowngrade(TestCase):
287+
"""
288+
Ensures that if the tracing client found an earlier trace agent,
289+
it will downgrade the current connection to a stable API version
290+
"""
291+
def test_get_encoder_default(self):
292+
# get_encoder should return MsgpackEncoder instance if
293+
# msgpack and the CPP implementaiton are available
294+
encoder = get_encoder()
295+
ok_(isinstance(encoder, MsgpackEncoder))
296+
297+
@mock.patch('ddtrace.encoding.MSGPACK_ENCODING', False)
298+
def test_get_encoder_fallback(self):
299+
# get_encoder should return JSONEncoder instance if
300+
# msgpack or the CPP implementaiton, are not available
301+
encoder = get_encoder()
302+
ok_(isinstance(encoder, JSONEncoder))
303+
304+
def test_downgrade_api(self):
305+
# make a call to a not existing endpoint, downgrades
306+
# the current API to a stable one
307+
tracer = get_dummy_tracer()
308+
tracer.trace('client.testing').finish()
309+
trace = tracer.writer.pop()
310+
311+
# the encoder is right but we're targeting an API
312+
# endpoint that is not available
313+
api = API('localhost', 7777)
314+
api._traces = '/v0.0/traces'
315+
ok_(isinstance(api._encoder, MsgpackEncoder))
316+
317+
# after the call, we downgrade to a working endpoint
318+
response = api.send_traces([trace])
319+
ok_(response)
320+
eq_(response.status, 200)
321+
ok_(isinstance(api._encoder, JSONEncoder))

tox.ini

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
envlist =
1010
flake8
1111
wait
12+
13+
{py27,py34}-tracer
14+
{py27,py34}-integration
15+
{py27,py34}-contrib
1216
{py27,py34}-bottle{12}-webtest
1317
{py27,py34}-cassandra
1418
{py27,py34}-elasticsearch{23}
@@ -25,7 +29,6 @@ envlist =
2529
{py27,py34}-sqlalchemy{10,11}-psycopg2
2630
{py27,py34}-redis
2731
{py27,py34}-sqlite3
28-
{py27,py34}-all
2932

3033
[testenv]
3134
basepython =
@@ -36,25 +39,25 @@ deps =
3639
# test dependencies installed in all envs
3740
mock
3841
nose
39-
msgpack-python<0.4.9
42+
msgpack-python
4043
# integrations
41-
all: blinker
42-
all: bottle
43-
all: cassandra-driver
44-
all: elasticsearch
45-
all: falcon
46-
all: flask
47-
all: flask_cache
48-
all: mongoengine
49-
all: mysql-connector
50-
all: psycopg2
51-
all: pylibmc
52-
all: pymongo
53-
all: python-memcached
54-
all: redis
55-
all: requests
56-
all: sqlalchemy
57-
all: WebTest
44+
contrib: blinker
45+
contrib: bottle
46+
contrib: cassandra-driver
47+
contrib: elasticsearch
48+
contrib: falcon
49+
contrib: flask
50+
contrib: flask_cache
51+
contrib: mongoengine
52+
contrib: mysql-connector
53+
contrib: psycopg2
54+
contrib: pylibmc
55+
contrib: pymongo
56+
contrib: python-memcached
57+
contrib: redis
58+
contrib: requests
59+
contrib: sqlalchemy
60+
contrib: WebTest
5861
blinker: blinker
5962
bottle12: bottle>=0.12
6063
cassandra: cassandra-driver
@@ -98,8 +101,12 @@ passenv=TEST_*
98101
commands =
99102
# wait for services script
100103
{py34}-wait: python tests/wait-for-services.py
104+
# run only essential tests related to the tracing client
105+
{py27,py34}-tracer: nosetests {posargs} --exclude=".*(contrib|integration).*" tests/
106+
# integration tests
107+
{py27,py34}-integration: nosetests {posargs} tests/test_integration.py
101108
# run all tests for the release jobs except the ones with a different test runner
102-
{py27,py34}-all: nosetests {posargs} --exclude=".*(django).*"
109+
{py27,py34}-contrib: nosetests {posargs} --exclude=".*(django).*" tests/contrib/
103110
# run subsets of the tests for particular library versions
104111
{py27,py34}-bottle{12}: nosetests {posargs} tests/contrib/bottle/
105112
{py27,py34}-cassandra: nosetests {posargs} tests/contrib/cassandra
@@ -137,5 +144,5 @@ basepython=python
137144

138145
[flake8]
139146
ignore=W391,E231,E201,E202,E203,E261,E302,E128,E126,E124
140-
max-line-length=100
147+
max-line-length=120
141148
exclude = tests

0 commit comments

Comments
 (0)