55import pytest
66from celery import Celery , VERSION
77from celery .bin import worker
8+ from celery .app .task import Task
9+ from opentelemetry import trace as otel_trace , context
810
911import sentry_sdk
10- from sentry_sdk import start_span , get_current_span
12+ from sentry_sdk import get_current_span
1113from sentry_sdk .integrations .celery import (
1214 CeleryIntegration ,
1315 _wrap_task_run ,
@@ -126,14 +128,14 @@ def dummy_task(x, y):
126128 foo = 42 # noqa
127129 return x / y
128130
129- with start_span (op = "unit test transaction" ) as transaction :
131+ with sentry_sdk . start_span (op = "unit test transaction" ) as root_span :
130132 celery_invocation (dummy_task , 1 , 2 )
131133 _ , expected_context = celery_invocation (dummy_task , 1 , 0 )
132134
133135 (_ , error_event , _ , _ ) = events
134136
135- assert error_event ["contexts" ]["trace" ]["trace_id" ] == transaction .trace_id
136- assert error_event ["contexts" ]["trace" ]["span_id" ] != transaction .span_id
137+ assert error_event ["contexts" ]["trace" ]["trace_id" ] == root_span .trace_id
138+ assert error_event ["contexts" ]["trace" ]["span_id" ] != root_span .span_id
137139 assert error_event ["transaction" ] == "dummy_task"
138140 assert "celery_task_id" in error_event ["tags" ]
139141 assert error_event ["extra" ]["celery-job" ] == dict (
@@ -190,17 +192,14 @@ def test_transaction_events(capture_events, init_celery, celery_invocation, task
190192 def dummy_task (x , y ):
191193 return x / y
192194
193- # XXX: For some reason the first call does not get instrumented properly.
194- celery_invocation (dummy_task , 1 , 1 )
195-
196195 events = capture_events ()
197196
198- with start_span (name = "submission" ) as transaction :
197+ with sentry_sdk . start_span (name = "submission" ) as root_span :
199198 celery_invocation (dummy_task , 1 , 0 if task_fails else 1 )
200199
201200 if task_fails :
202201 error_event = events .pop (0 )
203- assert error_event ["contexts" ]["trace" ]["trace_id" ] == transaction .trace_id
202+ assert error_event ["contexts" ]["trace" ]["trace_id" ] == root_span .trace_id
204203 assert error_event ["exception" ]["values" ][0 ]["type" ] == "ZeroDivisionError"
205204
206205 execution_event , submission_event = events
@@ -211,24 +210,21 @@ def dummy_task(x, y):
211210 assert submission_event ["transaction_info" ] == {"source" : "custom" }
212211
213212 assert execution_event ["type" ] == submission_event ["type" ] == "transaction"
214- assert execution_event ["contexts" ]["trace" ]["trace_id" ] == transaction .trace_id
215- assert submission_event ["contexts" ]["trace" ]["trace_id" ] == transaction .trace_id
213+ assert execution_event ["contexts" ]["trace" ]["trace_id" ] == root_span .trace_id
214+ assert submission_event ["contexts" ]["trace" ]["trace_id" ] == root_span .trace_id
216215
217216 if task_fails :
218217 assert execution_event ["contexts" ]["trace" ]["status" ] == "internal_error"
219218 else :
220219 assert execution_event ["contexts" ]["trace" ]["status" ] == "ok"
221220
222221 assert len (execution_event ["spans" ]) == 1
223- assert (
224- execution_event ["spans" ][0 ].items ()
225- >= {
226- "trace_id" : str (transaction .trace_id ),
227- "same_process_as_parent" : True ,
222+ assert execution_event ["spans" ][0 ] == ApproxDict (
223+ {
224+ "trace_id" : str (root_span .trace_id ),
228225 "op" : "queue.process" ,
229226 "description" : "dummy_task" ,
230- "data" : ApproxDict (),
231- }.items ()
227+ }
232228 )
233229 assert submission_event ["spans" ] == [
234230 {
@@ -237,11 +233,14 @@ def dummy_task(x, y):
237233 "op" : "queue.submit.celery" ,
238234 "origin" : "auto.queue.celery" ,
239235 "parent_span_id" : submission_event ["contexts" ]["trace" ]["span_id" ],
240- "same_process_as_parent" : True ,
241236 "span_id" : submission_event ["spans" ][0 ]["span_id" ],
242237 "start_timestamp" : submission_event ["spans" ][0 ]["start_timestamp" ],
243238 "timestamp" : submission_event ["spans" ][0 ]["timestamp" ],
244- "trace_id" : str (transaction .trace_id ),
239+ "trace_id" : str (root_span .trace_id ),
240+ "status" : "ok" ,
241+ "tags" : {
242+ "status" : "ok" ,
243+ },
245244 }
246245 ]
247246
@@ -268,14 +267,16 @@ def dummy_task():
268267
269268
270269def test_simple_no_propagation (capture_events , init_celery ):
271- celery = init_celery (propagate_traces = False )
270+ with pytest .warns (DeprecationWarning ):
271+ celery = init_celery (propagate_traces = False )
272+
272273 events = capture_events ()
273274
274275 @celery .task (name = "dummy_task" )
275276 def dummy_task ():
276277 1 / 0
277278
278- with start_span (name = "task" ) as root_span :
279+ with sentry_sdk . start_span (name = "task" ) as root_span :
279280 dummy_task .delay ()
280281
281282 (event ,) = events
@@ -350,7 +351,7 @@ def dummy_task(self):
350351 runs .append (1 )
351352 1 / 0
352353
353- with start_span (name = "submit_celery" ):
354+ with sentry_sdk . start_span (name = "submit_celery" ):
354355 # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
355356 res = dummy_task .apply_async ()
356357
@@ -445,7 +446,7 @@ def walk_dogs(x, y):
445446 walk_dogs , [["Maisey" , "Charlie" , "Bodhi" , "Cory" ], "Dog park round trip" ], 1
446447 )
447448
448- sampling_context = traces_sampler .call_args_list [1 ][0 ][0 ]
449+ sampling_context = traces_sampler .call_args_list [0 ][0 ][0 ]
449450 assert sampling_context ["celery.job.task" ] == "dog_walk"
450451 for i , arg in enumerate (args_kwargs ["args" ]):
451452 assert sampling_context [f"celery.job.args.{ i } " ] == str (arg )
@@ -469,7 +470,7 @@ def __call__(self, *args, **kwargs):
469470 def dummy_task (x , y ):
470471 return x / y
471472
472- with start_span (name = "celery" ):
473+ with sentry_sdk . start_span (name = "celery" ):
473474 celery_invocation (dummy_task , 1 , 0 )
474475
475476 assert not events
@@ -510,7 +511,7 @@ def test_baggage_propagation(init_celery):
510511 def dummy_task (self , x , y ):
511512 return _get_headers (self )
512513
513- with start_span (name = "task" ) as root_span :
514+ with sentry_sdk . start_span (name = "task" ) as root_span :
514515 result = dummy_task .apply_async (
515516 args = (1 , 0 ),
516517 headers = {"baggage" : "custom=value" },
@@ -520,6 +521,7 @@ def dummy_task(self, x, y):
520521 [
521522 "sentry-release=abcdef" ,
522523 "sentry-trace_id={}" .format (root_span .trace_id ),
524+ "sentry-transaction=task" ,
523525 "sentry-environment=production" ,
524526 "sentry-sample_rate=1.0" ,
525527 "sentry-sampled=true" ,
@@ -533,30 +535,47 @@ def test_sentry_propagate_traces_override(init_celery):
533535 Test if the `sentry-propagate-traces` header given to `apply_async`
534536 overrides the `propagate_traces` parameter in the integration constructor.
535537 """
536- celery = init_celery (
537- propagate_traces = True , traces_sample_rate = 1.0 , release = "abcdef"
538- )
538+ with pytest .warns (DeprecationWarning ):
539+ celery = init_celery (
540+ propagate_traces = True , traces_sample_rate = 1.0 , release = "abcdef"
541+ )
542+
543+ # Since we're applying the task inline eagerly,
544+ # we need to cleanup the otel context for this test.
545+ # and since we patch build_tracer, we need to do this before that runs...
546+ # TODO: the right way is to not test this inline
547+ original_apply = Task .apply
548+
549+ def cleaned_apply (* args , ** kwargs ):
550+ token = context .attach (otel_trace .set_span_in_context (otel_trace .INVALID_SPAN ))
551+ rv = original_apply (* args , ** kwargs )
552+ context .detach (token )
553+ return rv
554+
555+ Task .apply = cleaned_apply
539556
540557 @celery .task (name = "dummy_task" , bind = True )
541558 def dummy_task (self , message ):
542559 trace_id = get_current_span ().trace_id
543560 return trace_id
544561
545- with start_span (name = "task" ) as root_span :
546- transaction_trace_id = root_span .trace_id
562+ with sentry_sdk . start_span (name = "task" ) as root_span :
563+ root_span_trace_id = root_span .trace_id
547564
548565 # should propagate trace
549- task_transaction_id = dummy_task .apply_async (
566+ task_trace_id = dummy_task .apply_async (
550567 args = ("some message" ,),
551568 ).get ()
552- assert transaction_trace_id == task_transaction_id
569+ assert root_span_trace_id == task_trace_id , "Trace should be propagated"
553570
554571 # should NOT propagate trace (overrides `propagate_traces` parameter in integration constructor)
555- task_transaction_id = dummy_task .apply_async (
572+ task_trace_id = dummy_task .apply_async (
556573 args = ("another message" ,),
557574 headers = {"sentry-propagate-traces" : False },
558575 ).get ()
559- assert transaction_trace_id != task_transaction_id
576+ assert root_span_trace_id != task_trace_id , "Trace should NOT be propagated"
577+
578+ Task .apply = original_apply
560579
561580
562581def test_apply_async_manually_span (sentry_init ):
@@ -710,7 +729,7 @@ def publish(*args, **kwargs):
710729 @celery .task ()
711730 def task (): ...
712731
713- with start_span (name = "task" ):
732+ with sentry_sdk . start_span (name = "task" ):
714733 task .apply_async ()
715734
716735 (event ,) = events
@@ -773,7 +792,7 @@ def publish(*args, **kwargs):
773792 @celery .task ()
774793 def task (): ...
775794
776- with start_span (name = "custom_transaction" ):
795+ with sentry_sdk . start_span (name = "custom_transaction" ):
777796 task .apply_async ()
778797
779798 (event ,) = events
0 commit comments