Skip to content

Commit edb3c7e

Browse files
committed
Add test that update respects first_execution_run_id
1 parent 341d949 commit edb3c7e

File tree

1 file changed

+47
-0
lines changed

1 file changed

+47
-0
lines changed

tests/worker/test_workflow.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4425,6 +4425,53 @@ async def test_workflow_update_task_fails(client: Client, env: WorkflowEnvironme
44254425
assert bad_validator_fail_ct == 2
44264426

44274427

4428+
@workflow.defn
4429+
class UpdateRespectsFirstExecutionRunIdWorkflow:
4430+
def __init__(self) -> None:
4431+
self.update_received = False
4432+
4433+
@workflow.run
4434+
async def run(self) -> None:
4435+
await workflow.wait_condition(lambda: self.update_received)
4436+
4437+
@workflow.update
4438+
async def update(self) -> None:
4439+
self.update_received = True
4440+
4441+
4442+
async def test_workflow_update_respects_first_execution_run_id(
4443+
client: Client, env: WorkflowEnvironment
4444+
):
4445+
# Start one workflow, obtain the run ID (r1), and let it complete. Start a second
4446+
# workflow with the same workflow ID, and try to send an update using the handle from
4447+
# r1.
4448+
workflow_id = f"update-respects-first-execution-run-id-{uuid.uuid4()}"
4449+
async with new_worker(client, UpdateRespectsFirstExecutionRunIdWorkflow) as worker:
4450+
4451+
async def start_workflow(workflow_id: str) -> WorkflowHandle:
4452+
return await client.start_workflow(
4453+
UpdateRespectsFirstExecutionRunIdWorkflow.run,
4454+
id=workflow_id,
4455+
task_queue=worker.task_queue,
4456+
)
4457+
4458+
wf_execution_1_handle = await start_workflow(workflow_id)
4459+
await wf_execution_1_handle.execute_update(
4460+
UpdateRespectsFirstExecutionRunIdWorkflow.update
4461+
)
4462+
await wf_execution_1_handle.result()
4463+
await start_workflow(workflow_id)
4464+
4465+
# Execution 1 has closed. This would succeed if the update incorrectly targets
4466+
# the second execution
4467+
with pytest.raises(RPCError) as exc_info:
4468+
await wf_execution_1_handle.execute_update(
4469+
UpdateRespectsFirstExecutionRunIdWorkflow.update
4470+
)
4471+
assert exc_info.value.status == RPCStatusCode.NOT_FOUND
4472+
assert "workflow execution not found" in str(exc_info.value)
4473+
4474+
44284475
@workflow.defn
44294476
class ImmediatelyCompleteUpdateAndWorkflow:
44304477
def __init__(self) -> None:

0 commit comments

Comments
 (0)