Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions temporalio/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,15 @@ def from_payloads(
raise RuntimeError(
f"Payload at index {index} with encoding {encoding.decode()} could not be converted"
) from err
except Exception as err:
if (hasattr(err, '__module__') and
err.__module__ and
'pydantic' in err.__module__ and
'ValidationError' in err.__class__.__name__):
raise RuntimeError(
f"Payload at index {index} with encoding {encoding.decode()} could not be converted"
) from err
raise
return values


Expand Down
40 changes: 40 additions & 0 deletions tests/worker/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from time import sleep
from typing import Any, Callable, List, NoReturn, Optional, Sequence, Type

import pydantic
import temporalio.api.workflowservice.v1
from temporalio import activity, workflow
from temporalio.client import (
Expand Down Expand Up @@ -1593,3 +1594,42 @@ async def wait_cancel() -> str:
e.value.cause.cause.message
== "Unhandled activity cancel error produced by activity reset"
)


class Foo(pydantic.BaseModel):
bar: str


@activity.defn
async def pydantic_validation_activity(params: Foo) -> str:
return f"Hello {params.bar}"


@workflow.defn
class PydanticActivityValidationWorkflow:
@workflow.run
async def run(self, params: dict) -> str:
return await workflow.execute_activity(
pydantic_validation_activity,
args=[params],
schedule_to_close_timeout=timedelta(seconds=5)
)


async def test_activity_pydantic_validation_error_handling(client: Client):
from tests.helpers import new_worker

async with new_worker(
client,
PydanticActivityValidationWorkflow,
activities=[pydantic_validation_activity]
) as worker:
with pytest.raises(WorkflowFailureError) as err:
await client.execute_workflow(
PydanticActivityValidationWorkflow.run,
{"bar": 123},
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
assert isinstance(err.value.cause, ApplicationError)
assert "Failed decoding arguments" in err.value.cause.message
20 changes: 20 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5272,6 +5272,26 @@ async def run(self, param: str) -> None:
pass


@workflow.defn
class PydanticValidationErrorWorkflow:
@workflow.run
async def run(self, params: Foo) -> None:
pass


async def test_workflow_pydantic_validation_error_general_handling(client: Client):
async with new_worker(client, PydanticValidationErrorWorkflow) as worker:
with pytest.raises(WorkflowFailureError) as err:
await client.execute_workflow(
"PydanticValidationErrorWorkflow",
{"bar": 123},
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
assert isinstance(err.value.cause, ApplicationError)
assert "Failed decoding arguments" in err.value.cause.message


async def test_workflow_fail_on_bad_input(client: Client):
async with new_worker(client, FailOnBadInputWorkflow) as worker:
with pytest.raises(WorkflowFailureError) as err:
Expand Down
Loading