@@ -259,6 +259,7 @@ def activate(
259259 self ._time_ns = act .timestamp .ToNanoseconds ()
260260 self ._is_replaying = act .is_replaying
261261
262+ activation_err : Optional [Exception ] = None
262263 try :
263264 # Split into job sets with patches, then signals, then non-queries, then
264265 # queries
@@ -287,7 +288,17 @@ def activate(
287288 # be checked in patch jobs (first index) or query jobs (last
288289 # index).
289290 self ._run_once (check_conditions = index == 1 or index == 2 )
291+ except temporalio .exceptions .FailureError as err :
292+ # We want failure errors during activation, like those that can
293+ # happen during payload conversion, to fail the workflow not the
294+ # task
295+ try :
296+ self ._set_workflow_failure (err )
297+ except Exception as inner_err :
298+ activation_err = inner_err
290299 except Exception as err :
300+ activation_err = err
301+ if activation_err :
291302 logger .warning (
292303 f"Failed activation on workflow { self ._info .workflow_type } with ID { self ._info .workflow_id } and run ID { self ._info .run_id } " ,
293304 exc_info = True ,
@@ -296,7 +307,7 @@ def activate(
296307 self ._current_completion .failed .failure .SetInParent ()
297308 try :
298309 self ._failure_converter .to_failure (
299- err ,
310+ activation_err ,
300311 self ._payload_converter ,
301312 self ._current_completion .failed .failure ,
302313 )
@@ -1134,6 +1145,9 @@ def _convert_payloads(
11341145 payloads ,
11351146 type_hints = types ,
11361147 )
1148+ except temporalio .exceptions .FailureError :
1149+ # Don't wrap payload conversion errors that would fail the workflow
1150+ raise
11371151 except Exception as err :
11381152 raise RuntimeError ("Failed decoding arguments" ) from err
11391153
@@ -1227,33 +1241,26 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
12271241 # cancel later on will show the workflow as cancelled. But this is
12281242 # a Temporal limitation in that cancellation is a state not an
12291243 # event.
1230- if self ._cancel_requested and (
1231- isinstance (err , temporalio .exceptions .CancelledError )
1232- or (
1233- (
1234- isinstance (err , temporalio .exceptions .ActivityError )
1235- or isinstance (err , temporalio .exceptions .ChildWorkflowError )
1236- )
1237- and isinstance (err .cause , temporalio .exceptions .CancelledError )
1238- )
1244+ if self ._cancel_requested and temporalio .exceptions .is_cancelled_exception (
1245+ err
12391246 ):
12401247 self ._add_command ().cancel_workflow_execution .SetInParent ()
12411248 elif isinstance (err , temporalio .exceptions .FailureError ):
12421249 # All other failure errors fail the workflow
1243- failure = self ._add_command ().fail_workflow_execution .failure
1244- failure .SetInParent ()
1245- try :
1246- self ._failure_converter .to_failure (
1247- err , self ._payload_converter , failure
1248- )
1249- except Exception as inner_err :
1250- raise ValueError (
1251- "Failed converting workflow exception"
1252- ) from inner_err
1250+ self ._set_workflow_failure (err )
12531251 else :
12541252 # All other exceptions fail the task
12551253 self ._current_activation_error = err
12561254
1255+ def _set_workflow_failure (self , err : temporalio .exceptions .FailureError ) -> None :
1256+ # All other failure errors fail the workflow
1257+ failure = self ._add_command ().fail_workflow_execution .failure
1258+ failure .SetInParent ()
1259+ try :
1260+ self ._failure_converter .to_failure (err , self ._payload_converter , failure )
1261+ except Exception as inner_err :
1262+ raise ValueError ("Failed converting workflow exception" ) from inner_err
1263+
12571264 async def _signal_external_workflow (
12581265 self ,
12591266 # Should not have seq set
0 commit comments