Skip to content

Commit b62d75d

Browse files
authored
Improved Asynqp instrumentation (#109)
* Case insensitive context extraction; tests * If no context in carrier, return None by default. * Add test to validate behaviour without context * More context related tests. * Use asyncio context propogation * Make an async tracer available. * More extensive tests. * Update Travis * Fix travis postgres auth * Create DBs before test run * Don't instrument asynqp in Python versions < 3.4 * Use nosetests runner * Fix test urls * Limit redis version in tests * Add mysql to travis tests * Exclude asynqp tests from python versions < 3.5 because: https://stackoverflow.com/questions/48606389/exception-there-is-no-current-event-loop-in-thread-mainthread-while-runnin
1 parent d88afa5 commit b62d75d

File tree

16 files changed

+395
-158
lines changed

16 files changed

+395
-158
lines changed

.travis.yml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,19 @@ python:
99
before_install:
1010
- "pip install --upgrade pip"
1111
- "pip install --upgrade setuptools"
12-
- "mysql -e 'CREATE DATABASE travis_ci_test;'"
12+
13+
before_script:
14+
- psql -c 'create database travis_ci_test;' -U postgres
15+
- mysql -e 'CREATE DATABASE travis_ci_test;'
1316

1417
install: "pip install -r requirements-test.txt"
1518

1619
sudo: required
1720

1821
services:
22+
- mysql
23+
- postgresql
1924
- rabbitmq
25+
- redis
2026

2127
script: python runtests.py

instana/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import absolute_import
22

33
import os
4+
import sys
45
import pkg_resources
56
from threading import Timer
67

@@ -59,7 +60,8 @@ def load(module):
5960
def load_instrumentation():
6061
if "INSTANA_DISABLE_AUTO_INSTR" not in os.environ:
6162
# Import & initialize instrumentation
62-
from .instrumentation import asynqp # noqa
63+
if sys.version_info >= (3, 4):
64+
from .instrumentation import asynqp # noqa
6365
from .instrumentation import mysqlpython # noqa
6466
from .instrumentation import redis # noqa
6567
from .instrumentation import sqlalchemy # noqa

instana/http_propagator.py

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,15 @@ class HTTPPropagator():
2525
HEADER_KEY_T = 'X-Instana-T'
2626
HEADER_KEY_S = 'X-Instana-S'
2727
HEADER_KEY_L = 'X-Instana-L'
28+
LC_HEADER_KEY_T = 'x-instana-t'
29+
LC_HEADER_KEY_S = 'x-instana-s'
30+
LC_HEADER_KEY_L = 'x-instana-l'
2831
ALT_HEADER_KEY_T = 'HTTP_X_INSTANA_T'
2932
ALT_HEADER_KEY_S = 'HTTP_X_INSTANA_S'
3033
ALT_HEADER_KEY_L = 'HTTP_X_INSTANA_L'
34+
ALT_LC_HEADER_KEY_T = 'http_x_instana_t'
35+
ALT_LC_HEADER_KEY_S = 'http_x_instana_s'
36+
ALT_LC_HEADER_KEY_L = 'http_x_instana_l'
3137

3238
def inject(self, span_context, carrier):
3339
try:
@@ -49,6 +55,9 @@ def inject(self, span_context, carrier):
4955
logger.debug("inject error: ", str(e))
5056

5157
def extract(self, carrier): # noqa
58+
trace_id = None
59+
span_id = None
60+
5261
try:
5362
if type(carrier) is dict or hasattr(carrier, "__dict__"):
5463
dc = carrier
@@ -57,20 +66,28 @@ def extract(self, carrier): # noqa
5766
else:
5867
raise ot.SpanContextCorruptedException()
5968

60-
# Look for standard X-Instana-T/S format
61-
if self.HEADER_KEY_T in dc and self.HEADER_KEY_S in dc:
62-
trace_id = header_to_id(dc[self.HEADER_KEY_T])
63-
span_id = header_to_id(dc[self.HEADER_KEY_S])
69+
# Headers can exist in the standard X-Instana-T/S format or the alternate HTTP_X_INSTANA_T/S style
70+
# We do a case insensitive search to cover all possible variations of incoming headers.
71+
for key in dc.keys():
72+
lc_key = key.lower()
73+
74+
if self.LC_HEADER_KEY_T == lc_key:
75+
trace_id = header_to_id(dc[key])
76+
elif self.LC_HEADER_KEY_S == lc_key:
77+
span_id = header_to_id(dc[key])
6478

65-
# Alternatively check for alternate HTTP_X_INSTANA_T/S style
66-
elif self.ALT_HEADER_KEY_T in dc and self.ALT_HEADER_KEY_S in dc:
67-
trace_id = header_to_id(dc[self.ALT_HEADER_KEY_T])
68-
span_id = header_to_id(dc[self.ALT_HEADER_KEY_S])
79+
elif self.ALT_LC_HEADER_KEY_T == lc_key:
80+
trace_id = header_to_id(dc[key])
81+
elif self.ALT_LC_HEADER_KEY_S == lc_key:
82+
span_id = header_to_id(dc[key])
6983

70-
return SpanContext(span_id=span_id,
71-
trace_id=trace_id,
72-
baggage={},
73-
sampled=True)
84+
ctx = None
85+
if trace_id is not None and span_id is not None:
86+
ctx = SpanContext(span_id=span_id,
87+
trace_id=trace_id,
88+
baggage={},
89+
sampled=True)
90+
return ctx
7491

7592
except Exception as e:
7693
logger.debug("extract error: ", str(e))

instana/instrumentation/asynqp.py

Lines changed: 89 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -3,97 +3,95 @@
33
import sys
44

55
import opentracing
6-
import opentracing.ext.tags as ext
76
import wrapt
87

98
from ..log import logger
10-
from ..singletons import tracer
11-
12-
if sys.version_info >= (3,4):
13-
try:
14-
import asynqp
15-
16-
@wrapt.patch_function_wrapper('asynqp.exchange','Exchange.publish')
17-
def publish_with_instana(wrapped, instance, args, kwargs):
18-
parent_span = tracer.active_span
19-
20-
# If we're not tracing, just return
21-
if parent_span is None:
22-
return wrapped(*args, **kwargs)
23-
24-
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
25-
host, port = instance.sender.protocol.transport._sock.getsockname()
26-
27-
msg = args[0]
28-
if msg.headers is None:
29-
msg.headers = {}
30-
tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, msg.headers)
31-
32-
try:
33-
scope.span.set_tag("exchange", instance.name)
34-
scope.span.set_tag("sort", "publish")
35-
scope.span.set_tag("address", host + ":" + str(port) )
36-
scope.span.set_tag("key", args[1])
37-
38-
rv = wrapped(*args, **kwargs)
39-
except Exception as e:
40-
scope.span.log_kv({'message': e})
41-
scope.span.set_tag("error", True)
42-
ec = scope.span.tags.get('ec', 0)
43-
scope.span.set_tag("ec", ec+1)
44-
raise
45-
else:
46-
return rv
47-
48-
@wrapt.patch_function_wrapper('asynqp.queue','Queue.get')
49-
def get_with_instana(wrapped, instance, args, kwargs):
50-
parent_span = tracer.active_span
51-
52-
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
53-
host, port = instance.sender.protocol.transport._sock.getsockname()
54-
55-
try:
56-
scope.span.set_tag("queue", instance.name)
57-
scope.span.set_tag("sort", "consume")
58-
scope.span.set_tag("address", host + ":" + str(port) )
59-
60-
rv = wrapped(*args, **kwargs)
61-
except Exception as e:
62-
scope.span.log_kv({'message': e})
63-
scope.span.set_tag("error", True)
64-
ec = scope.span.tags.get('ec', 0)
65-
scope.span.set_tag("ec", ec+1)
66-
raise
67-
else:
68-
return rv
69-
70-
@wrapt.patch_function_wrapper('asynqp.queue','Consumers.deliver')
71-
def deliver_with_instana(wrapped, instance, args, kwargs):
72-
73-
ctx = None
74-
msg = args[1]
75-
if 'X-Instana-T' in msg.headers and 'X-Instana-S' in msg.headers:
76-
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers))
77-
78-
with tracer.start_active_span("rabbitmq", child_of=ctx) as scope:
79-
host, port = args[1].sender.protocol.transport._sock.getsockname()
80-
81-
try:
82-
scope.span.set_tag("exchange", msg.exchange_name)
83-
scope.span.set_tag("sort", "consume")
84-
scope.span.set_tag("address", host + ":" + str(port) )
85-
scope.span.set_tag("key", msg.routing_key)
86-
87-
rv = wrapped(*args, **kwargs)
88-
except Exception as e:
89-
scope.span.log_kv({'message': e})
90-
scope.span.set_tag("error", True)
91-
ec = scope.span.tags.get('ec', 0)
92-
scope.span.set_tag("ec", ec+1)
93-
raise
94-
else:
95-
return rv
96-
97-
logger.debug("Instrumenting asynqp")
98-
except ImportError:
99-
pass
9+
from ..singletons import async_tracer
10+
11+
try:
12+
import asynqp
13+
14+
@wrapt.patch_function_wrapper('asynqp.exchange','Exchange.publish')
15+
def publish_with_instana(wrapped, instance, argv, kwargs):
16+
parent_span = async_tracer.active_span
17+
18+
# If we're not tracing, just return
19+
if parent_span is None:
20+
return wrapped(*argv, **kwargs)
21+
22+
with async_tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
23+
host, port = instance.sender.protocol.transport._sock.getsockname()
24+
25+
msg = argv[0]
26+
if msg.headers is None:
27+
msg.headers = {}
28+
async_tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, msg.headers)
29+
30+
try:
31+
scope.span.set_tag("exchange", instance.name)
32+
scope.span.set_tag("sort", "publish")
33+
scope.span.set_tag("address", host + ":" + str(port) )
34+
scope.span.set_tag("key", argv[1])
35+
36+
rv = wrapped(*argv, **kwargs)
37+
except Exception as e:
38+
scope.span.log_kv({'message': e})
39+
scope.span.set_tag("error", True)
40+
ec = scope.span.tags.get('ec', 0)
41+
scope.span.set_tag("ec", ec+1)
42+
raise
43+
else:
44+
return rv
45+
46+
@wrapt.patch_function_wrapper('asynqp.queue','Queue.get')
47+
def get_with_instana(wrapped, instance, argv, kwargs):
48+
parent_span = async_tracer.active_span
49+
50+
# If we're not tracing, just return
51+
if parent_span is None:
52+
return wrapped(*argv, **kwargs)
53+
54+
with async_tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
55+
host, port = instance.sender.protocol.transport._sock.getsockname()
56+
57+
scope.span.set_tag("sort", "consume")
58+
scope.span.set_tag("address", host + ":" + str(port) )
59+
60+
msg = yield from wrapped(*argv, **kwargs)
61+
62+
if msg is not None:
63+
scope.span.set_tag("queue", instance.name)
64+
scope.span.set_tag("key", msg.routing_key)
65+
66+
return msg
67+
68+
@wrapt.patch_function_wrapper('asynqp.queue','Consumers.deliver')
69+
def deliver_with_instana(wrapped, instance, argv, kwargs):
70+
71+
ctx = None
72+
msg = argv[1]
73+
if msg.headers is not None:
74+
ctx = async_tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers))
75+
76+
with async_tracer.start_active_span("rabbitmq", child_of=ctx) as scope:
77+
host, port = argv[1].sender.protocol.transport._sock.getsockname()
78+
79+
try:
80+
scope.span.set_tag("exchange", msg.exchange_name)
81+
scope.span.set_tag("sort", "consume")
82+
scope.span.set_tag("address", host + ":" + str(port) )
83+
scope.span.set_tag("key", msg.routing_key)
84+
85+
rv = wrapped(*argv, **kwargs)
86+
except Exception as e:
87+
scope.span.log_kv({'message': e})
88+
scope.span.set_tag("error", True)
89+
ec = scope.span.tags.get('ec', 0)
90+
scope.span.set_tag("ec", ec+1)
91+
raise
92+
else:
93+
return rv
94+
95+
logger.debug("Instrumenting asynqp")
96+
except ImportError:
97+
pass

instana/instrumentation/django/middleware.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,8 @@ def __init__(self, get_response=None):
2727
def process_request(self, request):
2828
try:
2929
env = request.environ
30-
ctx = None
31-
32-
if 'HTTP_X_INSTANA_T' in env and 'HTTP_X_INSTANA_S' in env:
33-
ctx = tracer.extract(ot.Format.HTTP_HEADERS, env)
3430

31+
ctx = tracer.extract(ot.Format.HTTP_HEADERS, env)
3532
request.iscope = tracer.start_active_span('django', child_of=ctx)
3633

3734
if agent.extra_headers is not None:

instana/singletons.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import sys
12
import opentracing
23

34
from .agent import Agent # noqa
@@ -17,5 +18,9 @@
1718
#
1819
tracer = InstanaTracer()
1920

21+
if sys.version_info >= (3,4):
22+
from opentracing.scope_managers.asyncio import AsyncioScopeManager
23+
async_tracer = InstanaTracer(AsyncioScopeManager())
24+
2025
# Set ourselves as the tracer.
2126
opentracing.tracer = tracer

instana/util.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
import time
99

1010
import pkg_resources
11-
from urllib import parse
1211

12+
try:
13+
from urllib import parse
14+
except ImportError:
15+
from urlparse import urlparse as parse
1316
from .log import logger
1417

1518

instana/wsgi.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@ def new_start_response(status, headers, exc_info=None):
3232
self.scope.close()
3333
return res
3434

35-
ctx = None
36-
if 'HTTP_X_INSTANA_T' in env and 'HTTP_X_INSTANA_S' in env:
37-
ctx = tracer.extract(ot.Format.HTTP_HEADERS, env)
38-
35+
ctx = tracer.extract(ot.Format.HTTP_HEADERS, env)
3936
self.scope = tracer.start_active_span("wsgi", child_of=ctx)
4037

4138
if agent.extra_headers is not None:

runtests.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,15 @@
22
import nose
33
from distutils.version import LooseVersion
44

5-
args = ['nosetests', '-v']
5+
command_line = ['-v']
66

7-
if (LooseVersion(sys.version) <= LooseVersion('3.5')):
8-
args.extend(['-e', 'asynqp'])
7+
if (LooseVersion(sys.version) < LooseVersion('3.5')):
8+
command_line.extend(['-e', 'asynqp'])
99

10-
result = nose.run(argv=args)
10+
print("Nose arguments: %s" % command_line)
11+
result = nose.run(argv=command_line)
12+
13+
if result is True:
14+
exit(0)
15+
else:
16+
exit(-1)

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def check_setuptools():
4747
},
4848
extras_require={
4949
'test': [
50-
'asynqp>=0.4',
50+
'asynqp>=0.4;python_version>="3.4"',
5151
'django>=1.11',
5252
'nose>=1.0',
5353
'flask>=0.12.2',
@@ -57,7 +57,7 @@ def check_setuptools():
5757
'psycopg2>=2.7.1',
5858
'pyOpenSSL>=16.1.0;python_version<="2.7"',
5959
'pytest>=3.0.1',
60-
'redis>=2.10.6',
60+
'redis<3.0.0',
6161
'requests>=2.17.1',
6262
'sqlalchemy>=1.1.15',
6363
'spyne>=2.9',

0 commit comments

Comments
 (0)