Skip to content

Commit c8bc329

Browse files
authored
Add tests for update-with-start reattach behavior (#1000)
* Set test server version back to `default` * Improve docstrings * Remove more experimental notices * Test UwS attach behaviors * Use an enum: Literal doesn't seem to pass mypy * Skip under JTS
1 parent fc564c4 commit c8bc329

File tree

3 files changed

+275
-19
lines changed

3 files changed

+275
-19
lines changed

temporalio/client.py

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -528,11 +528,12 @@ async def start_workflow(
528528
retries and continue as new.
529529
run_timeout: Timeout of a single workflow run.
530530
task_timeout: Timeout of a single workflow task.
531-
id_reuse_policy: How already-existing IDs are treated.
532-
id_conflict_policy: How already-running workflows of the same ID are
533-
treated. Default is unspecified which effectively means fail the
534-
start attempt. This cannot be set if ``id_reuse_policy`` is set
535-
to terminate if running.
531+
id_conflict_policy: Behavior when a workflow is currently running with the same ID.
532+
Default is UNSPECIFIED, which effectively means fail the start attempt.
533+
Set to USE_EXISTING for idempotent deduplication on workflow ID.
534+
Cannot be set if ``id_reuse_policy`` is set to TERMINATE_IF_RUNNING.
535+
id_reuse_policy: Behavior when a closed workflow with the same ID exists.
536+
Default is ALLOW_DUPLICATE.
536537
retry_policy: Retry policy for the workflow.
537538
cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
538539
memo: Memo for the workflow.
@@ -2487,9 +2488,6 @@ class WithStartWorkflowOperation(Generic[SelfType, ReturnType]):
24872488
24882489
Update-With-Start allows you to send an update to a workflow, while starting the
24892490
workflow if necessary.
2490-
2491-
.. warning::
2492-
This API is experimental
24932491
"""
24942492

24952493
# Overload for no-param workflow, with_start
@@ -2657,9 +2655,6 @@ def __init__(
26572655
) -> None:
26582656
"""Create a WithStartWorkflowOperation.
26592657
2660-
.. warning::
2661-
This API is experimental
2662-
26632658
See :py:meth:`temporalio.client.Client.start_workflow` for documentation of the
26642659
arguments.
26652660
"""
@@ -2700,11 +2695,7 @@ def __init__(
27002695
self._used = False
27012696

27022697
async def workflow_handle(self) -> WorkflowHandle[SelfType, ReturnType]:
2703-
"""Wait until workflow is running and return a WorkflowHandle.
2704-
2705-
.. warning::
2706-
This API is experimental
2707-
"""
2698+
"""Wait until workflow is running and return a WorkflowHandle."""
27082699
return await self._workflow_handle
27092700

27102701

tests/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
# TODO: Change back to "default" after next CLI release
2-
DEV_SERVER_DOWNLOAD_VERSION = "v1.3.1-persistence-fix.0"
1+
DEV_SERVER_DOWNLOAD_VERSION = "default"

tests/worker/test_update_with_start.py

Lines changed: 267 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import uuid
45
from contextlib import contextmanager
56
from dataclasses import dataclass
67
from datetime import timedelta
7-
from enum import Enum
8+
from enum import Enum, IntEnum
89
from typing import Any, Iterator, Mapping, Optional
910
from unittest.mock import patch
1011

@@ -17,12 +18,14 @@
1718
OutboundInterceptor,
1819
StartWorkflowUpdateWithStartInput,
1920
WithStartWorkflowOperation,
21+
WorkflowExecutionStatus,
2022
WorkflowUpdateFailedError,
2123
WorkflowUpdateHandle,
2224
WorkflowUpdateStage,
2325
)
2426
from temporalio.common import (
2527
WorkflowIDConflictPolicy,
28+
WorkflowIDReusePolicy,
2629
)
2730
from temporalio.exceptions import ApplicationError, WorkflowAlreadyStartedError
2831
from temporalio.service import RPCError, RPCStatusCode, ServiceCall
@@ -859,3 +862,266 @@ async def __call__(
859862
assert err.value.status == RPCStatusCode.INTERNAL
860863
assert err.value.message == "empty details"
861864
assert len(err.value.grpc_status.details) == 0
865+
866+
867+
class ExecutionBehavior(IntEnum):
868+
COMPLETES = 0
869+
BLOCKS = 1
870+
871+
872+
@workflow.defn
873+
class WorkflowWithUpdate:
874+
def __init__(self) -> None:
875+
self._unblock_workflow = asyncio.Event()
876+
self._unblock_update = asyncio.Event()
877+
878+
@workflow.run
879+
async def run(self, behavior: ExecutionBehavior) -> str:
880+
if behavior == ExecutionBehavior.BLOCKS:
881+
await self._unblock_workflow.wait()
882+
return str(workflow.uuid4())
883+
884+
@workflow.update(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)
885+
async def update(self, behavior: ExecutionBehavior) -> str:
886+
if behavior == ExecutionBehavior.BLOCKS:
887+
await self._unblock_update.wait()
888+
return str(workflow.uuid4())
889+
890+
@workflow.signal
891+
async def unblock_workflow(self):
892+
self._unblock_workflow.set()
893+
894+
@workflow.signal
895+
async def unblock_update(self):
896+
self._unblock_update.set()
897+
898+
899+
@pytest.mark.parametrize(
900+
"workflow_behavior_name",
901+
[ExecutionBehavior.COMPLETES.name, ExecutionBehavior.BLOCKS.name],
902+
)
903+
@pytest.mark.parametrize(
904+
"id_conflict_policy_name",
905+
[
906+
WorkflowIDConflictPolicy.USE_EXISTING.name,
907+
WorkflowIDConflictPolicy.FAIL.name,
908+
],
909+
)
910+
@pytest.mark.parametrize(
911+
"id_reuse_policy_name",
912+
[
913+
WorkflowIDReusePolicy.ALLOW_DUPLICATE.name,
914+
WorkflowIDReusePolicy.REJECT_DUPLICATE.name,
915+
],
916+
)
917+
async def test_update_with_start_always_attaches_to_completed_update(
918+
env: WorkflowEnvironment,
919+
workflow_behavior_name: str,
920+
id_conflict_policy_name: str,
921+
id_reuse_policy_name: str,
922+
):
923+
"""
924+
A workflow exists and contains a completed update. An update-with-start sent for that workflow ID and that
925+
update ID attaches to the update if workflow is running. If the workflow is closed then it attaches iff
926+
the update is completed. The behavior is unaffected by the conflict policy or id reuse policy (so, for
927+
example, we attach to an update in an existing workflow even if the conflict policy is FAIL).
928+
"""
929+
if env.supports_time_skipping:
930+
pytest.skip("TODO: make update_with_start tests pass under Java test server")
931+
client = env.client
932+
id_conflict_policy = WorkflowIDConflictPolicy[id_conflict_policy_name]
933+
id_reuse_policy = WorkflowIDReusePolicy[id_reuse_policy_name]
934+
workflow_behavior = ExecutionBehavior[workflow_behavior_name]
935+
shared_workflow_id = f"workflow-id-{uuid.uuid4()}"
936+
shared_update_id = f"update-id-{uuid.uuid4()}"
937+
async with new_worker(client, WorkflowWithUpdate) as worker:
938+
939+
def start_op():
940+
return WithStartWorkflowOperation(
941+
WorkflowWithUpdate.run,
942+
workflow_behavior,
943+
id=shared_workflow_id,
944+
task_queue=worker.task_queue,
945+
id_conflict_policy=id_conflict_policy,
946+
id_reuse_policy=id_reuse_policy,
947+
)
948+
949+
start_op_1 = start_op()
950+
update_result_1 = await client.execute_update_with_start_workflow(
951+
WorkflowWithUpdate.update,
952+
ExecutionBehavior.COMPLETES,
953+
id=shared_update_id,
954+
start_workflow_operation=start_op_1,
955+
)
956+
wf_handle_1 = await start_op_1.workflow_handle()
957+
assert (await wf_handle_1.describe()).status == (
958+
WorkflowExecutionStatus.COMPLETED
959+
if workflow_behavior == ExecutionBehavior.COMPLETES
960+
else WorkflowExecutionStatus.RUNNING
961+
)
962+
963+
# Whether or not the workflow closed, the update exists in the last workflow run and is completed, so
964+
# we attach to it.
965+
966+
start_op_2 = start_op()
967+
update_result_2 = await client.execute_update_with_start_workflow(
968+
WorkflowWithUpdate.update,
969+
ExecutionBehavior.COMPLETES,
970+
id=shared_update_id,
971+
start_workflow_operation=start_op_2,
972+
)
973+
wf_handle_2 = await start_op_2.workflow_handle()
974+
assert wf_handle_1.first_execution_run_id == wf_handle_2.first_execution_run_id
975+
assert update_result_1 == update_result_2
976+
977+
978+
@pytest.mark.parametrize(
979+
"id_conflict_policy_name",
980+
[
981+
WorkflowIDConflictPolicy.USE_EXISTING.name,
982+
WorkflowIDConflictPolicy.FAIL.name,
983+
],
984+
)
985+
@pytest.mark.parametrize(
986+
"id_reuse_policy_name",
987+
[
988+
WorkflowIDReusePolicy.ALLOW_DUPLICATE.name,
989+
WorkflowIDReusePolicy.REJECT_DUPLICATE.name,
990+
],
991+
)
992+
async def test_update_with_start_attaches_to_non_completed_update_in_running_workflow(
993+
env: WorkflowEnvironment,
994+
id_conflict_policy_name: str,
995+
id_reuse_policy_name: str,
996+
):
997+
"""
998+
A workflow exists and is running and contains a non-completed update. An update-with-start sent for that
999+
workflow ID and that update ID attaches to the update. The behavior is unaffected by the conflict policy
1000+
or id reuse policy (so, for example, we attach to the update in an existing workflow even if the conflict
1001+
policy is FAIL).
1002+
"""
1003+
if env.supports_time_skipping:
1004+
pytest.skip("TODO: make update_with_start tests pass under Java test server")
1005+
client = env.client
1006+
id_conflict_policy = WorkflowIDConflictPolicy[id_conflict_policy_name]
1007+
id_reuse_policy = WorkflowIDReusePolicy[id_reuse_policy_name]
1008+
shared_workflow_id = f"workflow-id-{uuid.uuid4()}"
1009+
shared_update_id = f"update-id-{uuid.uuid4()}"
1010+
async with new_worker(client, WorkflowWithUpdate) as worker:
1011+
1012+
def start_op():
1013+
return WithStartWorkflowOperation(
1014+
WorkflowWithUpdate.run,
1015+
ExecutionBehavior.BLOCKS,
1016+
id=shared_workflow_id,
1017+
task_queue=worker.task_queue,
1018+
id_conflict_policy=id_conflict_policy,
1019+
id_reuse_policy=id_reuse_policy,
1020+
)
1021+
1022+
start_op_1 = start_op()
1023+
update_handle_1 = await client.start_update_with_start_workflow(
1024+
WorkflowWithUpdate.update,
1025+
ExecutionBehavior.BLOCKS,
1026+
id=shared_update_id,
1027+
start_workflow_operation=start_op_1,
1028+
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
1029+
)
1030+
wf_handle_1 = await start_op_1.workflow_handle()
1031+
assert (await wf_handle_1.describe()).status == WorkflowExecutionStatus.RUNNING
1032+
1033+
# The workflow is running with the update not-completed. We will attach to the update.
1034+
1035+
start_op_2 = start_op()
1036+
1037+
update_handle_2 = await client.start_update_with_start_workflow(
1038+
WorkflowWithUpdate.update,
1039+
ExecutionBehavior.COMPLETES,
1040+
id=shared_update_id,
1041+
start_workflow_operation=start_op_2,
1042+
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
1043+
)
1044+
wf_handle_2 = await start_op_2.workflow_handle()
1045+
assert wf_handle_1.first_execution_run_id == wf_handle_2.first_execution_run_id
1046+
await wf_handle_1.signal(WorkflowWithUpdate.unblock_update)
1047+
assert (await update_handle_1.result()) == (await update_handle_2.result())
1048+
1049+
1050+
@pytest.mark.parametrize(
1051+
"id_conflict_policy_name",
1052+
[
1053+
WorkflowIDConflictPolicy.USE_EXISTING.name,
1054+
WorkflowIDConflictPolicy.FAIL.name,
1055+
],
1056+
)
1057+
@pytest.mark.parametrize(
1058+
"id_reuse_policy_name",
1059+
[
1060+
WorkflowIDReusePolicy.ALLOW_DUPLICATE.name,
1061+
WorkflowIDReusePolicy.REJECT_DUPLICATE.name,
1062+
],
1063+
)
1064+
async def test_update_with_start_does_not_attach_to_non_completed_update_in_closed_workflow(
1065+
env: WorkflowEnvironment,
1066+
id_conflict_policy_name: str,
1067+
id_reuse_policy_name: str,
1068+
):
1069+
"""
1070+
A workflow exists but is closed and contains a non-completed update. An update-with-start sent for that workflow
1071+
ID and that update ID does not attach to the update. If the id reuse policy is ALLOW_DUPLICATE then a new
1072+
workflow is started and the update is issued.
1073+
"""
1074+
if env.supports_time_skipping:
1075+
pytest.skip("TODO: make update_with_start tests pass under Java test server")
1076+
client = env.client
1077+
id_conflict_policy = WorkflowIDConflictPolicy[id_conflict_policy_name]
1078+
id_reuse_policy = WorkflowIDReusePolicy[id_reuse_policy_name]
1079+
shared_workflow_id = f"workflow-id-{uuid.uuid4()}"
1080+
shared_update_id = f"update-id-{uuid.uuid4()}"
1081+
async with new_worker(client, WorkflowWithUpdate) as worker:
1082+
1083+
def start_op():
1084+
return WithStartWorkflowOperation(
1085+
WorkflowWithUpdate.run,
1086+
ExecutionBehavior.COMPLETES,
1087+
id=shared_workflow_id,
1088+
task_queue=worker.task_queue,
1089+
id_conflict_policy=id_conflict_policy,
1090+
id_reuse_policy=id_reuse_policy,
1091+
)
1092+
1093+
start_op_1 = start_op()
1094+
await client.start_update_with_start_workflow(
1095+
WorkflowWithUpdate.update,
1096+
ExecutionBehavior.BLOCKS,
1097+
id=shared_update_id,
1098+
start_workflow_operation=start_op_1,
1099+
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
1100+
)
1101+
wf_handle_1 = await start_op_1.workflow_handle()
1102+
assert (
1103+
await wf_handle_1.describe()
1104+
).status == WorkflowExecutionStatus.COMPLETED
1105+
1106+
# The workflow closed with the update not-completed. We will start a new workflow and issue the update
1107+
# iff reuse_policy is ALLOW_DUPLICATE. Conflict policy is irrelevant.
1108+
1109+
start_op_2 = start_op()
1110+
1111+
async def _do_update() -> Any:
1112+
return await client.execute_update_with_start_workflow(
1113+
WorkflowWithUpdate.update,
1114+
ExecutionBehavior.COMPLETES,
1115+
id=shared_update_id,
1116+
start_workflow_operation=start_op_2,
1117+
)
1118+
1119+
if id_reuse_policy == WorkflowIDReusePolicy.ALLOW_DUPLICATE:
1120+
await _do_update()
1121+
wf_handle_2 = await start_op_2.workflow_handle()
1122+
assert (
1123+
wf_handle_1.first_execution_run_id != wf_handle_2.first_execution_run_id
1124+
)
1125+
elif id_reuse_policy == WorkflowIDReusePolicy.REJECT_DUPLICATE:
1126+
with pytest.raises(WorkflowAlreadyStartedError):
1127+
await _do_update()

0 commit comments

Comments
 (0)