@@ -113,7 +113,6 @@ def _capture_exception(task, exc_info):
113113 return
114114
115115 if isinstance (exc_info [1 ], CELERY_CONTROL_FLOW_EXCEPTIONS ):
116- # ??? Doesn't map to anything
117116 _set_status ("aborted" )
118117 return
119118
@@ -277,16 +276,25 @@ def apply_async(*args, **kwargs):
277276 op = OP .QUEUE_SUBMIT_CELERY ,
278277 name = task_name ,
279278 origin = CeleryIntegration .origin ,
279+ only_if_parent = True ,
280280 )
281281 if not task_started_from_beat
282282 else NoOpMgr ()
283283 ) # type: Union[Span, NoOpMgr]
284284
285285 with span_mgr as span :
286- kwargs ["headers" ] = _update_celery_task_headers (
287- kwarg_headers , span , integration .monitor_beat_tasks
288- )
289- return f (* args , ** kwargs )
286+ try :
287+ kwargs ["headers" ] = _update_celery_task_headers (
288+ kwarg_headers , span , integration .monitor_beat_tasks
289+ )
290+ return_value = f (* args , ** kwargs )
291+
292+ except Exception :
293+ reraise (* sys .exc_info ())
294+ else :
295+ span .set_status (SPANSTATUS .OK )
296+
297+ return return_value
290298
291299 return apply_async # type: ignore
292300
@@ -314,24 +322,30 @@ def _inner(*args, **kwargs):
314322 # something such as attribute access can fail.
315323 headers = args [3 ].get ("headers" ) or {}
316324 with sentry_sdk .continue_trace (headers ):
317- with sentry_sdk .start_span (
318- op = OP .QUEUE_TASK_CELERY ,
319- name = task .name ,
320- source = TRANSACTION_SOURCE_TASK ,
321- origin = CeleryIntegration .origin ,
322- custom_sampling_context = {
323- "celery_job" : {
324- "task" : task .name ,
325- # for some reason, args[1] is a list if non-empty but a
326- # tuple if empty
327- "args" : list (args [1 ]),
328- "kwargs" : args [2 ],
329- }
330- },
331- ) as transaction :
332- return_value = f (* args , ** kwargs )
333- transaction .set_status (SPANSTATUS .OK )
334- return return_value
325+ try :
326+ with sentry_sdk .start_span (
327+ op = OP .QUEUE_TASK_CELERY ,
328+ name = task .name ,
329+ source = TRANSACTION_SOURCE_TASK ,
330+ origin = CeleryIntegration .origin ,
331+ custom_sampling_context = {
332+ "celery_job" : {
333+ "task" : task .name ,
334+ # for some reason, args[1] is a list if non-empty but a
335+ # tuple if empty
336+ "args" : list (args [1 ]),
337+ "kwargs" : args [2 ],
338+ }
339+ },
340+ ) as span :
341+ return_value = f (* args , ** kwargs )
342+
343+ except Exception :
344+ reraise (* sys .exc_info ())
345+ else :
346+ span .set_status (SPANSTATUS .OK )
347+
348+ return return_value
335349
336350 return _inner # type: ignore
337351
@@ -368,6 +382,7 @@ def _inner(*args, **kwargs):
368382 op = OP .QUEUE_PROCESS ,
369383 name = task .name ,
370384 origin = CeleryIntegration .origin ,
385+ only_if_parent = True ,
371386 ) as span :
372387 _set_messaging_destination_name (task , span )
373388
@@ -398,12 +413,17 @@ def _inner(*args, **kwargs):
398413 task .app .connection ().transport .driver_type ,
399414 )
400415
401- return f (* args , ** kwargs )
416+ result = f (* args , ** kwargs )
417+
402418 except Exception :
403419 exc_info = sys .exc_info ()
404420 with capture_internal_exceptions ():
405421 _capture_exception (task , exc_info )
406422 reraise (* exc_info )
423+ else :
424+ span .set_status (SPANSTATUS .OK )
425+
426+ return result
407427
408428 return _inner # type: ignore
409429
@@ -493,28 +513,36 @@ def sentry_publish(self, *args, **kwargs):
493513 routing_key = kwargs .get ("routing_key" )
494514 exchange = kwargs .get ("exchange" )
495515
496- with sentry_sdk .start_span (
497- op = OP .QUEUE_PUBLISH ,
498- name = task_name ,
499- origin = CeleryIntegration .origin ,
500- only_if_parent = True ,
501- ) as span :
502- if task_id is not None :
503- span .set_data (SPANDATA .MESSAGING_MESSAGE_ID , task_id )
504-
505- if exchange == "" and routing_key is not None :
506- # Empty exchange indicates the default exchange, meaning messages are
507- # routed to the queue with the same name as the routing key.
508- span .set_data (SPANDATA .MESSAGING_DESTINATION_NAME , routing_key )
516+ try :
517+ with sentry_sdk .start_span (
518+ op = OP .QUEUE_PUBLISH ,
519+ name = task_name ,
520+ origin = CeleryIntegration .origin ,
521+ only_if_parent = True ,
522+ ) as span :
523+ if task_id is not None :
524+ span .set_data (SPANDATA .MESSAGING_MESSAGE_ID , task_id )
509525
510- if retries is not None :
511- span .set_data (SPANDATA .MESSAGING_MESSAGE_RETRY_COUNT , retries )
526+ if exchange == "" and routing_key is not None :
527+ # Empty exchange indicates the default exchange, meaning messages are
528+ # routed to the queue with the same name as the routing key.
529+ span .set_data (SPANDATA .MESSAGING_DESTINATION_NAME , routing_key )
512530
513- with capture_internal_exceptions ():
514- span .set_data (
515- SPANDATA .MESSAGING_SYSTEM , self .connection .transport .driver_type
516- )
531+ if retries is not None :
532+ span .set_data (SPANDATA .MESSAGING_MESSAGE_RETRY_COUNT , retries )
533+
534+ with capture_internal_exceptions ():
535+ span .set_data (
536+ SPANDATA .MESSAGING_SYSTEM , self .connection .transport .driver_type
537+ )
538+
539+ return_value = original_publish (self , * args , ** kwargs )
540+
541+ except Exception :
542+ reraise (* sys .exc_info ())
543+ else :
544+ span .set_status (SPANSTATUS .OK )
517545
518- return original_publish ( self , * args , ** kwargs )
546+ return return_value
519547
520548 Producer .publish = sentry_publish
0 commit comments