Skip to content

Commit d36a681

Browse files
author
Andrey Slotin
authored
Instrument pika (#293)
* pika: instrument Channel.basic_publish * pika: instrument Channel.basic_get * pika: instrument Channel.basic_consume * pika: instrument BlockingChannel.consume * pika: use decorators to wrap instrumented methods
1 parent 60e2dca commit d36a681

File tree

6 files changed

+550
-3
lines changed

6 files changed

+550
-3
lines changed

instana/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ def boot_agent():
157157
from .instrumentation.tornado import client
158158
from .instrumentation.tornado import server
159159
from .instrumentation import logging
160+
from .instrumentation import pika
160161
from .instrumentation import pymysql
161162
from .instrumentation import psycopg2
162163
from .instrumentation import redis

instana/agent/host.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ def report_data_payload(self, payload):
253253
data=to_json(payload['profiles']),
254254
headers={"Content-Type": "application/json"},
255255
timeout=0.8)
256-
256+
257257
if response is not None and 200 <= response.status_code <= 204:
258258
self.last_seen = datetime.now()
259259

instana/instrumentation/pika.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# coding: utf-8
2+
3+
from __future__ import absolute_import
4+
5+
import wrapt
6+
import opentracing
7+
import types
8+
9+
from ..log import logger
10+
from ..singletons import tracer
11+
12+
try:
13+
import pika
14+
15+
def _extract_broker_tags(span, conn):
16+
span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port))
17+
18+
def _extract_publisher_tags(span, conn, exchange, routing_key):
19+
_extract_broker_tags(span, conn)
20+
21+
span.set_tag("sort", "publish")
22+
span.set_tag("key", routing_key)
23+
span.set_tag("exchange", exchange)
24+
25+
def _extract_consumer_tags(span, conn, queue):
26+
_extract_broker_tags(span, conn)
27+
28+
span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port))
29+
span.set_tag("sort", "consume")
30+
span.set_tag("queue", queue)
31+
32+
@wrapt.patch_function_wrapper('pika.channel', 'Channel.basic_publish')
33+
def basic_publish_with_instana(wrapped, instance, args, kwargs):
34+
def _bind_args(exchange, routing_key, body, properties=None, *args, **kwargs):
35+
return (exchange, routing_key, body, properties, args, kwargs)
36+
37+
parent_span = tracer.active_span
38+
39+
if parent_span is None:
40+
return wrapped(*args, **kwargs)
41+
42+
(exchange, routing_key, body, properties, args, kwargs) = (_bind_args(*args, **kwargs))
43+
44+
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
45+
try:
46+
_extract_publisher_tags(scope.span,
47+
conn=instance.connection,
48+
routing_key=routing_key,
49+
exchange=exchange)
50+
except:
51+
logger.debug("publish_with_instana: ", exc_info=True)
52+
53+
# context propagation
54+
properties = properties or pika.BasicProperties()
55+
properties.headers = properties.headers or {}
56+
57+
tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, properties.headers)
58+
args = (exchange, routing_key, body, properties) + args
59+
60+
try:
61+
rv = wrapped(*args, **kwargs)
62+
except Exception as e:
63+
scope.span.log_exception(e)
64+
raise
65+
else:
66+
return rv
67+
68+
def basic_get_with_instana(wrapped, instance, args, kwargs):
69+
def _bind_args(queue, callback, *args, **kwargs):
70+
return (queue, callback, args, kwargs)
71+
72+
(queue, callback, args, kwargs) = (_bind_args(*args, **kwargs))
73+
74+
def _cb_wrapper(channel, method, properties, body):
75+
parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers)
76+
77+
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
78+
try:
79+
_extract_consumer_tags(scope.span,
80+
conn=instance.connection,
81+
queue=queue)
82+
except:
83+
logger.debug("basic_get_with_instana: ", exc_info=True)
84+
85+
try:
86+
callback(channel, method, properties, body)
87+
except Exception as e:
88+
scope.span.log_exception(e)
89+
raise
90+
91+
args = (queue, _cb_wrapper) + args
92+
return wrapped(*args, **kwargs)
93+
94+
@wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.basic_consume')
95+
def basic_consume_with_instana(wrapped, instance, args, kwargs):
96+
def _bind_args(queue, on_consume_callback, *args, **kwargs):
97+
return (queue, on_consume_callback, args, kwargs)
98+
99+
(queue, on_consume_callback, args, kwargs) = (_bind_args(*args, **kwargs))
100+
101+
def _cb_wrapper(channel, method, properies, body):
102+
parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers)
103+
104+
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
105+
try:
106+
_extract_consumer_tags(scope.span,
107+
conn=instance.connection,
108+
queue=queue)
109+
except:
110+
logger.debug("basic_consume_with_instana: ", exc_info=True)
111+
112+
try:
113+
callback(channel, method, properties, body)
114+
except Exception as e:
115+
scope.span.log_exception(e)
116+
raise
117+
118+
args = (queue, _cb_wrapper) + args
119+
return wrapped(*args, **kwargs)
120+
121+
@wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.consume')
122+
def consume_with_instana(wrapped, instance, args, kwargs):
123+
def _bind_args(queue, *args, **kwargs):
124+
return (queue, args, kwargs)
125+
126+
(queue, args, kwargs) = (_bind_args(*args, **kwargs))
127+
128+
def _consume(gen):
129+
for yilded in gen:
130+
# Bypass the delivery created due to inactivity timeout
131+
if yilded is None or not any(yilded):
132+
yield yilded
133+
continue
134+
135+
(method_frame, properties, body) = yilded
136+
137+
parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers)
138+
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
139+
try:
140+
_extract_consumer_tags(scope.span,
141+
conn=instance.connection._impl,
142+
queue=queue)
143+
except:
144+
logger.debug("consume_with_instana: ", exc_info=True)
145+
146+
try:
147+
yield yilded
148+
except Exception as e:
149+
scope.span.log_exception(e)
150+
raise
151+
152+
args = (queue,) + args
153+
res = wrapped(*args, **kwargs)
154+
155+
if isinstance(res, types.GeneratorType):
156+
return _consume(res)
157+
else:
158+
return res
159+
160+
@wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.__init__')
161+
def _BlockingChannel___init__(wrapped, instance, args, kwargs):
162+
ret = wrapped(*args, **kwargs)
163+
impl = getattr(instance, '_impl', None)
164+
165+
if impl and hasattr(impl.basic_consume, '__wrapped__'):
166+
impl.basic_consume = impl.basic_consume.__wrapped__
167+
168+
return ret
169+
170+
wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_get', basic_get_with_instana)
171+
wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_consume', basic_get_with_instana)
172+
173+
174+
logger.debug("Instrumenting pika")
175+
except ImportError:
176+
pass

instana/span.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def __init__(self, span, source, service_name, **kwargs):
110110
self.sy = span.synthetic
111111

112112
self.__dict__.update(kwargs)
113-
113+
114114
def _validate_tags(self, tags):
115115
"""
116116
This method will loop through a set of tags to validate each key and value.

0 commit comments

Comments
 (0)