Skip to content

Commit 09d6fc3

Browse files
authored
Celery: Use defaults when necessary (#243)
* Celery: Use defaults when necessary * Update tests to follow changes * Better log message extrapolation * Filter out ping tasks
1 parent 3362c5b commit 09d6fc3

File tree

3 files changed

+66
-29
lines changed

3 files changed

+66
-29
lines changed

instana/instrumentation/celery/hooks.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,44 @@ def add_broker_tags(span, broker_url):
1919
try:
2020
url = parse.urlparse(broker_url)
2121
span.set_tag("scheme", url.scheme)
22-
span.set_tag("host", url.hostname)
23-
span.set_tag("port", url.port)
22+
23+
if url.hostname is None:
24+
span.set_tag("host", 'localhost')
25+
else:
26+
span.set_tag("host", url.hostname)
27+
28+
if url.port is None:
29+
# Set default port if not specified
30+
if url.scheme == 'redis':
31+
span.set_tag("port", "6379")
32+
elif 'amqp' in url.scheme:
33+
span.set_tag("port", "5672")
34+
elif 'sqs' in url.scheme:
35+
span.set_tag("port", "443")
36+
else:
37+
span.set_tag("port", str(url.port))
2438
except:
2539
logger.debug("Error parsing broker URL: %s" % broker_url, exc_info=True)
2640

2741
@signals.task_prerun.connect
2842
def task_prerun(*args, **kwargs):
2943
try:
44+
ctx = None
3045
task = kwargs.get('sender', None)
3146
task_id = kwargs.get('task_id', None)
3247
task = registry.tasks.get(task.name)
3348

3449
headers = task.request.get('headers', {})
35-
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers)
50+
if headers is not None:
51+
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers)
3652

37-
if ctx is not None:
38-
scope = tracer.start_active_span("celery-worker", child_of=ctx)
39-
scope.span.set_tag("task", task.name)
40-
scope.span.set_tag("task_id", task_id)
41-
add_broker_tags(scope.span, task.app.conf['broker_url'])
53+
scope = tracer.start_active_span("celery-worker", child_of=ctx)
54+
scope.span.set_tag("task", task.name)
55+
scope.span.set_tag("task_id", task_id)
56+
add_broker_tags(scope.span, task.app.conf['broker_url'])
4257

43-
# Store the scope on the task to eventually close it out on the "after" signal
44-
task_catalog_push(task, task_id, scope, True)
58+
# Store the scope on the task to eventually close it out on the "after" signal
59+
task_catalog_push(task, task_id, scope, True)
4560
except:
4661
logger.debug("task_prerun: ", exc_info=True)
4762

instana/instrumentation/logging.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from __future__ import absolute_import
22

3+
import sys
34
import wrapt
45
import logging
5-
import sys
6+
import collections
67

78
from ..log import logger
89
from ..singletons import tracer
@@ -18,10 +19,14 @@ def log_with_instana(wrapped, instance, argv, kwargs):
1819

1920
# Only needed if we're tracing and serious log
2021
if parent_span and argv[0] >= logging.WARN:
22+
23+
msg = str(argv[1])
24+
args = argv[2]
25+
if args and len(args) == 1 and isinstance(args[0], collections.Mapping) and args[0]:
26+
args = args[0]
27+
2128
# get the formatted log message
22-
# clients such as suds-jurko log things such as: Fault(Server: 'Server side fault example.')
23-
# So make sure we're working with a string
24-
msg = str(argv[1]) % argv[2]
29+
msg = msg % args
2530

2631
# get additional information if an exception is being handled
2732
parameters = None
@@ -37,8 +42,8 @@ def log_with_instana(wrapped, instance, argv, kwargs):
3742
# extra tags for an error
3843
if argv[0] >= logging.ERROR:
3944
scope.span.mark_as_errored()
40-
except Exception as e:
41-
logger.debug('Exception: %s', e, exc_info=True)
45+
except Exception:
46+
logger.debug('log_with_instana:', exc_info=True)
4247
finally:
4348
return wrapped(*argv, **kwargs)
4449

tests/frameworks/test_celery.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@ def will_raise_error():
1616
raise Exception('This is a simulated error')
1717

1818

19+
def filter_out_ping_tasks(spans):
20+
filtered_spans = []
21+
for span in spans:
22+
is_ping_task = (span.n == 'celery-worker' and span.data['celery']['task'] == 'celery.ping')
23+
if not is_ping_task:
24+
filtered_spans.append(span)
25+
return filtered_spans
26+
27+
1928
def setup_method():
2029
""" Clear all spans before a test run """
2130
tracer.recorder.clear_spans()
@@ -29,7 +38,7 @@ def test_apply_async(celery_app, celery_worker):
2938
# Wait for jobs to finish
3039
time.sleep(0.5)
3140

32-
spans = tracer.recorder.queued_spans()
41+
spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
3342
assert len(spans) == 3
3443

3544
filter = lambda span: span.n == "sdk"
@@ -51,15 +60,15 @@ def test_apply_async(celery_app, celery_worker):
5160
assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
5261
assert("redis" == client_span.data["celery"]["scheme"])
5362
assert("localhost" == client_span.data["celery"]["host"])
54-
assert(6379 == client_span.data["celery"]["port"])
63+
assert("6379" == client_span.data["celery"]["port"])
5564
assert(client_span.data["celery"]["task_id"])
5665
assert(client_span.data["celery"]["error"] == None)
5766
assert(client_span.ec == None)
5867

5968
assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
6069
assert("redis" == worker_span.data["celery"]["scheme"])
6170
assert("localhost" == worker_span.data["celery"]["host"])
62-
assert(6379 == worker_span.data["celery"]["port"])
71+
assert("6379" == worker_span.data["celery"]["port"])
6372
assert(worker_span.data["celery"]["task_id"])
6473
assert(worker_span.data["celery"]["error"] == None)
6574
assert(worker_span.data["celery"]["retry-reason"] == None)
@@ -74,7 +83,7 @@ def test_delay(celery_app, celery_worker):
7483
# Wait for jobs to finish
7584
time.sleep(0.5)
7685

77-
spans = tracer.recorder.queued_spans()
86+
spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
7887
assert len(spans) == 3
7988

8089
filter = lambda span: span.n == "sdk"
@@ -96,15 +105,15 @@ def test_delay(celery_app, celery_worker):
96105
assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
97106
assert("redis" == client_span.data["celery"]["scheme"])
98107
assert("localhost" == client_span.data["celery"]["host"])
99-
assert(6379 == client_span.data["celery"]["port"])
108+
assert("6379" == client_span.data["celery"]["port"])
100109
assert(client_span.data["celery"]["task_id"])
101110
assert(client_span.data["celery"]["error"] == None)
102111
assert(client_span.ec == None)
103112

104113
assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
105114
assert("redis" == worker_span.data["celery"]["scheme"])
106115
assert("localhost" == worker_span.data["celery"]["host"])
107-
assert(6379 == worker_span.data["celery"]["port"])
116+
assert("6379" == worker_span.data["celery"]["port"])
108117
assert(worker_span.data["celery"]["task_id"])
109118
assert(worker_span.data["celery"]["error"] == None)
110119
assert(worker_span.data["celery"]["retry-reason"] == None)
@@ -119,7 +128,7 @@ def test_send_task(celery_app, celery_worker):
119128
# Wait for jobs to finish
120129
time.sleep(0.5)
121130

122-
spans = tracer.recorder.queued_spans()
131+
spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
123132
assert len(spans) == 3
124133

125134
filter = lambda span: span.n == "sdk"
@@ -141,15 +150,15 @@ def test_send_task(celery_app, celery_worker):
141150
assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
142151
assert("redis" == client_span.data["celery"]["scheme"])
143152
assert("localhost" == client_span.data["celery"]["host"])
144-
assert(6379 == client_span.data["celery"]["port"])
153+
assert("6379" == client_span.data["celery"]["port"])
145154
assert(client_span.data["celery"]["task_id"])
146155
assert(client_span.data["celery"]["error"] == None)
147156
assert(client_span.ec == None)
148157

149158
assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
150159
assert("redis" == worker_span.data["celery"]["scheme"])
151160
assert("localhost" == worker_span.data["celery"]["host"])
152-
assert(6379 == worker_span.data["celery"]["port"])
161+
assert("6379" == worker_span.data["celery"]["port"])
153162
assert(worker_span.data["celery"]["task_id"])
154163
assert(worker_span.data["celery"]["error"] == None)
155164
assert(worker_span.data["celery"]["retry-reason"] == None)
@@ -164,8 +173,8 @@ def test_error_reporting(celery_app, celery_worker):
164173
# Wait for jobs to finish
165174
time.sleep(0.5)
166175

167-
spans = tracer.recorder.queued_spans()
168-
assert len(spans) == 3
176+
spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
177+
assert len(spans) == 4
169178

170179
filter = lambda span: span.n == "sdk"
171180
test_span = get_first_span_by_filter(spans, filter)
@@ -175,26 +184,34 @@ def test_error_reporting(celery_app, celery_worker):
175184
client_span = get_first_span_by_filter(spans, filter)
176185
assert(client_span)
177186

187+
filter = lambda span: span.n == "log"
188+
log_span = get_first_span_by_filter(spans, filter)
189+
assert(log_span)
190+
178191
filter = lambda span: span.n == "celery-worker"
179192
worker_span = get_first_span_by_filter(spans, filter)
180193
assert(worker_span)
181194

182195
assert(client_span.t == test_span.t)
183196
assert(client_span.t == worker_span.t)
197+
assert(client_span.t == log_span.t)
198+
184199
assert(client_span.p == test_span.s)
200+
assert(worker_span.p == client_span.s)
201+
assert(log_span.p == worker_span.s)
185202

186203
assert("tests.frameworks.test_celery.will_raise_error" == client_span.data["celery"]["task"])
187204
assert("redis" == client_span.data["celery"]["scheme"])
188205
assert("localhost" == client_span.data["celery"]["host"])
189-
assert(6379 == client_span.data["celery"]["port"])
206+
assert("6379" == client_span.data["celery"]["port"])
190207
assert(client_span.data["celery"]["task_id"])
191208
assert(client_span.data["celery"]["error"] == None)
192209
assert(client_span.ec == None)
193210

194211
assert("tests.frameworks.test_celery.will_raise_error" == worker_span.data["celery"]["task"])
195212
assert("redis" == worker_span.data["celery"]["scheme"])
196213
assert("localhost" == worker_span.data["celery"]["host"])
197-
assert(6379 == worker_span.data["celery"]["port"])
214+
assert("6379" == worker_span.data["celery"]["port"])
198215
assert(worker_span.data["celery"]["task_id"])
199216
assert(worker_span.data["celery"]["error"] == 'This is a simulated error')
200217
assert(worker_span.data["celery"]["retry-reason"] == None)

0 commit comments

Comments
 (0)