Skip to content

Commit 1f4a6f8

Browse files
committed
Add test that update respects first_execution_run_id
1 parent 173826f commit 1f4a6f8

File tree

1 file changed

+47
-0
lines changed

1 file changed

+47
-0
lines changed

tests/worker/test_workflow.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4423,6 +4423,53 @@ async def test_workflow_update_task_fails(client: Client, env: WorkflowEnvironme
44234423
assert bad_validator_fail_ct == 2
44244424

44254425

4426+
@workflow.defn
4427+
class UpdateRespectsFirstExecutionRunIdWorkflow:
4428+
def __init__(self) -> None:
4429+
self.update_received = False
4430+
4431+
@workflow.run
4432+
async def run(self) -> None:
4433+
await workflow.wait_condition(lambda: self.update_received)
4434+
4435+
@workflow.update
4436+
async def update(self) -> None:
4437+
self.update_received = True
4438+
4439+
4440+
async def test_workflow_update_respects_first_execution_run_id(
4441+
client: Client, env: WorkflowEnvironment
4442+
):
4443+
# Start one workflow, obtain the run ID (r1), and let it complete. Start a second
4444+
# workflow with the same workflow ID, and try to send an update using the handle from
4445+
# r1.
4446+
workflow_id = f"update-respects-first-execution-run-id-{uuid.uuid4()}"
4447+
async with new_worker(client, UpdateRespectsFirstExecutionRunIdWorkflow) as worker:
4448+
4449+
async def start_workflow(workflow_id: str) -> WorkflowHandle:
4450+
return await client.start_workflow(
4451+
UpdateRespectsFirstExecutionRunIdWorkflow.run,
4452+
id=workflow_id,
4453+
task_queue=worker.task_queue,
4454+
)
4455+
4456+
wf_execution_1_handle = await start_workflow(workflow_id)
4457+
await wf_execution_1_handle.execute_update(
4458+
UpdateRespectsFirstExecutionRunIdWorkflow.update
4459+
)
4460+
await wf_execution_1_handle.result()
4461+
await start_workflow(workflow_id)
4462+
4463+
# Execution 1 has closed. This would succeed if the update incorrectly targets
4464+
# the second execution
4465+
with pytest.raises(RPCError) as exc_info:
4466+
await wf_execution_1_handle.execute_update(
4467+
UpdateRespectsFirstExecutionRunIdWorkflow.update
4468+
)
4469+
assert exc_info.value.status == RPCStatusCode.NOT_FOUND
4470+
assert "workflow execution not found" in str(exc_info.value)
4471+
4472+
44264473
@workflow.defn
44274474
class ImmediatelyCompleteUpdateAndWorkflow:
44284475
def __init__(self) -> None:

0 commit comments

Comments
 (0)