-
Notifications
You must be signed in to change notification settings - Fork 798
fix(opentelemetry-instrumentation-celery): attach incoming context on… #2385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b513742
eef8b9d
9126473
9674c16
2e19183
da60cef
ac7f261
1069c80
121a78a
0de7c50
39482d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,7 @@ def add(x, y): | |
from billiard.einfo import ExceptionInfo | ||
from celery import signals # pylint: disable=no-name-in-module | ||
|
||
from opentelemetry import context as context_api | ||
from opentelemetry import trace | ||
from opentelemetry.instrumentation.celery import utils | ||
from opentelemetry.instrumentation.celery.package import _instruments | ||
|
@@ -169,6 +170,7 @@ def _trace_prerun(self, *args, **kwargs): | |
self.update_task_duration_time(task_id) | ||
request = task.request | ||
tracectx = extract(request, getter=celery_getter) or None | ||
token = context_api.attach(tracectx) if tracectx is not None else None | ||
|
||
logger.debug("prerun signal start task_id=%s", task_id) | ||
|
||
|
@@ -179,7 +181,7 @@ def _trace_prerun(self, *args, **kwargs): | |
|
||
activation = trace.use_span(span, end_on_exit=True) | ||
activation.__enter__() # pylint: disable=E1101 | ||
utils.attach_span(task, task_id, (span, activation)) | ||
utils.attach_context(task, task_id, span, activation, token) | ||
|
||
def _trace_postrun(self, *args, **kwargs): | ||
task = utils.retrieve_task(kwargs) | ||
|
@@ -191,11 +193,14 @@ def _trace_postrun(self, *args, **kwargs): | |
logger.debug("postrun signal task_id=%s", task_id) | ||
|
||
# retrieve and finish the Span | ||
span, activation = utils.retrieve_span(task, task_id) | ||
if span is None: | ||
ctx = utils.retrieve_context(task, task_id) | ||
|
||
if ctx is None: | ||
logger.warning("no existing span found for task_id=%s", task_id) | ||
return | ||
|
||
span, activation, token = ctx | ||
|
||
# request context tags | ||
if span.is_recording(): | ||
span.set_attribute(_TASK_TAG_KEY, _TASK_RUN) | ||
|
@@ -204,10 +209,11 @@ def _trace_postrun(self, *args, **kwargs): | |
span.set_attribute(_TASK_NAME_KEY, task.name) | ||
|
||
activation.__exit__(None, None, None) | ||
utils.detach_span(task, task_id) | ||
utils.detach_context(task, task_id) | ||
self.update_task_duration_time(task_id) | ||
labels = {"task": task.name, "worker": task.request.hostname} | ||
self._record_histograms(task_id, labels) | ||
context_api.detach(token) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems this line introduces a bug where the token could be "None" if the prerun signal has no traceparent. This could happen, for example, if the process sending the task isn't instrumented while the celery worker is instrumented. This results in an error, since token cannot be None when calling context_api.detach(). This seems related as well: open-telemetry/opentelemetry-python#4163 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch! There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was a silly mistake on my end. I'll get a fix for this tonight. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Started a draft PR here: #2855. Still trying to figure out how to write a test for this. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If an integration test is hard to write, maybe a unit test could help in moving things forward. |
||
|
||
def _trace_before_publish(self, *args, **kwargs): | ||
task = utils.retrieve_task_from_sender(kwargs) | ||
|
@@ -238,7 +244,9 @@ def _trace_before_publish(self, *args, **kwargs): | |
activation = trace.use_span(span, end_on_exit=True) | ||
activation.__enter__() # pylint: disable=E1101 | ||
|
||
utils.attach_span(task, task_id, (span, activation), is_publish=True) | ||
utils.attach_context( | ||
task, task_id, span, activation, None, is_publish=True | ||
) | ||
|
||
headers = kwargs.get("headers") | ||
if headers: | ||
|
@@ -253,13 +261,16 @@ def _trace_after_publish(*args, **kwargs): | |
return | ||
|
||
# retrieve and finish the Span | ||
_, activation = utils.retrieve_span(task, task_id, is_publish=True) | ||
if activation is None: | ||
ctx = utils.retrieve_context(task, task_id, is_publish=True) | ||
|
||
if ctx is None: | ||
logger.warning("no existing span found for task_id=%s", task_id) | ||
return | ||
|
||
_, activation, _ = ctx | ||
|
||
activation.__exit__(None, None, None) # pylint: disable=E1101 | ||
utils.detach_span(task, task_id, is_publish=True) | ||
utils.detach_context(task, task_id, is_publish=True) | ||
|
||
@staticmethod | ||
def _trace_failure(*args, **kwargs): | ||
|
@@ -269,9 +280,14 @@ def _trace_failure(*args, **kwargs): | |
if task is None or task_id is None: | ||
return | ||
|
||
# retrieve and pass exception info to activation | ||
span, _ = utils.retrieve_span(task, task_id) | ||
if span is None or not span.is_recording(): | ||
ctx = utils.retrieve_context(task, task_id) | ||
|
||
if ctx is None: | ||
return | ||
|
||
span, _, _ = ctx | ||
|
||
if not span.is_recording(): | ||
return | ||
|
||
status_kwargs = {"status_code": StatusCode.ERROR} | ||
|
@@ -311,8 +327,14 @@ def _trace_retry(*args, **kwargs): | |
if task is None or task_id is None or reason is None: | ||
return | ||
|
||
span, _ = utils.retrieve_span(task, task_id) | ||
if span is None or not span.is_recording(): | ||
ctx = utils.retrieve_context(task, task_id) | ||
|
||
if ctx is None: | ||
return | ||
|
||
span, _, _ = ctx | ||
|
||
if not span.is_recording(): | ||
return | ||
|
||
# Add retry reason metadata to span | ||
|
Uh oh!
There was an error while loading. Please reload this page.