Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion temporalio/worker/_nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import temporalio.common
import temporalio.converter
import temporalio.nexus
from temporalio.exceptions import ApplicationError
from temporalio.exceptions import ApplicationError, WorkflowAlreadyStartedError
from temporalio.nexus import Info, logger
from temporalio.service import RPCError, RPCStatusCode

Expand Down Expand Up @@ -445,6 +445,12 @@ def _exception_to_handler_error(err: BaseException) -> nexusrpc.HandlerError:
type=nexusrpc.HandlerErrorType.INTERNAL,
retryable_override=not err.non_retryable,
)
elif isinstance(err, WorkflowAlreadyStartedError):
handler_err = nexusrpc.HandlerError(
err.message,
type=nexusrpc.HandlerErrorType.INTERNAL,
retryable_override=False,
)
elif isinstance(err, RPCError):
if err.status == RPCStatusCode.INVALID_ARGUMENT:
handler_err = nexusrpc.HandlerError(
Expand Down
26 changes: 25 additions & 1 deletion tests/nexus/test_workflow_caller_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
sync_operation,
)

from temporalio import workflow
from temporalio import nexus, workflow
from temporalio.client import (
Client,
WorkflowFailureError,
Expand All @@ -47,6 +47,13 @@ class ErrorTestInput:
id: str


@workflow.defn
class NonTerminatingWorkflow:
@workflow.run
async def run(self) -> None:
await asyncio.Event().wait()


@nexusrpc.handler.service_handler
class ErrorTestService:
@nexusrpc.handler.sync_operation
Expand Down Expand Up @@ -87,6 +94,18 @@ def retried_due_to_internal_handler_error(
type=nexusrpc.HandlerErrorType.INTERNAL,
)

@nexusrpc.handler.sync_operation
async def fails_due_to_workflow_already_started(
self, ctx: nexusrpc.handler.StartOperationContext, input: ErrorTestInput
) -> None:
operation_invocation_counts[input.id] += 1
for _ in range(2):
await nexus.client().start_workflow(
NonTerminatingWorkflow.run,
id="second-start-request-will-fail",
task_queue=nexus.info().task_queue,
)


@workflow.defn(sandboxed=False)
class CallerWorkflow:
Expand Down Expand Up @@ -156,6 +175,11 @@ async def times_called() -> int:
nexusrpc.HandlerErrorType.NOT_FOUND,
"No handler for service",
),
(
"fails_due_to_workflow_already_started",
nexusrpc.HandlerErrorType.INTERNAL,
"already started",
),
],
)
async def test_nexus_operation_fails_without_retry_as_handler_error(
Expand Down
Loading