Skip to content

Commit e75bc70

Browse files
author
Emanuele Palazzetti
committed
[celery] add patch_task and unpatch_task as a backward compatibility
This reverts commit 0440d39.
1 parent 58ab619 commit e75bc70

File tree

5 files changed

+111
-22
lines changed

5 files changed

+111
-22
lines changed

ddtrace/contrib/celery/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def run(self):
5252
if not missing_modules:
5353
from .app import patch_app, unpatch_app
5454
from .patch import patch, unpatch
55+
from .task import patch_task, unpatch_task
5556

5657
__all__ = [
5758
'patch',

ddtrace/contrib/celery/app.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,44 @@
1+
from celery import signals
2+
13
from ddtrace import Pin
24
from ddtrace.pin import _DD_PIN_NAME
35
from ddtrace.ext import AppTypes
46

57
from .util import APP, WORKER_SERVICE
8+
from .signals import (
9+
trace_prerun,
10+
trace_postrun,
11+
trace_before_publish,
12+
trace_after_publish,
13+
trace_failure,
14+
)
615

716

817
def patch_app(app, pin=None):
9-
"""Attach the Pin class to the application"""
18+
"""Attach the Pin class to the application and connect
19+
our handlers to Celery signals.
20+
"""
1021
pin = pin or Pin(service=WORKER_SERVICE, app=APP, app_type=AppTypes.worker)
1122
pin.onto(app)
23+
24+
signals.task_prerun.connect(trace_prerun)
25+
signals.task_postrun.connect(trace_postrun)
26+
signals.before_task_publish.connect(trace_before_publish)
27+
signals.after_task_publish.connect(trace_after_publish)
28+
signals.task_failure.connect(trace_failure)
1229
return app
1330

1431

1532
def unpatch_app(app):
16-
""" unpatch_app will remove tracing from a celery app """
33+
"""Remove the Pin instance from the application and disconnect
34+
our handlers from Celery signal framework.
35+
"""
1736
pin = Pin.get_from(app)
1837
if pin is not None:
1938
delattr(app, _DD_PIN_NAME)
39+
40+
signals.task_prerun.disconnect(trace_prerun)
41+
signals.task_postrun.disconnect(trace_postrun)
42+
signals.before_task_publish.disconnect(trace_before_publish)
43+
signals.after_task_publish.disconnect(trace_after_publish)
44+
signals.task_failure.disconnect(trace_failure)

ddtrace/contrib/celery/patch.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
import celery
22

3-
from celery import signals
4-
53
from .app import patch_app, unpatch_app
64

7-
from .signals import (
8-
trace_prerun,
9-
trace_postrun,
10-
trace_before_publish,
11-
trace_after_publish,
12-
trace_failure,
13-
)
14-
155

166
def patch():
177
"""Instrument Celery base application and the `TaskRegistry` so
@@ -20,18 +10,8 @@ def patch():
2010
must be instrumented because Django doesn't use the Celery registry.
2111
"""
2212
patch_app(celery.Celery)
23-
signals.task_prerun.connect(trace_prerun)
24-
signals.task_postrun.connect(trace_postrun)
25-
signals.before_task_publish.connect(trace_before_publish)
26-
signals.after_task_publish.connect(trace_after_publish)
27-
signals.task_failure.connect(trace_failure)
2813

2914

3015
def unpatch():
3116
"""Disconnect all signals and remove Tracing capabilities"""
3217
unpatch_app(celery.Celery)
33-
signals.task_prerun.disconnect(trace_prerun)
34-
signals.task_postrun.disconnect(trace_postrun)
35-
signals.before_task_publish.disconnect(trace_before_publish)
36-
signals.after_task_publish.disconnect(trace_after_publish)
37-
signals.task_failure.disconnect(trace_failure)

ddtrace/contrib/celery/task.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from .app import patch_app
2+
3+
from ...utils.deprecation import deprecation
4+
5+
6+
def patch_task(task, pin=None):
7+
"""Deprecated API. The new API uses signals that can be activated via
8+
patch(celery=True) or through `ddtrace-run` script. Using this API
9+
enables instrumentation on all tasks.
10+
"""
11+
deprecation(
12+
name='ddtrace.contrib.celery.patch_task',
13+
message='Use `patch(celery=True)` or `ddtrace-run` script instead',
14+
version='1.0.0',
15+
)
16+
17+
# Enable instrumentation everywhere
18+
patch_app(task.app)
19+
return task
20+
21+
def unpatch_task(task):
22+
"""Deprecated API. The new API uses signals that can be deactivated
23+
via unpatch() API. This API is now a no-op implementation so it doesn't
24+
affect instrumented tasks.
25+
"""
26+
deprecation(
27+
name='ddtrace.contrib.celery.patch_task',
28+
message='Use `unpatch()` instead',
29+
version='1.0.0',
30+
)
31+
return task
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import warnings
2+
import unittest
3+
4+
from celery import Celery
5+
6+
from nose.tools import ok_
7+
8+
from ddtrace.contrib.celery import patch_task, unpatch_task, unpatch
9+
10+
11+
class CeleryDeprecatedTaskPatch(unittest.TestCase):
12+
"""Ensures that the previous Task instrumentation is available
13+
as Deprecated API.
14+
"""
15+
def setUp(self):
16+
# create a not instrumented Celery App
17+
self.app = Celery('celery.test_app')
18+
19+
def tearDown(self):
20+
# be sure the system is always unpatched
21+
unpatch()
22+
self.app = None
23+
24+
def test_patch_signals_connect(self):
25+
# calling `patch_task` enables instrumentation globally
26+
# while raising a Deprecation warning
27+
with warnings.catch_warnings(record=True) as w:
28+
warnings.simplefilter('always')
29+
30+
@patch_task
31+
@self.app.task
32+
def fn_task():
33+
return 42
34+
35+
ok_(len(w) == 1)
36+
ok_(issubclass(w[-1].category, DeprecationWarning))
37+
ok_('patch(celery=True)' in str(w[-1].message))
38+
39+
def test_unpatch_signals_diconnect(self):
40+
# calling `unpatch_task` is a no-op that raises a Deprecation
41+
# warning
42+
with warnings.catch_warnings(record=True) as w:
43+
warnings.simplefilter('always')
44+
45+
@unpatch_task
46+
@self.app.task
47+
def fn_task():
48+
return 42
49+
50+
ok_(len(w) == 1)
51+
ok_(issubclass(w[-1].category, DeprecationWarning))
52+
ok_('unpatch()' in str(w[-1].message))

0 commit comments

Comments
 (0)