Skip to content

Commit 399d8b1

Browse files
committed
WIP Last completion result
1 parent 7f228d8 commit 399d8b1

File tree

6 files changed

+84
-2
lines changed

6 files changed

+84
-2
lines changed

temporalio/worker/_workflow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,7 @@ def _create_workflow_instance(
563563
extern_functions=self._extern_functions,
564564
disable_eager_activity_execution=self._disable_eager_activity_execution,
565565
worker_level_failure_exception_types=self._workflow_failure_exception_types,
566+
last_completion_result=init.last_completion_result,
566567
)
567568
if defn.sandboxed:
568569
return self._workflow_runner.create_instance(det)

temporalio/worker/_workflow_instance.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
WorkflowInboundInterceptor,
8080
WorkflowOutboundInterceptor,
8181
)
82+
from ..api.common.v1.message_pb2 import Payloads
8283

8384
logger = logging.getLogger(__name__)
8485

@@ -143,7 +144,7 @@ class WorkflowInstanceDetails:
143144
extern_functions: Mapping[str, Callable]
144145
disable_eager_activity_execution: bool
145146
worker_level_failure_exception_types: Sequence[Type[BaseException]]
146-
147+
last_completion_result: Payloads
147148

148149
class WorkflowInstance(ABC):
149150
"""Instance of a workflow that can handle activations."""
@@ -320,6 +321,8 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
320321
# metadata query
321322
self._current_details = ""
322323

324+
self._last_completion_result = det.last_completion_result
325+
323326
# The versioning behavior of this workflow, as established by annotation or by the dynamic
324327
# config function. Is only set once upon initialization.
325328
self._versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None
@@ -1686,6 +1689,17 @@ def workflow_set_current_details(self, details: str):
16861689
self._assert_not_read_only("set current details")
16871690
self._current_details = details
16881691

1692+
def workflow_last_completion_result(self, type_hint: Optional[Type]) -> Optional[Any]:
1693+
print("workflow_last_completion_result: ", self._last_completion_result, type(self._last_completion_result), "payload length:", len(self._last_completion_result.payloads))
1694+
if len(self._last_completion_result.payloads) == 0:
1695+
return None
1696+
elif len(self._last_completion_result.payloads) > 1:
1697+
warnings.warn(f"Expected single last completion result, got {len(self._last_completion_result.payloads)}")
1698+
return None
1699+
1700+
print("Payload:", self._last_completion_result.payloads[0])
1701+
return self._payload_converter.from_payload(self._last_completion_result.payloads[0], type_hint)
1702+
16891703
#### Calls from outbound impl ####
16901704
# These are in alphabetical order and all start with "_outbound_".
16911705

@@ -2766,6 +2780,14 @@ def _apply_schedule_command(
27662780
v.start_to_close_timeout.FromTimedelta(self._input.start_to_close_timeout)
27672781
if self._input.retry_policy:
27682782
self._input.retry_policy.apply_to_proto(v.retry_policy)
2783+
<<<<<<< Updated upstream
2784+
=======
2785+
print("Input summary:", self._input.summary)
2786+
if self._input.summary:
2787+
command.user_metadata.summary.CopyFrom(
2788+
self._instance._payload_converter.to_payload(self._input.summary)
2789+
)
2790+
>>>>>>> Stashed changes
27692791
v.cancellation_type = cast(
27702792
temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType,
27712793
int(self._input.cancellation_type),

temporalio/worker/workflow_sandbox/_runner.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
)
2828
from ._importer import Importer
2929
from ._restrictions import RestrictionContext, SandboxRestrictions
30+
from ...api.common.v1.message_pb2 import Payloads
3031

3132
_fake_info = temporalio.workflow.Info(
3233
attempt=-1,
@@ -84,6 +85,7 @@ def prepare_workflow(self, defn: temporalio.workflow._Definition) -> None:
8485
extern_functions={},
8586
disable_eager_activity_execution=False,
8687
worker_level_failure_exception_types=self._worker_level_failure_exception_types,
88+
last_completion_result=Payloads(),
8789
),
8890
)
8991

temporalio/workflow.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,9 @@ def workflow_get_current_details(self) -> str: ...
897897
@abstractmethod
898898
def workflow_set_current_details(self, details: str): ...
899899

900+
@abstractmethod
901+
def workflow_last_completion_result(self, type_hint: Optional[Type]) -> Optional[Any]: ...
902+
900903

901904
_current_update_info: contextvars.ContextVar[UpdateInfo] = contextvars.ContextVar(
902905
"__temporal_current_update_info"
@@ -1039,6 +1042,19 @@ def get_current_details() -> str:
10391042
return _Runtime.current().workflow_get_current_details()
10401043

10411044

1045+
@overload
1046+
def get_last_completion_result(type_hint: Type[ParamType]) -> Optional[ParamType]: ...
1047+
1048+
1049+
def get_last_completion_result(type_hint: Optional[Type] = None) -> Optional[Any]:
1050+
"""Get the current details of the workflow which may appear in the UI/CLI.
1051+
Unlike static details set at start, this value can be updated throughout
1052+
the life of the workflow and is independent of the static details.
1053+
This can be in Temporal markdown format and can span multiple lines.
1054+
"""
1055+
return _Runtime.current().workflow_last_completion_result(type_hint)
1056+
1057+
10421058
def set_current_details(description: str) -> None:
10431059
"""Set the current details of the workflow which may appear in the UI/CLI.
10441060
Unlike static details set at start, this value can be updated throughout

tests/test_client.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import dataclasses
23
import json
34
import os
@@ -1501,3 +1502,43 @@ async def test_cloud_client_simple():
15011502
GetNamespaceRequest(namespace=os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"])
15021503
)
15031504
assert os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"] == result.namespace.namespace
1505+
1506+
1507+
@workflow.defn
1508+
class LastCompletionResultWorkflow:
1509+
@workflow.run
1510+
async def run(self) -> str:
1511+
last_result = workflow.get_last_completion_result(type_hint=str)
1512+
if last_result is not None:
1513+
return "From last completion:" + last_result
1514+
else:
1515+
return "My First Result"
1516+
1517+
1518+
async def test_schedule_last_completion_result(
1519+
client: Client, env: WorkflowEnvironment
1520+
):
1521+
if env.supports_time_skipping:
1522+
pytest.skip("Java test server doesn't support schedules")
1523+
1524+
async with new_worker(client, LastCompletionResultWorkflow) as worker:
1525+
handle = await client.create_schedule(
1526+
f"schedule-{uuid.uuid4()}",
1527+
Schedule(
1528+
action=ScheduleActionStartWorkflow(
1529+
"LastCompletionResultWorkflow",
1530+
id=f"workflow-{uuid.uuid4()}",
1531+
task_queue=worker.task_queue,
1532+
),
1533+
spec=ScheduleSpec(),
1534+
),
1535+
)
1536+
await handle.trigger()
1537+
await asyncio.sleep(1)
1538+
await handle.trigger()
1539+
await asyncio.sleep(1)
1540+
print(await handle.describe())
1541+
1542+
await handle.delete()
1543+
assert False
1544+

0 commit comments

Comments
 (0)