Skip to content

Commit 2458566

Browse files
committed
re-raise ALREADY_EXISTS as WorkflowAlreadyStartedError
1 parent 3c17209 commit 2458566

File tree

2 files changed

+42
-32
lines changed

2 files changed

+42
-32
lines changed

temporalio/client.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6078,11 +6078,24 @@ def on_start(
60786078
None,
60796079
)
60806080
if status and status.code in RPCStatusCode:
6081-
err = RPCError(
6082-
status.message,
6083-
RPCStatusCode(status.code),
6084-
err.raw_grpc_status,
6085-
)
6081+
if (
6082+
status.code == RPCStatusCode.ALREADY_EXISTS
6083+
and status.details
6084+
):
6085+
details = temporalio.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure()
6086+
if status.details[0].Unpack(details):
6087+
err = temporalio.exceptions.WorkflowAlreadyStartedError(
6088+
input.start_workflow_input.id,
6089+
input.start_workflow_input.workflow,
6090+
run_id=details.run_id,
6091+
)
6092+
else:
6093+
err = RPCError(
6094+
status.message,
6095+
RPCStatusCode(status.code),
6096+
err.raw_grpc_status,
6097+
)
6098+
60866099
raise err
60876100
finally:
60886101
if not seen_start:

tests/worker/test_update_with_start.py

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
Client,
1515
Interceptor,
1616
OutboundInterceptor,
17-
RPCError,
1817
StartWorkflowUpdateWithStartInput,
1918
WithStartWorkflowOperation,
2019
WorkflowUpdateFailedError,
@@ -24,8 +23,7 @@
2423
from temporalio.common import (
2524
WorkflowIDConflictPolicy,
2625
)
27-
from temporalio.exceptions import ApplicationError
28-
from temporalio.service import RPCStatusCode
26+
from temporalio.exceptions import ApplicationError, WorkflowAlreadyStartedError
2927
from tests.helpers import (
3028
new_worker,
3129
)
@@ -230,15 +228,14 @@ async def _do_execute_update_test(
230228
await start_op_2.workflow_handle()
231229
).first_execution_run_id is not None
232230
else:
233-
with pytest.raises(RPCError) as e:
234-
await client.execute_update_with_start_workflow(
231+
for aw in [
232+
client.execute_update_with_start_workflow(
235233
update_handler, "21", start_workflow_operation=start_op_2
236-
)
237-
238-
assert e.value.grpc_status.code == RPCStatusCode.ALREADY_EXISTS
239-
with pytest.raises(RPCError) as e2:
240-
await start_op_2.workflow_handle()
241-
assert e2.value.grpc_status.code == RPCStatusCode.ALREADY_EXISTS
234+
),
235+
start_op_2.workflow_handle(),
236+
]:
237+
with pytest.raises(WorkflowAlreadyStartedError):
238+
await aw
242239

243240
assert (
244241
await start_op_1.workflow_handle()
@@ -337,18 +334,18 @@ def make_start_op(workflow_id: str):
337334
# Second UWS start fails because the workflow already exists
338335
# first execution run ID is not set on the second UWS handle
339336
start_op_2 = make_start_op("wid-1")
340-
with pytest.raises(RPCError) as exc_info:
341-
await client.start_update_with_start_workflow(
337+
338+
for aw in [
339+
client.start_update_with_start_workflow(
342340
WorkflowForUpdateWithStartTest.my_non_blocking_update,
343341
"2",
344342
wait_for_stage=WorkflowUpdateStage.COMPLETED,
345343
start_workflow_operation=start_op_2,
346-
)
347-
assert exc_info.value.status == RPCStatusCode.ALREADY_EXISTS
348-
349-
with pytest.raises(RPCError) as e2:
350-
await start_op_2.workflow_handle()
351-
assert e2.value.grpc_status.code == RPCStatusCode.ALREADY_EXISTS
344+
),
345+
start_op_2.workflow_handle(),
346+
]:
347+
with pytest.raises(WorkflowAlreadyStartedError):
348+
await aw
352349

353350
# Third UWS start succeeds, but the update fails after acceptance
354351
start_op_3 = make_start_op("wid-2")
@@ -421,18 +418,18 @@ def make_start_op(workflow_id: str):
421418
)
422419

423420
start_op_2 = make_start_op(wid)
424-
with pytest.raises(RPCError) as exc_info:
425-
await client.start_update_with_start_workflow(
421+
422+
for aw in [
423+
client.start_update_with_start_workflow(
426424
WorkflowForUpdateWithStartTest.my_non_blocking_update,
427425
"2",
428426
wait_for_stage=WorkflowUpdateStage.COMPLETED,
429427
start_workflow_operation=start_op_2,
430-
)
431-
assert exc_info.value.grpc_status.code == RPCStatusCode.ALREADY_EXISTS
432-
433-
with pytest.raises(RPCError) as exc_info2:
434-
await start_op_2.workflow_handle()
435-
assert exc_info2.value.grpc_status.code == RPCStatusCode.ALREADY_EXISTS
428+
),
429+
start_op_2.workflow_handle(),
430+
]:
431+
with pytest.raises(WorkflowAlreadyStartedError):
432+
await aw
436433

437434

438435
async def test_workflow_update_poll_loop(client: Client):

0 commit comments

Comments
 (0)