diff --git a/temporalio/worker/_nexus.py b/temporalio/worker/_nexus.py index 9e6783245..72b47187f 100644 --- a/temporalio/worker/_nexus.py +++ b/temporalio/worker/_nexus.py @@ -32,7 +32,7 @@ import temporalio.common import temporalio.converter import temporalio.nexus -from temporalio.exceptions import ApplicationError +from temporalio.exceptions import ApplicationError, WorkflowAlreadyStartedError from temporalio.nexus import Info, logger from temporalio.service import RPCError, RPCStatusCode @@ -445,6 +445,12 @@ def _exception_to_handler_error(err: BaseException) -> nexusrpc.HandlerError: type=nexusrpc.HandlerErrorType.INTERNAL, retryable_override=not err.non_retryable, ) + elif isinstance(err, WorkflowAlreadyStartedError): + handler_err = nexusrpc.HandlerError( + err.message, + type=nexusrpc.HandlerErrorType.INTERNAL, + retryable_override=False, + ) elif isinstance(err, RPCError): if err.status == RPCStatusCode.INVALID_ARGUMENT: handler_err = nexusrpc.HandlerError( diff --git a/tests/nexus/test_workflow_caller_errors.py b/tests/nexus/test_workflow_caller_errors.py index e07c608ed..2bff390da 100644 --- a/tests/nexus/test_workflow_caller_errors.py +++ b/tests/nexus/test_workflow_caller_errors.py @@ -21,7 +21,7 @@ sync_operation, ) -from temporalio import workflow +from temporalio import nexus, workflow from temporalio.client import ( Client, WorkflowFailureError, @@ -47,6 +47,13 @@ class ErrorTestInput: id: str +@workflow.defn +class NonTerminatingWorkflow: + @workflow.run + async def run(self) -> None: + await asyncio.Event().wait() + + @nexusrpc.handler.service_handler class ErrorTestService: @nexusrpc.handler.sync_operation @@ -87,6 +94,18 @@ def retried_due_to_internal_handler_error( type=nexusrpc.HandlerErrorType.INTERNAL, ) + @nexusrpc.handler.sync_operation + async def fails_due_to_workflow_already_started( + self, ctx: nexusrpc.handler.StartOperationContext, input: ErrorTestInput + ) -> None: + operation_invocation_counts[input.id] += 1 + for _ in range(2): + await nexus.client().start_workflow( + NonTerminatingWorkflow.run, + id="second-start-request-will-fail", + task_queue=nexus.info().task_queue, + ) + @workflow.defn(sandboxed=False) class CallerWorkflow: @@ -156,6 +175,11 @@ async def times_called() -> int: nexusrpc.HandlerErrorType.NOT_FOUND, "No handler for service", ), + ( + "fails_due_to_workflow_already_started", + nexusrpc.HandlerErrorType.INTERNAL, + "already started", + ), ], ) async def test_nexus_operation_fails_without_retry_as_handler_error(