diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index fba87366af..6b74af1cb7 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -112,7 +112,6 @@ def _capture_exception(task, exc_info): return if isinstance(exc_info[1], CELERY_CONTROL_FLOW_EXCEPTIONS): - # ??? Doesn't map to anything _set_status("aborted") return @@ -276,6 +275,7 @@ def apply_async(*args, **kwargs): op=OP.QUEUE_SUBMIT_CELERY, name=task_name, origin=CeleryIntegration.origin, + only_if_parent=True, ) if not task_started_from_beat else NoOpMgr() @@ -306,11 +306,13 @@ def _inner(*args, **kwargs): with isolation_scope() as scope: scope._name = "celery" scope.clear_breadcrumbs() + scope.set_transaction_name(task.name, source=TRANSACTION_SOURCE_TASK) scope.add_event_processor(_make_event_processor(task, *args, **kwargs)) # Celery task objects are not a thing to be trusted. Even # something such as attribute access can fail. headers = args[3].get("headers") or {} + with sentry_sdk.continue_trace(headers): with sentry_sdk.start_span( op=OP.QUEUE_TASK_CELERY, @@ -320,9 +322,13 @@ def _inner(*args, **kwargs): # for some reason, args[1] is a list if non-empty but a # tuple if empty attributes=_prepopulate_attributes(task, list(args[1]), args[2]), - ) as transaction: - transaction.set_status(SPANSTATUS.OK) - return f(*args, **kwargs) + ) as root_span: + return_value = f(*args, **kwargs) + + if root_span.status is None: + root_span.set_status(SPANSTATUS.OK) + + return return_value return _inner # type: ignore @@ -359,6 +365,7 @@ def _inner(*args, **kwargs): op=OP.QUEUE_PROCESS, name=task.name, origin=CeleryIntegration.origin, + only_if_parent=True, ) as span: _set_messaging_destination_name(task, span) @@ -390,6 +397,7 @@ def _inner(*args, **kwargs): ) return f(*args, **kwargs) + except Exception: exc_info = sys.exc_info() with capture_internal_exceptions(): diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 2f8de6968a..c9a110af9e 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -5,9 +5,11 @@ import pytest from celery import Celery, VERSION from celery.bin import worker +from celery.app.task import Task +from opentelemetry import trace as otel_trace, context import sentry_sdk -from sentry_sdk import start_span, get_current_span +from sentry_sdk import get_current_span from sentry_sdk.integrations.celery import ( CeleryIntegration, _wrap_task_run, @@ -126,14 +128,14 @@ def dummy_task(x, y): foo = 42 # noqa return x / y - with start_span(op="unit test transaction") as transaction: + with sentry_sdk.start_span(op="unit test transaction") as root_span: celery_invocation(dummy_task, 1, 2) _, expected_context = celery_invocation(dummy_task, 1, 0) (_, error_event, _, _) = events - assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert error_event["contexts"]["trace"]["span_id"] != transaction.span_id + assert error_event["contexts"]["trace"]["trace_id"] == root_span.trace_id + assert error_event["contexts"]["trace"]["span_id"] != root_span.span_id assert error_event["transaction"] == "dummy_task" assert "celery_task_id" in error_event["tags"] assert error_event["extra"]["celery-job"] == dict( @@ -190,17 +192,14 @@ def test_transaction_events(capture_events, init_celery, celery_invocation, task def dummy_task(x, y): return x / y - # XXX: For some reason the first call does not get instrumented properly. - celery_invocation(dummy_task, 1, 1) - events = capture_events() - with start_span(name="submission") as transaction: + with sentry_sdk.start_span(name="submission") as root_span: celery_invocation(dummy_task, 1, 0 if task_fails else 1) if task_fails: error_event = events.pop(0) - assert error_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert error_event["contexts"]["trace"]["trace_id"] == root_span.trace_id assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" execution_event, submission_event = events @@ -211,8 +210,8 @@ def dummy_task(x, y): assert submission_event["transaction_info"] == {"source": "custom"} assert execution_event["type"] == submission_event["type"] == "transaction" - assert execution_event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert submission_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert execution_event["contexts"]["trace"]["trace_id"] == root_span.trace_id + assert submission_event["contexts"]["trace"]["trace_id"] == root_span.trace_id if task_fails: assert execution_event["contexts"]["trace"]["status"] == "internal_error" @@ -220,15 +219,12 @@ def dummy_task(x, y): assert execution_event["contexts"]["trace"]["status"] == "ok" assert len(execution_event["spans"]) == 1 - assert ( - execution_event["spans"][0].items() - >= { - "trace_id": str(transaction.trace_id), - "same_process_as_parent": True, + assert execution_event["spans"][0] == ApproxDict( + { + "trace_id": str(root_span.trace_id), "op": "queue.process", "description": "dummy_task", - "data": ApproxDict(), - }.items() + } ) assert submission_event["spans"] == [ { @@ -237,11 +233,14 @@ def dummy_task(x, y): "op": "queue.submit.celery", "origin": "auto.queue.celery", "parent_span_id": submission_event["contexts"]["trace"]["span_id"], - "same_process_as_parent": True, "span_id": submission_event["spans"][0]["span_id"], "start_timestamp": submission_event["spans"][0]["start_timestamp"], "timestamp": submission_event["spans"][0]["timestamp"], - "trace_id": str(transaction.trace_id), + "trace_id": str(root_span.trace_id), + "status": "ok", + "tags": { + "status": "ok", + }, } ] @@ -275,7 +274,7 @@ def test_simple_no_propagation(capture_events, init_celery): def dummy_task(): 1 / 0 - with start_span(name="task") as root_span: + with sentry_sdk.start_span(name="task") as root_span: dummy_task.delay() (event,) = events @@ -350,7 +349,7 @@ def dummy_task(self): runs.append(1) 1 / 0 - with start_span(name="submit_celery"): + with sentry_sdk.start_span(name="submit_celery"): # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes res = dummy_task.apply_async() @@ -445,7 +444,7 @@ def walk_dogs(x, y): walk_dogs, [["Maisey", "Charlie", "Bodhi", "Cory"], "Dog park round trip"], 1 ) - sampling_context = traces_sampler.call_args_list[1][0][0] + sampling_context = traces_sampler.call_args_list[0][0][0] assert sampling_context["celery.job.task"] == "dog_walk" for i, arg in enumerate(args_kwargs["args"]): assert sampling_context[f"celery.job.args.{i}"] == str(arg) @@ -469,7 +468,7 @@ def __call__(self, *args, **kwargs): def dummy_task(x, y): return x / y - with start_span(name="celery"): + with sentry_sdk.start_span(name="celery"): celery_invocation(dummy_task, 1, 0) assert not events @@ -510,7 +509,7 @@ def test_baggage_propagation(init_celery): def dummy_task(self, x, y): return _get_headers(self) - with start_span(name="task") as root_span: + with sentry_sdk.start_span(name="task") as root_span: result = dummy_task.apply_async( args=(1, 0), headers={"baggage": "custom=value"}, @@ -520,6 +519,7 @@ def dummy_task(self, x, y): [ "sentry-release=abcdef", "sentry-trace_id={}".format(root_span.trace_id), + "sentry-transaction=task", "sentry-environment=production", "sentry-sample_rate=1.0", "sentry-sampled=true", @@ -537,26 +537,42 @@ def test_sentry_propagate_traces_override(init_celery): propagate_traces=True, traces_sample_rate=1.0, release="abcdef" ) + # Since we're applying the task inline eagerly, + # we need to cleanup the otel context for this test. + # and since we patch build_tracer, we need to do this before that runs... + # TODO: the right way is to not test this inline + original_apply = Task.apply + + def cleaned_apply(*args, **kwargs): + token = context.attach(otel_trace.set_span_in_context(otel_trace.INVALID_SPAN)) + rv = original_apply(*args, **kwargs) + context.detach(token) + return rv + + Task.apply = cleaned_apply + @celery.task(name="dummy_task", bind=True) def dummy_task(self, message): trace_id = get_current_span().trace_id return trace_id - with start_span(name="task") as root_span: - transaction_trace_id = root_span.trace_id + with sentry_sdk.start_span(name="task") as root_span: + root_span_trace_id = root_span.trace_id # should propagate trace - task_transaction_id = dummy_task.apply_async( + task_trace_id = dummy_task.apply_async( args=("some message",), ).get() - assert transaction_trace_id == task_transaction_id + assert root_span_trace_id == task_trace_id, "Trace should be propagated" # should NOT propagate trace (overrides `propagate_traces` parameter in integration constructor) - task_transaction_id = dummy_task.apply_async( + task_trace_id = dummy_task.apply_async( args=("another message",), headers={"sentry-propagate-traces": False}, ).get() - assert transaction_trace_id != task_transaction_id + assert root_span_trace_id != task_trace_id, "Trace should NOT be propagated" + + Task.apply = original_apply def test_apply_async_manually_span(sentry_init): @@ -710,7 +726,7 @@ def publish(*args, **kwargs): @celery.task() def task(): ... - with start_span(name="task"): + with sentry_sdk.start_span(name="task"): task.apply_async() (event,) = events @@ -773,7 +789,7 @@ def publish(*args, **kwargs): @celery.task() def task(): ... - with start_span(name="custom_transaction"): + with sentry_sdk.start_span(name="custom_transaction"): task.apply_async() (event,) = events