Skip to content

Commit 3ec80d7

Browse files
Emanuele PalazzettiKyle-Verhoog
authored andcommitted
[celery] add Config object to change Worker/Producer service names (#540)
* [celery] add Config object to change Worker/Producer service names * [celery] update documentation after the Config API change
1 parent 5d3a565 commit 3ec80d7

File tree

7 files changed

+76
-18
lines changed

7 files changed

+76
-18
lines changed

ddtrace/contrib/celery/__init__.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
The Celery integration will trace all tasks that are executed in the
33
background. Functions and class based tasks are traced only if the Celery API
44
is used, so calling the function directly or via the ``run()`` method will not
5-
generate traces. On the other hand, calling ``apply()`` and ``apply_async()``
5+
generate traces. On the other hand, calling ``apply()``, ``apply_async()`` and ``delay()``
66
will produce tracing data. To trace your Celery application, call the patch method::
77
88
import celery
@@ -20,8 +20,7 @@ def run(self):
2020
pass
2121
2222
23-
To change Celery service name, you can update the attached ``Pin``
24-
instance::
23+
To change Celery service name, you can use the ``Config`` API as follows::
2524
2625
from ddtrace import Pin
2726
@@ -31,12 +30,9 @@ def run(self):
3130
def compute_stats():
3231
pass
3332
34-
# globally
35-
Pin.override(app, service='background-jobs')
36-
37-
# by task
38-
Pin.override(compute_stats, service='data-processing')
39-
33+
# change service names for producers and workers
34+
config.celery['producer_service_name'] = 'task-queue'
35+
config.celery['worker_service_name'] = 'worker-notify'
4036
4137
By default, reported service names are:
4238
* ``celery-producer`` when tasks are enqueued for processing

ddtrace/contrib/celery/app.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from celery import signals
22

3-
from ddtrace import Pin
3+
from ddtrace import Pin, config
44
from ddtrace.pin import _DD_PIN_NAME
55
from ddtrace.ext import AppTypes
66

7-
from .constants import APP, WORKER_SERVICE
7+
from .constants import APP
88
from .signals import (
99
trace_prerun,
1010
trace_postrun,
@@ -23,7 +23,12 @@ def patch_app(app, pin=None):
2323
setattr(app, '__datadog_patch', True)
2424

2525
# attach the PIN object
26-
pin = pin or Pin(service=WORKER_SERVICE, app=APP, app_type=AppTypes.worker)
26+
pin = pin or Pin(
27+
service=config.celery['worker_service_name'],
28+
app=APP,
29+
app_type=AppTypes.worker,
30+
_config=config.celery,
31+
)
2732
pin.onto(app)
2833
# connect to the Signal framework
2934
signals.task_prerun.connect(trace_prerun)

ddtrace/contrib/celery/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,7 @@
1515

1616
# Service info
1717
APP = 'celery'
18+
# `getenv()` call must be kept for backward compatibility; we may remove it
19+
# later when we do a full migration to the `Config` class
1820
PRODUCER_SERVICE = getenv('DATADOG_SERVICE_NAME') or 'celery-producer'
1921
WORKER_SERVICE = getenv('DATADOG_SERVICE_NAME') or 'celery-worker'

ddtrace/contrib/celery/patch.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
import celery
22

3+
from ddtrace import config
4+
35
from .app import patch_app, unpatch_app
6+
from .constants import PRODUCER_SERVICE, WORKER_SERVICE
7+
from ...utils.formats import get_env
8+
9+
10+
# Celery default settings
11+
config._add('celery', {
12+
'producer_service_name': get_env('celery', 'producer_service_name', PRODUCER_SERVICE),
13+
'worker_service_name': get_env('celery', 'worker_service_name', WORKER_SERVICE),
14+
})
415

516

617
def patch():

ddtrace/contrib/celery/signals.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22

3-
from ddtrace import Pin
3+
from ddtrace import Pin, config
44

55
from celery import registry
66

@@ -32,7 +32,8 @@ def trace_prerun(*args, **kwargs):
3232
return
3333

3434
# propagate the `Span` in the current task Context
35-
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=c.WORKER_SERVICE, resource=task.name)
35+
service = config.celery['worker_service_name']
36+
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name)
3637
attach_span(task, task_id, span)
3738

3839

@@ -79,7 +80,8 @@ def trace_before_publish(*args, **kwargs):
7980

8081
# apply some tags here because most of the data is not available
8182
# in the task_after_publish signal
82-
span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=c.PRODUCER_SERVICE, resource=task_name)
83+
service = config.celery['producer_service_name']
84+
span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=service, resource=task_name)
8385
span.set_tag(c.TASK_TAG_KEY, c.TASK_APPLY_ASYNC)
8486
span.set_tag('celery.id', task_id)
8587
span.set_tags(tags_from_context(kwargs))

tests/contrib/celery/base.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from celery import Celery
44

5-
from ddtrace import Pin
5+
from ddtrace import Pin, config
66
from ddtrace.compat import PY2
77
from ddtrace.contrib.celery import patch, unpatch
88

@@ -21,19 +21,23 @@ class CeleryBaseTestCase(unittest.TestCase):
2121
"""
2222

2323
def setUp(self):
24+
# keep track of original config
25+
self._config = dict(config.celery)
2426
# instrument Celery and create an app with Broker and Result backends
2527
patch()
2628
self.tracer = get_dummy_tracer()
2729
self.pin = Pin(service='celery-unittest', tracer=self.tracer)
2830
self.app = Celery('celery.test_app', broker=BROKER_URL, backend=BACKEND_URL)
2931
# override pins to use our Dummy Tracer
3032
Pin.override(self.app, tracer=self.tracer)
31-
Pin.override(self.app.task, tracer=self.tracer)
32-
Pin.override(self.app.Task, tracer=self.tracer)
3333

3434
def tearDown(self):
35+
# remove instrumentation from Celery
3536
unpatch()
3637
self.app = None
38+
# restore the global configuration
39+
config.celery.update(self._config)
40+
self._config = None
3741

3842
def assert_items_equal(self, a, b):
3943
if PY2:

tests/contrib/celery/test_integration.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from nose.tools import eq_, ok_
44

5+
from ddtrace import config
56
from ddtrace.contrib.celery import patch, unpatch
67

78
from .base import CeleryBaseTestCase
@@ -272,3 +273,40 @@ def add(x ,y):
272273
eq_(span.get_tag('celery.id'), res.task_id)
273274
eq_(span.get_tag('celery.action'), 'run')
274275
eq_(span.get_tag('celery.state'), 'SUCCESS')
276+
277+
def test_worker_service_name(self):
278+
# Ensure worker service name can be changed via
279+
# configuration object
280+
config.celery['worker_service_name'] = 'worker-notify'
281+
282+
@self.app.task
283+
def fn_task():
284+
return 42
285+
286+
t = fn_task.apply()
287+
ok_(t.successful())
288+
eq_(42, t.result)
289+
290+
traces = self.tracer.writer.pop_traces()
291+
eq_(1, len(traces))
292+
eq_(1, len(traces[0]))
293+
span = traces[0][0]
294+
eq_(span.service, 'worker-notify')
295+
296+
def test_producer_service_name(self):
297+
# Ensure producer service name can be changed via
298+
# configuration object
299+
config.celery['producer_service_name'] = 'task-queue'
300+
301+
@self.app.task
302+
def fn_task():
303+
return 42
304+
305+
t = fn_task.delay()
306+
eq_('PENDING', t.status)
307+
308+
traces = self.tracer.writer.pop_traces()
309+
eq_(1, len(traces))
310+
eq_(1, len(traces[0]))
311+
span = traces[0][0]
312+
eq_(span.service, 'task-queue')

0 commit comments

Comments
 (0)