Skip to content

Commit 7f9de0d

Browse files
namoshizunbasepi
andauthored
Celery contrib: read custom parent span id from the task headers (#1500)
* Celery contrib: read parent span id from celery headers if provided * Simplify setting custom parent span if by copying from the current trace parent * Should attempt to get trace parent from celery task headers before trying the inner headers * CHANGELOG and lint Co-authored-by: Colton Myers <[email protected]>
1 parent 0b8b129 commit 7f9de0d

File tree

2 files changed

+18
-11
lines changed

2 files changed

+18
-11
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ endif::[]
4040
* Change default for `sanitize_field_names` to sanitize `*auth*` instead of `authorization` {pull}1494[#1494]
4141
* Add `span_stack_trace_min_duration` to replace deprecated `span_frames_min_duration` {pull}1498[#1498]
4242
* Enable exact_match span compression by default {pull}1504[#1504]
43+
* Allow parent celery tasks to specify the downstream `parent_span_id` in celery headers {pull}1500[#1500]
4344
4445
[float]
4546
===== Bug fixes

elasticapm/contrib/celery/__init__.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,24 +68,30 @@ def set_celery_headers(headers=None, **kwargs):
6868
transaction = execution_context.get_transaction()
6969
if transaction is not None:
7070
trace_parent = transaction.trace_parent
71-
trace_parent_string = trace_parent.to_string()
7271

73-
headers.update({"elasticapm": {"trace_parent_string": trace_parent_string}})
72+
# Customize parent span id (if provided)
73+
apm_headers = headers.get("elasticapm", dict())
74+
if "parent_span_id" in apm_headers:
75+
trace_parent = trace_parent.copy_from()
76+
trace_parent.span_id = apm_headers["parent_span_id"]
77+
78+
apm_headers["trace_parent_string"] = trace_parent.to_string()
79+
headers.update(elasticapm=apm_headers)
7480

7581

7682
def get_trace_parent(celery_task):
7783
"""
7884
Return a trace parent contained in the request headers of a Celery Task object or None
7985
"""
80-
trace_parent = None
81-
with suppress(AttributeError, KeyError, TypeError):
82-
if celery_task.request.headers is not None:
83-
trace_parent_string = celery_task.request.headers["elasticapm"]["trace_parent_string"]
84-
trace_parent = TraceParent.from_string(trace_parent_string)
85-
else:
86-
trace_parent_string = celery_task.request.elasticapm["trace_parent_string"]
87-
trace_parent = TraceParent.from_string(trace_parent_string)
88-
return trace_parent
86+
read_from_inner_headers = lambda: celery_task.request.headers["elasticapm"]["trace_parent_string"]
87+
read_from_request = lambda: celery_task.request.elasticapm["trace_parent_string"]
88+
89+
for read_fun in (read_from_request, read_from_inner_headers):
90+
with suppress(AttributeError, KeyError, TypeError):
91+
trace_parent_string = read_fun()
92+
return TraceParent.from_string(trace_parent_string)
93+
94+
return None
8995

9096

9197
def register_instrumentation(client):

0 commit comments

Comments
 (0)