diff --git a/temporalio/converter.py b/temporalio/converter.py index a9f8c0c98..2c4fe6187 100644 --- a/temporalio/converter.py +++ b/temporalio/converter.py @@ -313,6 +313,15 @@ def from_payloads( raise RuntimeError( f"Payload at index {index} with encoding {encoding.decode()} could not be converted" ) from err + except Exception as err: + if (hasattr(err, '__module__') and + err.__module__ and + 'pydantic' in err.__module__ and + 'ValidationError' in err.__class__.__name__): + raise RuntimeError( + f"Payload at index {index} with encoding {encoding.decode()} could not be converted" + ) from err + raise return values diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index eba7df21f..f84860177 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -17,6 +17,7 @@ from time import sleep from typing import Any, Callable, List, NoReturn, Optional, Sequence, Type +import pydantic import temporalio.api.workflowservice.v1 from temporalio import activity, workflow from temporalio.client import ( @@ -1593,3 +1594,42 @@ async def wait_cancel() -> str: e.value.cause.cause.message == "Unhandled activity cancel error produced by activity reset" ) + + +class Foo(pydantic.BaseModel): + bar: str + + +@activity.defn +async def pydantic_validation_activity(params: Foo) -> str: + return f"Hello {params.bar}" + + +@workflow.defn +class PydanticActivityValidationWorkflow: + @workflow.run + async def run(self, params: dict) -> str: + return await workflow.execute_activity( + pydantic_validation_activity, + args=[params], + schedule_to_close_timeout=timedelta(seconds=5) + ) + + +async def test_activity_pydantic_validation_error_handling(client: Client): + from tests.helpers import new_worker + + async with new_worker( + client, + PydanticActivityValidationWorkflow, + activities=[pydantic_validation_activity] + ) as worker: + with pytest.raises(WorkflowFailureError) as err: + await client.execute_workflow( + PydanticActivityValidationWorkflow.run, + {"bar": 123}, + id=f"wf-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert isinstance(err.value.cause, ApplicationError) + assert "Failed decoding arguments" in err.value.cause.message diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 4ca9890c2..226b3ea8d 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5272,6 +5272,26 @@ async def run(self, param: str) -> None: pass +@workflow.defn +class PydanticValidationErrorWorkflow: + @workflow.run + async def run(self, params: Foo) -> None: + pass + + +async def test_workflow_pydantic_validation_error_general_handling(client: Client): + async with new_worker(client, PydanticValidationErrorWorkflow) as worker: + with pytest.raises(WorkflowFailureError) as err: + await client.execute_workflow( + "PydanticValidationErrorWorkflow", + {"bar": 123}, + id=f"wf-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert isinstance(err.value.cause, ApplicationError) + assert "Failed decoding arguments" in err.value.cause.message + + async def test_workflow_fail_on_bad_input(client: Client): async with new_worker(client, FailOnBadInputWorkflow) as worker: with pytest.raises(WorkflowFailureError) as err: