3030 TASK_WAKEUP_HANDLE ,
3131 TASK_WAKEUP_UNBLOCK ,
3232)
33+ from pulpcore .exceptions .base import PulpException , InternalErrorException
34+ from pulp_glue .common .exceptions import PulpException as PulpGlueException
35+
3336from pulpcore .middleware import x_task_diagnostics_var
3437from pulpcore .tasking .kafka import send_task_notification
3538
39+
3640_logger = logging .getLogger (__name__ )
3741
3842
@@ -70,17 +74,22 @@ def _execute_task(task):
7074 log_task_start (task , domain )
7175 task_function = get_task_function (task )
7276 result = task_function ()
77+ except (PulpException , PulpGlueException ):
78+ # Log expected ways to fail without a stacktrace.
79+ exc_type , exc , _ = sys .exc_info ()
80+ log_task_failed (task , exc_type , exc , None , domain )
81+ task .set_failed (exc )
7382 except Exception :
83+ # Unexpected Exceptions are most probably a programming error.
84+ # Log error with stack trace and return generic error (HTTP 500).
7485 exc_type , exc , tb = sys .exc_info ()
75- task .set_failed (exc , tb )
7686 log_task_failed (task , exc_type , exc , tb , domain )
77- send_task_notification (task )
87+ safe_exc = InternalErrorException ()
88+ task .set_failed (safe_exc )
7889 else :
7990 task .set_completed (result )
8091 log_task_completed (task , domain )
81- send_task_notification (task )
82- return result
83- return None
92+ send_task_notification (task )
8493
8594
8695async def aexecute_task (task ):
@@ -95,17 +104,23 @@ async def _aexecute_task(task):
95104 try :
96105 task_coroutine_fn = await aget_task_function (task )
97106 result = await task_coroutine_fn ()
107+ except (PulpException , PulpGlueException ):
108+ # Log expected ways to fail without a stacktrace.
109+ exc_type , exc , _ = sys .exc_info ()
110+ log_task_failed (task , exc_type , exc , None , domain )
111+ await sync_to_async (task .set_failed )(exc )
98112 except Exception :
113+ # Unexpected Exceptions are most probably a programming error.
114+ # Log error with stack trace and return generic error (HTTP 500).
99115 exc_type , exc , tb = sys .exc_info ()
100- await sync_to_async (task .set_failed )(exc , tb )
101116 log_task_failed (task , exc_type , exc , tb , domain )
102- send_task_notification (task )
117+ safe_exc = InternalErrorException ()
118+ await sync_to_async (task .set_failed )(safe_exc )
103119 else :
104120 await sync_to_async (task .set_completed )(result )
105121 send_task_notification (task )
106122 log_task_completed (task , domain )
107- return result
108- return None
123+ send_task_notification (task )
109124
110125
111126def log_task_start (task , domain ):
@@ -144,7 +159,8 @@ def log_task_failed(task, exc_type, exc, tb, domain):
144159 domain = domain .name ,
145160 )
146161 )
147- _logger .info ("\n " .join (traceback .format_list (traceback .extract_tb (tb ))))
162+ if tb :
163+ _logger .info ("\n " .join (traceback .format_list (traceback .extract_tb (tb ))))
148164
149165
150166async def aget_task_function (task ):
0 commit comments