Skip to content

Commit 08cc70a

Browse files
authored
RabbitMQ: asynqp Instrumentation (#101)
* Add Rabbitmq to Travis tests * Initial asynqp instrumentation & tests. * RabbitMQ json span support; more tests * Default to localhost for host; Purgue queue between tests * Don't test asynqp on Python versions prior to 3.5 * Add missing packages * Switch to manually configured nosetest runs * Add stack traces to publish/consume
1 parent df83bc9 commit 08cc70a

File tree

11 files changed

+353
-27
lines changed

11 files changed

+353
-27
lines changed

.env-test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export RABBITMQ_HOST="192.168.201.129"

.travis.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,9 @@ before_install:
1313

1414
install: "pip install -r requirements-test.txt"
1515

16+
sudo: required
1617

17-
script: nosetests -v
18+
services:
19+
- rabbitmq
20+
21+
script: python runtests.py

instana/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def load(module):
5959
def load_instrumentation():
6060
if "INSTANA_DISABLE_AUTO_INSTR" not in os.environ:
6161
# Import & initialize instrumentation
62+
from .instrumentation import asynqp # noqa
6263
from .instrumentation import urllib3 # noqa
6364
from .instrumentation import sudsjurko # noqa
6465
from .instrumentation import mysqlpython # noqa

instana/http_propagator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def extract(self, carrier): # noqa
5858
raise ot.SpanContextCorruptedException()
5959

6060
# Look for standard X-Instana-T/S format
61-
if self.HEADER_KEY_T in dc and self.header_key_s in dc:
61+
if self.HEADER_KEY_T in dc and self.HEADER_KEY_S in dc:
6262
trace_id = header_to_id(dc[self.HEADER_KEY_T])
6363
span_id = header_to_id(dc[self.HEADER_KEY_S])
6464

instana/instrumentation/asynqp.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
from __future__ import absolute_import
2+
3+
import opentracing
4+
import opentracing.ext.tags as ext
5+
import wrapt
6+
7+
from ..log import logger
8+
from ..singletons import tracer
9+
10+
try:
11+
import asyncio
12+
import asynqp
13+
14+
@wrapt.patch_function_wrapper('asynqp.exchange','Exchange.publish')
15+
def publish_with_instana(wrapped, instance, args, kwargs):
16+
parent_span = tracer.active_span
17+
18+
# If we're not tracing, just return
19+
if parent_span is None:
20+
return wrapped(*args, **kwargs)
21+
22+
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
23+
host, port = instance.sender.protocol.transport._sock.getsockname()
24+
25+
msg = args[0]
26+
if msg.headers is None:
27+
msg.headers = {}
28+
tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, msg.headers)
29+
30+
try:
31+
scope.span.set_tag("exchange", instance.name)
32+
scope.span.set_tag("sort", "publish")
33+
scope.span.set_tag("address", host + ":" + str(port) )
34+
scope.span.set_tag("key", args[1])
35+
36+
rv = wrapped(*args, **kwargs)
37+
except Exception as e:
38+
scope.span.log_kv({'message': e})
39+
scope.span.set_tag("error", True)
40+
ec = scope.span.tags.get('ec', 0)
41+
scope.span.set_tag("ec", ec+1)
42+
raise
43+
else:
44+
return rv
45+
46+
@wrapt.patch_function_wrapper('asynqp.queue','Queue.get')
47+
def get_with_instana(wrapped, instance, args, kwargs):
48+
parent_span = tracer.active_span
49+
50+
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
51+
host, port = instance.sender.protocol.transport._sock.getsockname()
52+
53+
try:
54+
scope.span.set_tag("queue", instance.name)
55+
scope.span.set_tag("sort", "consume")
56+
scope.span.set_tag("address", host + ":" + str(port) )
57+
58+
rv = wrapped(*args, **kwargs)
59+
except Exception as e:
60+
scope.span.log_kv({'message': e})
61+
scope.span.set_tag("error", True)
62+
ec = scope.span.tags.get('ec', 0)
63+
scope.span.set_tag("ec", ec+1)
64+
raise
65+
else:
66+
return rv
67+
68+
@wrapt.patch_function_wrapper('asynqp.queue','Consumers.deliver')
69+
def deliver_with_instana(wrapped, instance, args, kwargs):
70+
71+
ctx = None
72+
msg = args[1]
73+
if 'X-Instana-T' in msg.headers and 'X-Instana-S' in msg.headers:
74+
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers))
75+
76+
with tracer.start_active_span("rabbitmq", child_of=ctx) as scope:
77+
host, port = args[1].sender.protocol.transport._sock.getsockname()
78+
79+
try:
80+
scope.span.set_tag("exchange", msg.exchange_name)
81+
scope.span.set_tag("sort", "consume")
82+
scope.span.set_tag("address", host + ":" + str(port) )
83+
scope.span.set_tag("key", msg.routing_key)
84+
85+
rv = wrapped(*args, **kwargs)
86+
except Exception as e:
87+
scope.span.log_kv({'message': e})
88+
scope.span.set_tag("error", True)
89+
ec = scope.span.tags.get('ec', 0)
90+
scope.span.set_tag("ec", ec+1)
91+
raise
92+
else:
93+
return rv
94+
95+
logger.debug("Instrumenting asynqp")
96+
except ImportError:
97+
pass

instana/json_span.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,50 +17,62 @@ def __init__(self, **kwds):
1717
self.__dict__[key] = kwds[key]
1818

1919

20+
class CustomData(object):
21+
tags = None
22+
logs = None
23+
24+
def __init__(self, **kwds):
25+
self.__dict__.update(kwds)
26+
27+
2028
class Data(object):
2129
service = None
2230
http = None
2331
baggage = None
2432
custom = None
2533
sdk = None
2634
soap = None
35+
rabbitmq = None
2736

2837
def __init__(self, **kwds):
2938
self.__dict__.update(kwds)
3039

3140

32-
class MySQLData(object):
33-
db = None
41+
class HttpData(object):
3442
host = None
35-
user = None
36-
stmt = None
43+
url = None
44+
status = 0
45+
method = None
3746
error = None
3847

3948
def __init__(self, **kwds):
4049
self.__dict__.update(kwds)
4150

4251

43-
class HttpData(object):
52+
class MySQLData(object):
53+
db = None
4454
host = None
45-
url = None
46-
status = 0
47-
method = None
55+
user = None
56+
stmt = None
4857
error = None
4958

5059
def __init__(self, **kwds):
5160
self.__dict__.update(kwds)
5261

5362

54-
class SoapData(object):
55-
action = None
63+
class RabbitmqData(object):
64+
exchange = None
65+
queue = None
66+
sort = None
67+
address = None
68+
key = None
5669

5770
def __init__(self, **kwds):
5871
self.__dict__.update(kwds)
5972

6073

61-
class CustomData(object):
62-
tags = None
63-
logs = None
74+
class SoapData(object):
75+
action = None
6476

6577
def __init__(self, **kwds):
6678
self.__dict__.update(kwds)

instana/recorder.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import instana.singletons
1313

1414
from .json_span import (CustomData, Data, HttpData, JsonSpan, MySQLData,
15-
SDKData, SoapData)
15+
RabbitmqData, SDKData, SoapData)
1616
from .log import logger
1717

1818
if sys.version_info.major is 2:
@@ -22,12 +22,12 @@
2222

2323

2424
class InstanaRecorder(SpanRecorder):
25-
registered_spans = ("django", "memcache", "mysql", "rpc-client",
25+
registered_spans = ("django", "memcache", "mysql", "rabbitmq", "rpc-client",
2626
"rpc-server", "soap", "urllib3", "wsgi")
2727
http_spans = ("django", "wsgi", "urllib3", "soap")
2828

29-
exit_spans = ("memcache", "mysql", "rpc-client", "soap", "urllib3")
30-
entry_spans = ("django", "wsgi", "rpc-server")
29+
exit_spans = ("memcache", "mysql", "rabbitmq", "rpc-client", "soap", "urllib3")
30+
entry_spans = ("django", "wsgi", "rabbitmq", "rpc-server")
3131

3232
entry_kind = ["entry", "server", "consumer"]
3333
exit_kind = ["exit", "client", "producer"]
@@ -91,9 +91,13 @@ def record_span(self, span):
9191

9292
def build_registered_span(self, span):
9393
""" Takes a BasicSpan and converts it into a registered JsonSpan """
94-
data = Data(baggage=span.context.baggage,
95-
custom=CustomData(tags=span.tags,
96-
logs=self.collect_logs(span)))
94+
data = Data(baggage=span.context.baggage)
95+
96+
logs = self.collect_logs(span)
97+
if len(logs) > 0:
98+
if data.custom is None:
99+
data.custom = CustomData()
100+
data.custom.logs = logs
97101

98102
if span.operation_name in self.http_spans:
99103
data.http = HttpData(host=self.get_http_host_name(span),
@@ -102,6 +106,13 @@ def build_registered_span(self, span):
102106
status=span.tags.pop(ext.HTTP_STATUS_CODE, None),
103107
error=span.tags.pop('http.error', None))
104108

109+
if span.operation_name == "rabbitmq":
110+
data.rabbitmq = RabbitmqData(exchange=span.tags.pop('exchange', None),
111+
queue=span.tags.pop('queue', None),
112+
sort=span.tags.pop('sort', None),
113+
address=span.tags.pop('address', None),
114+
key=span.tags.pop('key', None))
115+
105116
if span.operation_name == "soap":
106117
data.soap = SoapData(action=span.tags.pop('soap.action', None))
107118

@@ -110,10 +121,15 @@ def build_registered_span(self, span):
110121
db=span.tags.pop(ext.DATABASE_INSTANCE, None),
111122
user=span.tags.pop(ext.DATABASE_USER, None),
112123
stmt=span.tags.pop(ext.DATABASE_STATEMENT, None))
113-
if len(data.custom.logs.keys()):
124+
if (data.custom is not None) and (data.custom.logs is not None) and len(data.custom.logs):
114125
tskey = list(data.custom.logs.keys())[0]
115126
data.mysql.error = data.custom.logs[tskey]['message']
116127

128+
if len(span.tags) > 0:
129+
if data.custom is None:
130+
data.custom = CustomData()
131+
data.custom.tags = span.tags
132+
117133
entityFrom = {'e': instana.singletons.agent.from_.pid,
118134
'h': instana.singletons.agent.from_.agentUuid}
119135

instana/tracer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,13 @@ def start_span(self,
9393
tags=tags,
9494
start_time=start_time)
9595

96-
if operation_name in self.recorder.entry_spans:
97-
# For entry spans, add only a backtrace fingerprint
98-
self.__add_stack(span, limit=2)
99-
10096
if operation_name in self.recorder.exit_spans:
10197
self.__add_stack(span)
10298

99+
elif operation_name in self.recorder.entry_spans:
100+
# For entry spans, add only a backtrace fingerprint
101+
self.__add_stack(span, limit=2)
102+
103103
return span
104104

105105
def inject(self, span_context, format, carrier):

runtests.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import sys
2+
import nose
3+
from distutils.version import LooseVersion
4+
5+
args = ['nosetests', '-v']
6+
7+
if (LooseVersion(sys.version) <= LooseVersion('3.5')):
8+
args.extend(['-e', 'asynqp'])
9+
10+
result = nose.run(argv=args)

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,15 @@ def check_setuptools():
4747
},
4848
extras_require={
4949
'test': [
50+
'asynqp>=0.4',
5051
'django>=1.11',
5152
'nose>=1.0',
5253
'flask>=0.12.2',
5354
'lxml>=3.4',
55+
'mock>=2.0.0',
5456
'MySQL-python>=1.2.5;python_version<="2.7"',
5557
'pyOpenSSL>=16.1.0;python_version<="2.7"',
58+
'pytest>=3.0.1',
5659
'requests>=2.17.1',
5760
'urllib3[secure]>=1.15',
5861
'spyne>=2.9',

0 commit comments

Comments
 (0)