Skip to content

Commit 214b314

Browse files
committed
Add previous run failure
1 parent 3443f81 commit 214b314

File tree

5 files changed

+69
-4
lines changed

5 files changed

+69
-4
lines changed

temporalio/worker/_workflow.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,10 @@ def _create_workflow_instance(
552552
priority=temporalio.common.Priority._from_proto(init.priority),
553553
)
554554

555+
previous_run_failure = None
556+
if init.HasField("continued_failure"):
557+
previous_run_failure = init.continued_failure
558+
555559
# Create instance from details
556560
det = WorkflowInstanceDetails(
557561
payload_converter_class=self._data_converter.payload_converter_class,
@@ -564,6 +568,7 @@ def _create_workflow_instance(
564568
disable_eager_activity_execution=self._disable_eager_activity_execution,
565569
worker_level_failure_exception_types=self._workflow_failure_exception_types,
566570
last_completion_result=init.last_completion_result,
571+
previous_run_failure=previous_run_failure,
567572
)
568573
if defn.sandboxed:
569574
return self._workflow_runner.create_instance(det)

temporalio/worker/_workflow_instance.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import temporalio.workflow
6565
from temporalio.service import __version__
6666

67+
from ..api.failure.v1.message_pb2 import Failure
6768
from ._interceptor import (
6869
ContinueAsNewInput,
6970
ExecuteWorkflowInput,
@@ -144,6 +145,7 @@ class WorkflowInstanceDetails:
144145
disable_eager_activity_execution: bool
145146
worker_level_failure_exception_types: Sequence[Type[BaseException]]
146147
last_completion_result: temporalio.api.common.v1.Payloads
148+
previous_run_failure: Optional[Failure]
147149

148150

149151
class WorkflowInstance(ABC):
@@ -322,6 +324,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
322324
self._current_details = ""
323325

324326
self._last_completion_result = det.last_completion_result
327+
self._previous_run_failure = det.previous_run_failure
325328

326329
# The versioning behavior of this workflow, as established by annotation or by the dynamic
327330
# config function. Is only set once upon initialization.
@@ -1689,6 +1692,9 @@ def workflow_set_current_details(self, details: str):
16891692
self._assert_not_read_only("set current details")
16901693
self._current_details = details
16911694

1695+
def workflow_has_last_completion_result(self) -> bool:
1696+
return len(self._last_completion_result.payloads) > 0
1697+
16921698
def workflow_last_completion_result(
16931699
self, type_hint: Optional[Type]
16941700
) -> Optional[Any]:
@@ -1709,6 +1715,14 @@ def workflow_last_completion_result(
17091715
self._last_completion_result.payloads[0], type_hint
17101716
)
17111717

1718+
def workflow_previous_run_failure(self) -> Optional[BaseException]:
1719+
if self._previous_run_failure:
1720+
return self._failure_converter.from_failure(
1721+
self._previous_run_failure, self._payload_converter
1722+
)
1723+
1724+
return None
1725+
17121726
#### Calls from outbound impl ####
17131727
# These are in alphabetical order and all start with "_outbound_".
17141728

temporalio/worker/workflow_sandbox/_runner.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import temporalio.workflow
2020

2121
from ...api.common.v1.message_pb2 import Payloads
22+
from ...api.failure.v1.message_pb2 import Failure
2223

2324
# Workflow instance has to be relative import
2425
from .._workflow_instance import (
@@ -87,6 +88,7 @@ def prepare_workflow(self, defn: temporalio.workflow._Definition) -> None:
8788
disable_eager_activity_execution=False,
8889
worker_level_failure_exception_types=self._worker_level_failure_exception_types,
8990
last_completion_result=Payloads(),
91+
previous_run_failure=Failure(),
9092
),
9193
)
9294

temporalio/workflow.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import temporalio.workflow
6262
from temporalio.nexus._util import ServiceHandlerT
6363

64+
from .api.failure.v1.message_pb2 import Failure
6465
from .types import (
6566
AnyType,
6667
CallableAsyncNoParam,
@@ -897,11 +898,17 @@ def workflow_get_current_details(self) -> str: ...
897898
@abstractmethod
898899
def workflow_set_current_details(self, details: str): ...
899900

901+
@abstractmethod
902+
def workflow_has_last_completion_result(self) -> bool: ...
903+
900904
@abstractmethod
901905
def workflow_last_completion_result(
902906
self, type_hint: Optional[Type]
903907
) -> Optional[Any]: ...
904908

909+
@abstractmethod
910+
def workflow_previous_run_failure(self) -> Optional[BaseException]: ...
911+
905912

906913
_current_update_info: contextvars.ContextVar[UpdateInfo] = contextvars.ContextVar(
907914
"__temporal_current_update_info"
@@ -1044,6 +1051,13 @@ def get_current_details() -> str:
10441051
return _Runtime.current().workflow_get_current_details()
10451052

10461053

1054+
def has_last_completion_result() -> bool:
1055+
"""Get the last completion result of the workflow. This be None if there was
1056+
no previous completion or the result was None
1057+
"""
1058+
return _Runtime.current().workflow_has_last_completion_result()
1059+
1060+
10471061
@overload
10481062
def get_last_completion_result() -> Optional[Any]: ...
10491063

@@ -1053,14 +1067,18 @@ def get_last_completion_result(type_hint: Type[ParamType]) -> Optional[ParamType
10531067

10541068

10551069
def get_last_completion_result(type_hint: Optional[Type] = None) -> Optional[Any]:
1056-
"""Get the current details of the workflow which may appear in the UI/CLI.
1057-
Unlike static details set at start, this value can be updated throughout
1058-
the life of the workflow and is independent of the static details.
1059-
This can be in Temporal markdown format and can span multiple lines.
1070+
"""Get the last completion result of the workflow. This be None if there was
1071+
no previous completion or the result was None. has_last_completion_result()
1072+
can be used to differentiate.
10601073
"""
10611074
return _Runtime.current().workflow_last_completion_result(type_hint)
10621075

10631076

1077+
def get_previous_run_failure() -> Optional[BaseException]:
1078+
"""Get the last failure of the workflow."""
1079+
return _Runtime.current().workflow_previous_run_failure()
1080+
1081+
10641082
def set_current_details(description: str) -> None:
10651083
"""Set the current details of the workflow which may appear in the UI/CLI.
10661084
Unlike static details set at start, this value can be updated throughout

tests/worker/test_workflow.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8327,3 +8327,29 @@ async def test_workflow_headers_with_codec(
83278327
assert headers["foo"].data == b"bar"
83288328
else:
83298329
assert headers["foo"].data != b"bar"
8330+
8331+
8332+
@workflow.defn
8333+
class PreviousRunFailureWorkflow:
8334+
@workflow.run
8335+
async def run(self) -> str:
8336+
if workflow.info().attempt != 1:
8337+
previous_failure = workflow.get_previous_run_failure()
8338+
assert isinstance(previous_failure, ApplicationError)
8339+
assert previous_failure.message == "Intentional Failure"
8340+
return "Done"
8341+
raise ApplicationError("Intentional Failure")
8342+
8343+
8344+
async def test_previous_run_failure(client: Client):
8345+
async with new_worker(client, PreviousRunFailureWorkflow) as worker:
8346+
handle = await client.start_workflow(
8347+
PreviousRunFailureWorkflow.run,
8348+
id=f"previous-run-failure-workflow-{uuid.uuid4()}",
8349+
task_queue=worker.task_queue,
8350+
retry_policy=RetryPolicy(
8351+
initial_interval=timedelta(milliseconds=10),
8352+
),
8353+
)
8354+
result = await handle.result()
8355+
assert result == "Done"

0 commit comments

Comments
 (0)