
Commit a20b4c2

fix(celery): stop closing prerun_span too soon to account for Celery chains scenario [backport 2.17] (#11805)
Backport e8aab65 from #11498 to 2.17.

We've made a few changes to handle Celery context recently, including #10676. In particular, the goal of #10676 was to handle a scenario where a long-running task may run into an exception that prevents it from closing. Unfortunately, that change did not account for cases where tasks are chained and may not close until later. See #11479 and #11624.

With this PR, the sample app in #11479 attaches the Celery-specific span back to the root span (a minimal sketch of the chains scenario follows below). Tests for the chains scenario are also added.

Related to AIDM-494

## Checklist

- [x] PR author has checked that all the criteria below are met
  - The PR description includes an overview of the change
  - The PR description articulates the motivation for the change
  - The change includes tests OR the PR description describes a testing strategy
  - The PR description notes risks associated with the change, if any
  - Newly-added code is easy to change
  - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
  - The change includes or references documentation updates if necessary
  - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [x] Reviewer has checked that all the criteria below are met
  - Title is accurate
  - All changes are related to the pull request's stated goal
  - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes
  - Testing strategy adequately addresses listed risks
  - Newly-added code is easy to change
  - Release note makes sense to a user of the library
  - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment
  - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

Co-authored-by: wantsui <[email protected]>
1 parent aa59f38 commit a20b4c2
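To make the chains scenario concrete, here is a minimal sketch of the failure mode (the broker URL and task bodies are illustrative, not taken from this PR):

```python
from celery import Celery

# Illustrative broker URL; any configured broker would do.
app = Celery("repro", broker="redis://localhost:6379/0")


@app.task
def fn_a():
    return "a"


@app.task
def fn_b():
    return "b"


# In a chain, the worker finishing fn_a internally calls apply_async() to
# enqueue fn_b. Before this fix, the tracing wrapper around apply_async
# treated the still-open prerun span for fn_a as leaked and finished it on
# the spot, before task_postrun could close it with its Celery-specific tags.
(fn_a.si() | fn_b.si()).delay()
```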

File tree: 6 files changed, +85 -14 lines


ddtrace/contrib/internal/celery/app.py

Lines changed: 0 additions & 11 deletions
```diff
@@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
             if task_span:
                 task_span.set_exc_info(*sys.exc_info())
 
-            prerun_span = core.get_item("prerun_span")
-            if prerun_span:
-                prerun_span.set_exc_info(*sys.exc_info())
-
             raise
         finally:
             task_span = core.get_item("task_span")
@@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
                 )
                 task_span.finish()
 
-            prerun_span = core.get_item("prerun_span")
-            if prerun_span:
-                log.debug(
-                    "The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
-                )
-                prerun_span.finish()
-
     return _traced_apply_async_inner
```
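For context, `core.set_item` and `core.get_item` stash and retrieve values scoped to the current execution context. A simplified sketch of the pattern (an assumption-laden stand-in, not the actual `ddtrace.internal.core` implementation, which scopes items per context rather than per process):

```python
# Simplified stand-in for the context-item API used above; the real
# ddtrace.internal.core keys items to the active execution context,
# not a module-level dict.
_context_items = {}


def set_item(key, value):
    """Stash a value for later retrieval in the same execution context."""
    _context_items[key] = value


def get_item(key, default=None):
    """Return a previously stashed value, or `default` if absent."""
    return _context_items.get(key, default)


# After this change, the apply_async wrapper only manages "task_span";
# the worker root span created by trace_prerun is left for task_postrun
# to finish, even when chained tasks keep it open for longer.
```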

ddtrace/contrib/internal/celery/signals.py

Lines changed: 0 additions & 3 deletions
```diff
@@ -54,9 +54,6 @@ def trace_prerun(*args, **kwargs):
     service = config.celery["worker_service_name"]
     span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)
 
-    # Store an item called "prerun span" in case task_postrun doesn't get called
-    core.set_item("prerun_span", span)
-
     # set span.kind to the type of request being performed
     span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
```
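The span that was stashed here is created by `trace_prerun` and finished by its counterpart `trace_postrun`. A rough sketch of that signal pairing (handler names and bodies are illustrative, not the ddtrace implementation):

```python
from celery.signals import task_prerun, task_postrun


@task_prerun.connect
def on_task_prerun(task_id=None, task=None, **kwargs):
    # ddtrace's trace_prerun starts the worker root span here, keyed so
    # that the postrun handler can retrieve it later.
    ...


@task_postrun.connect
def on_task_postrun(task_id=None, task=None, **kwargs):
    # ...and trace_postrun finishes the span here. With this fix, chained
    # tasks reach this handler normally instead of having the span closed
    # early by apply_async's cleanup path.
    ...
```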

Release note file — Lines changed: 4 additions & 0 deletions
```diff
@@ -0,0 +1,4 @@
+---
+fixes:
+  - |
+    tracing(celery): Fixes an issue where ``celery.apply`` spans from Celery prerun got closed too soon, leading to missing span tags.
```

tests/contrib/celery/run_tasks.py

Lines changed: 5 additions & 0 deletions
```diff
@@ -0,0 +1,5 @@
+from tasks import fn_a
+from tasks import fn_b
+
+
+(fn_a.si() | fn_b.si()).delay()
```
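For reference, `si()` creates an immutable signature and `|` composes the two signatures into a Celery chain; `.delay()` enqueues it. The worker only starts `fn_b` after `fn_a` completes, which is exactly the window in which the prerun span used to be closed too soon.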

tests/contrib/celery/tasks.py

Lines changed: 14 additions & 0 deletions
```diff
@@ -0,0 +1,14 @@
+from celery import Celery
+
+
+app = Celery("tasks")
+
+
+@app.task(name="tests.contrib.celery.tasks.fn_a")
+def fn_a():
+    return "a"
+
+
+@app.task(name="tests.contrib.celery.tasks.fn_b")
+def fn_b():
+    return "b"
```
New test file — Lines changed: 62 additions & 0 deletions
```diff
@@ -0,0 +1,62 @@
+import os
+import re
+import subprocess
+import time
+
+from celery import Celery
+
+
+# Ensure that when we call Celery chains, the root span has Celery-specific span tags.
+# The test_integration.py setup doesn't perfectly mimic the conditions of a running worker process.
+# This test runs the worker as a side process so we can check the tracer logs afterwards for the expected spans.
+# See https://github.com/DataDog/dd-trace-py/issues/11479
+def test_task_chain_task_call_task():
+    app = Celery("tasks")
+
+    celery_worker_cmd = "ddtrace-run celery -A tasks worker -c 1 -l DEBUG -n uniquename1 -P solo"
+    celery_task_runner_cmd = "ddtrace-run python run_tasks.py"
+
+    # The commands need to run from the directory where this test file lives
+    current_directory = str(os.path.dirname(__file__))
+
+    worker_process = subprocess.Popen(
+        celery_worker_cmd.split(),
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        preexec_fn=os.setsid,
+        close_fds=True,
+        cwd=current_directory,
+    )
+
+    max_wait_time = 10
+    waited_so_far = 0
+    # app.control.inspect().active() returns {'celery@uniquename1': []} once the worker is running
+    while app.control.inspect().active() is None and waited_so_far < max_wait_time:
+        time.sleep(1)
+        waited_so_far += 1
+
+    # The task should only run after the Celery worker has had sufficient time to start up
+    task_runner_process = subprocess.Popen(
+        celery_task_runner_cmd.split(),
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        preexec_fn=os.setsid,
+        close_fds=True,
+        cwd=current_directory,
+    )
+
+    task_runner_process.wait()
+    # Kill the worker so it flushes its traces to the Trace Agent
+    worker_process.kill()
+    worker_logs = worker_process.stderr.read()
+
+    # Check that the root span was created with one of the Celery-specific tags, such as celery.correlation_id.
+    # Some Python versions seem to require escaping when using `re.search`:
+    old_pattern_match = r"resource=\\'tests.contrib.celery.tasks.fn_a\\' type=\\'worker\\' .* tags=.*correlation_id.*"
+    new_pattern_match = r"resource=\'tests.contrib.celery.tasks.fn_a\' type=\'worker\' .* tags=.*correlation_id.*"
+
+    pattern_exists = (
+        re.search(old_pattern_match, str(worker_logs)) is not None
+        or re.search(new_pattern_match, str(worker_logs)) is not None
+    )
+    assert pattern_exists
```
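For intuition about what the assertion matches: with `-l DEBUG` under `ddtrace-run`, finished spans are pretty-printed to the worker's stderr along with their resource, type, and tags. A hedged illustration of the matching logic (the log line below is a fabricated example, not captured output):

```python
import re

# Fabricated example of a span line as it might appear in the worker logs.
sample_log = (
    "name='celery.run' resource='tests.contrib.celery.tasks.fn_a' "
    "type='worker' ... tags={'celery.correlation_id': 'abc123'}"
)

pattern = r"resource=\'tests.contrib.celery.tasks.fn_a\' type=\'worker\' .* tags=.*correlation_id.*"
assert re.search(pattern, sample_log) is not None
```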
