Skip to content

Commit b6edbb0

Browse files
authored
Asynqp: Fix redundant import (#105)
* Remove redundant import that caused a stacktrace in some situations * Add version check to instrumentation
1 parent 46cec28 commit b6edbb0

File tree

1 file changed

+90
-88
lines changed

1 file changed

+90
-88
lines changed

instana/instrumentation/asynqp.py

Lines changed: 90 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,97 +1,99 @@
11
from __future__ import absolute_import
22

3+
import sys
4+
35
import opentracing
46
import opentracing.ext.tags as ext
57
import wrapt
68

79
from ..log import logger
810
from ..singletons import tracer
911

10-
try:
11-
import asyncio
12-
import asynqp
13-
14-
@wrapt.patch_function_wrapper('asynqp.exchange','Exchange.publish')
15-
def publish_with_instana(wrapped, instance, args, kwargs):
16-
parent_span = tracer.active_span
17-
18-
# If we're not tracing, just return
19-
if parent_span is None:
20-
return wrapped(*args, **kwargs)
21-
22-
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
23-
host, port = instance.sender.protocol.transport._sock.getsockname()
24-
25-
msg = args[0]
26-
if msg.headers is None:
27-
msg.headers = {}
28-
tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, msg.headers)
29-
30-
try:
31-
scope.span.set_tag("exchange", instance.name)
32-
scope.span.set_tag("sort", "publish")
33-
scope.span.set_tag("address", host + ":" + str(port) )
34-
scope.span.set_tag("key", args[1])
35-
36-
rv = wrapped(*args, **kwargs)
37-
except Exception as e:
38-
scope.span.log_kv({'message': e})
39-
scope.span.set_tag("error", True)
40-
ec = scope.span.tags.get('ec', 0)
41-
scope.span.set_tag("ec", ec+1)
42-
raise
43-
else:
44-
return rv
45-
46-
@wrapt.patch_function_wrapper('asynqp.queue','Queue.get')
47-
def get_with_instana(wrapped, instance, args, kwargs):
48-
parent_span = tracer.active_span
49-
50-
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
51-
host, port = instance.sender.protocol.transport._sock.getsockname()
52-
53-
try:
54-
scope.span.set_tag("queue", instance.name)
55-
scope.span.set_tag("sort", "consume")
56-
scope.span.set_tag("address", host + ":" + str(port) )
57-
58-
rv = wrapped(*args, **kwargs)
59-
except Exception as e:
60-
scope.span.log_kv({'message': e})
61-
scope.span.set_tag("error", True)
62-
ec = scope.span.tags.get('ec', 0)
63-
scope.span.set_tag("ec", ec+1)
64-
raise
65-
else:
66-
return rv
67-
68-
@wrapt.patch_function_wrapper('asynqp.queue','Consumers.deliver')
69-
def deliver_with_instana(wrapped, instance, args, kwargs):
70-
71-
ctx = None
72-
msg = args[1]
73-
if 'X-Instana-T' in msg.headers and 'X-Instana-S' in msg.headers:
74-
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers))
75-
76-
with tracer.start_active_span("rabbitmq", child_of=ctx) as scope:
77-
host, port = args[1].sender.protocol.transport._sock.getsockname()
78-
79-
try:
80-
scope.span.set_tag("exchange", msg.exchange_name)
81-
scope.span.set_tag("sort", "consume")
82-
scope.span.set_tag("address", host + ":" + str(port) )
83-
scope.span.set_tag("key", msg.routing_key)
84-
85-
rv = wrapped(*args, **kwargs)
86-
except Exception as e:
87-
scope.span.log_kv({'message': e})
88-
scope.span.set_tag("error", True)
89-
ec = scope.span.tags.get('ec', 0)
90-
scope.span.set_tag("ec", ec+1)
91-
raise
92-
else:
93-
return rv
94-
95-
logger.debug("Instrumenting asynqp")
96-
except ImportError:
97-
pass
12+
if sys.version_info >= (3,4):
13+
try:
14+
import asynqp
15+
16+
@wrapt.patch_function_wrapper('asynqp.exchange','Exchange.publish')
17+
def publish_with_instana(wrapped, instance, args, kwargs):
18+
parent_span = tracer.active_span
19+
20+
# If we're not tracing, just return
21+
if parent_span is None:
22+
return wrapped(*args, **kwargs)
23+
24+
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
25+
host, port = instance.sender.protocol.transport._sock.getsockname()
26+
27+
msg = args[0]
28+
if msg.headers is None:
29+
msg.headers = {}
30+
tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, msg.headers)
31+
32+
try:
33+
scope.span.set_tag("exchange", instance.name)
34+
scope.span.set_tag("sort", "publish")
35+
scope.span.set_tag("address", host + ":" + str(port) )
36+
scope.span.set_tag("key", args[1])
37+
38+
rv = wrapped(*args, **kwargs)
39+
except Exception as e:
40+
scope.span.log_kv({'message': e})
41+
scope.span.set_tag("error", True)
42+
ec = scope.span.tags.get('ec', 0)
43+
scope.span.set_tag("ec", ec+1)
44+
raise
45+
else:
46+
return rv
47+
48+
@wrapt.patch_function_wrapper('asynqp.queue','Queue.get')
49+
def get_with_instana(wrapped, instance, args, kwargs):
50+
parent_span = tracer.active_span
51+
52+
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
53+
host, port = instance.sender.protocol.transport._sock.getsockname()
54+
55+
try:
56+
scope.span.set_tag("queue", instance.name)
57+
scope.span.set_tag("sort", "consume")
58+
scope.span.set_tag("address", host + ":" + str(port) )
59+
60+
rv = wrapped(*args, **kwargs)
61+
except Exception as e:
62+
scope.span.log_kv({'message': e})
63+
scope.span.set_tag("error", True)
64+
ec = scope.span.tags.get('ec', 0)
65+
scope.span.set_tag("ec", ec+1)
66+
raise
67+
else:
68+
return rv
69+
70+
@wrapt.patch_function_wrapper('asynqp.queue','Consumers.deliver')
71+
def deliver_with_instana(wrapped, instance, args, kwargs):
72+
73+
ctx = None
74+
msg = args[1]
75+
if 'X-Instana-T' in msg.headers and 'X-Instana-S' in msg.headers:
76+
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers))
77+
78+
with tracer.start_active_span("rabbitmq", child_of=ctx) as scope:
79+
host, port = args[1].sender.protocol.transport._sock.getsockname()
80+
81+
try:
82+
scope.span.set_tag("exchange", msg.exchange_name)
83+
scope.span.set_tag("sort", "consume")
84+
scope.span.set_tag("address", host + ":" + str(port) )
85+
scope.span.set_tag("key", msg.routing_key)
86+
87+
rv = wrapped(*args, **kwargs)
88+
except Exception as e:
89+
scope.span.log_kv({'message': e})
90+
scope.span.set_tag("error", True)
91+
ec = scope.span.tags.get('ec', 0)
92+
scope.span.set_tag("ec", ec+1)
93+
raise
94+
else:
95+
return rv
96+
97+
logger.debug("Instrumenting asynqp")
98+
except ImportError:
99+
pass

0 commit comments

Comments
 (0)