Skip to content

Commit 0ea3774

Browse files
author
Emanuele Palazzetti
authored
Merge pull request #530 from palazzem/celery-signals
[celery] use Celery signals instead of patch
2 parents fe9d7f2 + 2216737 commit 0ea3774

22 files changed

+870
-609
lines changed

ddtrace/bootstrap/sitecustomize.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,6 @@ def update_patched_modules():
7070
if opts:
7171
tracer.configure(**opts)
7272

73-
if not hasattr(sys, 'argv'):
74-
sys.argv = ['']
75-
7673
if patch:
7774
update_patched_modules()
7875
from ddtrace import patch_all; patch_all(**EXTRA_PATCHED_MODULES) # noqa

ddtrace/contrib/celery/__init__.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
"""
2+
The Celery integration will trace all tasks that are executed in the
3+
background. To trace your Celery application, call the patch method::
4+
5+
import celery
6+
from ddtrace import patch
7+
8+
patch(celery=True)
9+
app = celery.Celery()
10+
11+
@app.task
12+
def my_task():
13+
pass
14+
15+
16+
class MyTask(app.Task):
17+
def run(self):
18+
pass
19+
20+
21+
If you don't need to patch all Celery tasks, you can patch individual
22+
applications or tasks using a fine grain patching method::
23+
24+
import celery
25+
from ddtrace.contrib.celery import patch_app, patch_task
26+
27+
# patch only this application
28+
app = celery.Celery()
29+
app = patch_app(app)
30+
31+
# or if you didn't patch the whole application, just patch
32+
# a single function or class based Task
33+
@app.task
34+
def fn_task():
35+
pass
36+
37+
38+
class BaseClassTask(celery.Task):
39+
def run(self):
40+
pass
41+
42+
43+
BaseClassTask = patch_task(BaseClassTask)
44+
fn_task = patch_task(fn_task)
45+
"""
46+
from ...utils.importlib import require_modules
47+
48+
49+
required_modules = ['celery']
50+
51+
with require_modules(required_modules) as missing_modules:
52+
if not missing_modules:
53+
from .app import patch_app, unpatch_app
54+
from .patch import patch, unpatch
55+
from .task import patch_task, unpatch_task
56+
57+
__all__ = [
58+
'patch',
59+
'patch_app',
60+
'patch_task',
61+
'unpatch',
62+
'unpatch_app',
63+
'unpatch_task',
64+
]

ddtrace/contrib/celery/app.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from celery import signals
2+
3+
from ddtrace import Pin
4+
from ddtrace.pin import _DD_PIN_NAME
5+
from ddtrace.ext import AppTypes
6+
7+
from .constants import APP, WORKER_SERVICE
8+
from .signals import (
9+
trace_prerun,
10+
trace_postrun,
11+
trace_before_publish,
12+
trace_after_publish,
13+
trace_failure,
14+
)
15+
16+
17+
def patch_app(app, pin=None):
18+
"""Attach the Pin class to the application and connect
19+
our handlers to Celery signals.
20+
"""
21+
if getattr(app, '__datadog_patch', False):
22+
return
23+
setattr(app, '__datadog_patch', True)
24+
25+
# attach the PIN object
26+
pin = pin or Pin(service=WORKER_SERVICE, app=APP, app_type=AppTypes.worker)
27+
pin.onto(app)
28+
# connect to the Signal framework
29+
signals.task_prerun.connect(trace_prerun)
30+
signals.task_postrun.connect(trace_postrun)
31+
signals.before_task_publish.connect(trace_before_publish)
32+
signals.after_task_publish.connect(trace_after_publish)
33+
signals.task_failure.connect(trace_failure)
34+
return app
35+
36+
37+
def unpatch_app(app):
38+
"""Remove the Pin instance from the application and disconnect
39+
our handlers from Celery signal framework.
40+
"""
41+
if not getattr(app, '__datadog_patch', False):
42+
return
43+
setattr(app, '__datadog_patch', False)
44+
45+
pin = Pin.get_from(app)
46+
if pin is not None:
47+
delattr(app, _DD_PIN_NAME)
48+
49+
signals.task_prerun.disconnect(trace_prerun)
50+
signals.task_postrun.disconnect(trace_postrun)
51+
signals.before_task_publish.disconnect(trace_before_publish)
52+
signals.after_task_publish.disconnect(trace_after_publish)
53+
signals.task_failure.disconnect(trace_failure)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from os import getenv
2+
3+
# Celery Context key
4+
CTX_KEY = '__dd_task_span'
5+
6+
# Span names
7+
PRODUCER_ROOT_SPAN = 'celery.apply'
8+
WORKER_ROOT_SPAN = 'celery.run'
9+
10+
# Task operations
11+
TASK_TAG_KEY = 'celery.action'
12+
TASK_APPLY = 'apply'
13+
TASK_APPLY_ASYNC = 'apply_async'
14+
TASK_RUN = 'run'
15+
16+
# Service info
17+
APP = 'celery'
18+
PRODUCER_SERVICE = getenv('DATADOG_SERVICE_NAME') or 'celery-producer'
19+
WORKER_SERVICE = getenv('DATADOG_SERVICE_NAME') or 'celery-worker'

ddtrace/contrib/celery/patch.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import celery
2+
3+
from .app import patch_app, unpatch_app
4+
5+
6+
def patch():
7+
"""Instrument Celery base application and the `TaskRegistry` so
8+
that any new registered task is automatically instrumented. In the
9+
case of Django-Celery integration, also the `@shared_task` decorator
10+
must be instrumented because Django doesn't use the Celery registry.
11+
"""
12+
patch_app(celery.Celery)
13+
14+
15+
def unpatch():
16+
"""Disconnect all signals and remove Tracing capabilities"""
17+
unpatch_app(celery.Celery)

ddtrace/contrib/celery/signals.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import logging
2+
3+
from ddtrace import Pin
4+
5+
from celery import registry
6+
7+
from . import constants as c
8+
from .utils import (
9+
tags_from_context,
10+
retrieve_task_id,
11+
attach_span,
12+
detach_span,
13+
retrieve_span,
14+
)
15+
16+
17+
log = logging.getLogger(__name__)
18+
19+
20+
def trace_prerun(*args, **kwargs):
21+
# safe-guard to avoid crashes in case the signals API
22+
# changes in Celery
23+
task = kwargs.get('sender')
24+
task_id = kwargs.get('task_id')
25+
if task is None or task_id is None:
26+
log.debug('unable to extract the Task and the task_id. This version of Celery may not be supported.')
27+
return
28+
29+
# retrieve the task Pin or fallback to the global one
30+
pin = Pin.get_from(task) or Pin.get_from(task.app)
31+
if pin is None:
32+
return
33+
34+
# propagate the `Span` in the current task Context
35+
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=c.WORKER_SERVICE, resource=task.name)
36+
attach_span(task, task_id, span)
37+
38+
39+
def trace_postrun(*args, **kwargs):
40+
# safe-guard to avoid crashes in case the signals API
41+
# changes in Celery
42+
task = kwargs.get('sender')
43+
task_id = kwargs.get('task_id')
44+
if task is None or task_id is None:
45+
log.debug('unable to extract the Task and the task_id. This version of Celery may not be supported.')
46+
return
47+
48+
# retrieve and finish the Span
49+
span = retrieve_span(task, task_id)
50+
if span is None:
51+
return
52+
else:
53+
# request context tags
54+
span.set_tag(c.TASK_TAG_KEY, c.TASK_RUN)
55+
span.set_tags(tags_from_context(kwargs))
56+
span.set_tags(tags_from_context(task.request))
57+
span.finish()
58+
detach_span(task, task_id)
59+
60+
61+
def trace_before_publish(*args, **kwargs):
62+
# `before_task_publish` signal doesn't propagate the task instance so
63+
# we need to retrieve it from the Celery Registry to access the `Pin`. The
64+
# `Task` instance **does not** include any information about the current
65+
# execution, so it **must not** be used to retrieve `request` data.
66+
task_name = kwargs.get('sender')
67+
task = registry.tasks.get(task_name)
68+
task_id = retrieve_task_id(kwargs)
69+
# safe-guard to avoid crashes in case the signals API
70+
# changes in Celery
71+
if task is None or task_id is None:
72+
log.debug('unable to extract the Task and the task_id. This version of Celery may not be supported.')
73+
return
74+
75+
# propagate the `Span` in the current task Context
76+
pin = Pin.get_from(task) or Pin.get_from(task.app)
77+
if pin is None:
78+
return
79+
80+
# apply some tags here because most of the data is not available
81+
# in the task_after_publish signal
82+
span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=c.PRODUCER_SERVICE, resource=task_name)
83+
span.set_tag(c.TASK_TAG_KEY, c.TASK_APPLY_ASYNC)
84+
span.set_tag('celery.id', task_id)
85+
span.set_tags(tags_from_context(kwargs))
86+
# Note: adding tags from `traceback` or `state` calls will make an
87+
# API call to the backend for the properties so we should rely
88+
# only on the given `Context`
89+
attach_span(task, task_id, span)
90+
91+
92+
def trace_after_publish(*args, **kwargs):
93+
task_name = kwargs.get('sender')
94+
task = registry.tasks.get(task_name)
95+
task_id = retrieve_task_id(kwargs)
96+
# safe-guard to avoid crashes in case the signals API
97+
# changes in Celery
98+
if task is None or task_id is None:
99+
log.debug('unable to extract the Task and the task_id. This version of Celery may not be supported.')
100+
return
101+
102+
# retrieve and finish the Span
103+
span = retrieve_span(task, task_id)
104+
if span is None:
105+
return
106+
else:
107+
span.finish()
108+
detach_span(task, task_id)
109+
110+
111+
def trace_failure(*args, **kwargs):
112+
# safe-guard to avoid crashes in case the signals API
113+
# changes in Celery
114+
task = kwargs.get('sender')
115+
task_id = kwargs.get('task_id')
116+
if task is None or task_id is None:
117+
log.debug('unable to extract the Task and the task_id. This version of Celery may not be supported.')
118+
return
119+
120+
# retrieve and finish the Span
121+
span = retrieve_span(task, task_id)
122+
if span is None:
123+
return
124+
else:
125+
# add Exception tags; post signals are still called
126+
# so we don't need to attach other tags here
127+
ex = kwargs.get('einfo')
128+
if ex is None:
129+
return
130+
span.set_exc_info(ex.type, ex.exception, ex.tb)

ddtrace/contrib/celery/task.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from .app import patch_app
2+
3+
from ...utils.deprecation import deprecation
4+
5+
6+
def patch_task(task, pin=None):
7+
"""Deprecated API. The new API uses signals that can be activated via
8+
patch(celery=True) or through `ddtrace-run` script. Using this API
9+
enables instrumentation on all tasks.
10+
"""
11+
deprecation(
12+
name='ddtrace.contrib.celery.patch_task',
13+
message='Use `patch(celery=True)` or `ddtrace-run` script instead',
14+
version='1.0.0',
15+
)
16+
17+
# Enable instrumentation everywhere
18+
patch_app(task.app)
19+
return task
20+
21+
def unpatch_task(task):
22+
"""Deprecated API. The new API uses signals that can be deactivated
23+
via unpatch() API. This API is now a no-op implementation so it doesn't
24+
affect instrumented tasks.
25+
"""
26+
deprecation(
27+
name='ddtrace.contrib.celery.patch_task',
28+
message='Use `unpatch()` instead',
29+
version='1.0.0',
30+
)
31+
return task

0 commit comments

Comments
 (0)