Skip to content

Commit 73401ab

Browse files
P403n1x87, jd, mergify[bot]
authored
perf(celery): reduce tracing overheads (#2579)
* perf(celery): reduce tracing overheads

  This change tries to reduce the overheads introduced by the Celery instrumentation. Tags are set straight from the context extraction rather than via an intermediate dictionary. Tag names are pre-computed to avoid the cost of string concatenation at runtime.

* Update ddtrace/contrib/celery/utils.py

  Co-authored-by: Julien Danjou <[email protected]>

* guard del operation

Co-authored-by: Julien Danjou <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent 355df5e commit 73401ab

File tree

4 files changed

+66
-49
lines changed

4 files changed

+66
-49
lines changed

ddtrace/contrib/celery/signals.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from .utils import detach_span
1515
from .utils import retrieve_span
1616
from .utils import retrieve_task_id
17-
from .utils import tags_from_context
17+
from .utils import set_tags_from_context
1818

1919

2020
log = get_logger(__name__)
@@ -70,8 +70,8 @@ def trace_postrun(*args, **kwargs):
7070
else:
7171
# request context tags
7272
span.set_tag(c.TASK_TAG_KEY, c.TASK_RUN)
73-
span.set_tags(tags_from_context(kwargs))
74-
span.set_tags(tags_from_context(task.request))
73+
set_tags_from_context(span, kwargs)
74+
set_tags_from_context(span, task.request.__dict__)
7575
span.finish()
7676
detach_span(task, task_id)
7777

@@ -107,7 +107,7 @@ def trace_before_publish(*args, **kwargs):
107107
span.set_tag(SPAN_MEASURED_KEY)
108108
span.set_tag(c.TASK_TAG_KEY, c.TASK_APPLY_ASYNC)
109109
span.set_tag("celery.id", task_id)
110-
span.set_tags(tags_from_context(kwargs))
110+
set_tags_from_context(span, kwargs)
111111

112112
# Note: adding tags from `traceback` or `state` calls will make an
113113
# API call to the backend for the properties so we should rely

ddtrace/contrib/celery/utils.py

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,59 @@
1+
from typing import Any
2+
from typing import Dict
13
from weakref import WeakValueDictionary
24

5+
from ddtrace.span import Span
6+
37
from .constants import CTX_KEY
48

59

6-
def tags_from_context(context):
10+
TAG_KEYS = frozenset(
11+
[
12+
("compression", "celery.compression"),
13+
("correlation_id", "celery.correlation_id"),
14+
("countdown", "celery.countdown"),
15+
("delivery_info", "celery.delivery_info"),
16+
("eta", "celery.eta"),
17+
("exchange", "celery.exchange"),
18+
("expires", "celery.expires"),
19+
("hostname", "celery.hostname"),
20+
("id", "celery.id"),
21+
("priority", "celery.priority"),
22+
("queue", "celery.queue"),
23+
("reply_to", "celery.reply_to"),
24+
("retries", "celery.retries"),
25+
("routing_key", "celery.routing_key"),
26+
("serializer", "celery.serializer"),
27+
("timelimit", "celery.timelimit"),
28+
# Celery 4.0 uses `origin` instead of `hostname`; this change preserves
29+
# the same name for the tag despite Celery version
30+
("origin", "celery.hostname"),
31+
("state", "celery.state"),
32+
]
33+
)
34+
35+
36+
def set_tags_from_context(span, context):
37+
# type: (Span, Dict[str, Any]) -> None
738
"""Helper to extract meta values from a Celery Context"""
8-
tag_keys = (
9-
"compression",
10-
"correlation_id",
11-
"countdown",
12-
"delivery_info",
13-
"eta",
14-
"exchange",
15-
"expires",
16-
"hostname",
17-
"id",
18-
"priority",
19-
"queue",
20-
"reply_to",
21-
"retries",
22-
"routing_key",
23-
"serializer",
24-
"timelimit",
25-
"origin",
26-
"state",
27-
)
28-
29-
tags = {}
30-
for key in tag_keys:
39+
40+
for key, tag_name in TAG_KEYS:
3141
value = context.get(key)
3242

3343
# Skip this key if it is not set
3444
if value is None or value == "":
3545
continue
3646

37-
# Skip `timelimit` if it is not set (it's default/unset value is a
47+
# Skip `timelimit` if it is not set (its default/unset value is a
3848
# tuple or a list of `None` values)
39-
if key == "timelimit" and value in [(None, None), [None, None]]:
49+
if key == "timelimit" and all(_ is None for _ in value):
4050
continue
4151

42-
# Skip `retries` if it's value is `0`
52+
# Skip `retries` if its value is `0`
4353
if key == "retries" and value == 0:
4454
continue
4555

46-
# Celery 4.0 uses `origin` instead of `hostname`; this change preserves
47-
# the same name for the tag despite Celery version
48-
if key == "origin":
49-
key = "hostname"
50-
51-
# prefix the tag as 'celery'
52-
tag_name = "celery.{}".format(key)
53-
tags[tag_name] = value
54-
return tags
56+
span.set_tag(tag_name, value)
5557

5658

5759
def attach_span(task, task_id, span, is_publish=False):
@@ -64,7 +66,7 @@ def attach_span(task, task_id, span, is_publish=False):
6466
task from within another task does not cause any conflicts.
6567
6668
This mostly happens when either a task fails and a retry policy is in place,
67-
or when a task is manually retries (e.g. `task.retry()`), we end up trying
69+
or when a task is manually retried (e.g. `task.retry()`), we end up trying
6870
to publish a task with the same id as the task currently running.
6971
7072
Previously publishing the new task would overwrite the existing `celery.run` span
@@ -90,7 +92,10 @@ def detach_span(task, task_id, is_publish=False):
9092
return
9193

9294
# DEV: See note in `attach_span` for key info
93-
weak_dict.pop((task_id, is_publish), None)
95+
try:
96+
del weak_dict[(task_id, is_publish)]
97+
except KeyError:
98+
pass
9499

95100

96101
def retrieve_span(task, task_id, is_publish=False):
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
Performance of the Celery integration has been improved.

tests/contrib/celery/test_utils.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
from ddtrace.contrib.celery.utils import detach_span
55
from ddtrace.contrib.celery.utils import retrieve_span
66
from ddtrace.contrib.celery.utils import retrieve_task_id
7-
from ddtrace.contrib.celery.utils import tags_from_context
7+
from ddtrace.contrib.celery.utils import set_tags_from_context
8+
from ddtrace.span import Span
89

910
from .base import CeleryBaseTestCase
1011

@@ -29,35 +30,42 @@ def test_tags_from_context(self):
2930
"custom_meta": "custom_value",
3031
}
3132

32-
metas = tags_from_context(context)
33+
span = Span(None, "test")
34+
set_tags_from_context(span, context)
35+
metas = span.meta
36+
metrics = span.metrics
37+
sentinel = object()
3338
assert metas["celery.correlation_id"] == "44b7f305"
3439
assert metas["celery.delivery_info"] == '{"eager": "True"}'
3540
assert metas["celery.eta"] == "soon"
3641
assert metas["celery.expires"] == "later"
3742
assert metas["celery.hostname"] == "localhost"
3843
assert metas["celery.id"] == "44b7f305"
3944
assert metas["celery.reply_to"] == "44b7f305"
40-
assert metas["celery.retries"] == 4
41-
assert metas["celery.timelimit"] == ("now", "later")
42-
assert metas.get("custom_meta", None) is None
45+
assert metrics["celery.retries"] == 4
46+
assert metas["celery.timelimit"] == "('now', 'later')"
47+
assert metas.get("custom_meta", sentinel) is sentinel
48+
assert metrics.get("custom_metric", sentinel) is sentinel
4349

4450
def test_tags_from_context_empty_keys(self):
4551
# it should not extract empty keys
52+
span = Span(None, "test")
4653
context = {
4754
"correlation_id": None,
4855
"exchange": "",
4956
"timelimit": (None, None),
5057
"retries": 0,
5158
}
59+
tags = span.meta
5260

53-
tags = tags_from_context(context)
61+
set_tags_from_context(span, context)
5462
assert {} == tags
5563
# edge case: `timelimit` can also be a list of None values
5664
context = {
5765
"timelimit": [None, None],
5866
}
5967

60-
tags = tags_from_context(context)
68+
set_tags_from_context(span, context)
6169
assert {} == tags
6270

6371
def test_span_propagation(self):

0 commit comments

Comments
 (0)