diff --git a/packages/celery-library/src/celery_library/task_manager.py b/packages/celery-library/src/celery_library/task_manager.py index ad6e15844ce..afe0a3b267e 100644 --- a/packages/celery-library/src/celery_library/task_manager.py +++ b/packages/celery-library/src/celery_library/task_manager.py @@ -7,6 +7,7 @@ from celery.exceptions import CeleryError # type: ignore[import-untyped] from common_library.async_tools import make_async from models_library.progress_bar import ProgressReport +from pydantic import BaseModel from servicelib.celery.models import ( TASK_DONE_STATES, ExecutionMetadata, @@ -31,6 +32,21 @@ _MAX_PROGRESS_VALUE = 1.0 +def _serialize_task_params(task_params: dict[str, Any]) -> dict[str, Any]: + """Recursively convert Pydantic models to dicts for safe serialization.""" + + def _serialize(value: Any) -> Any: + if isinstance(value, BaseModel): + return value.model_dump(mode="json") + if isinstance(value, dict): + return {k: _serialize(v) for k, v in value.items()} + if isinstance(value, (list, tuple, set)): + return type(value)(_serialize(v) for v in value) + return value + + return {k: _serialize(v) for k, v in task_params.items()} + + @dataclass(frozen=True) class CeleryTaskManager: _celery_app: Celery @@ -66,7 +82,7 @@ async def submit_task( self._celery_app.send_task( execution_metadata.name, task_id=task_key, - kwargs={"task_key": task_key} | task_params, + kwargs={"task_key": task_key} | _serialize_task_params(task_params), queue=execution_metadata.queue.value, ) except CeleryError as exc: diff --git a/services/storage/src/simcore_service_storage/modules/celery/__init__.py b/services/storage/src/simcore_service_storage/modules/celery/__init__.py index 3f292337c73..202c5a7bca5 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/__init__.py +++ b/services/storage/src/simcore_service_storage/modules/celery/__init__.py @@ -38,7 +38,7 @@ async def on_startup() -> None: ) register_celery_types() - register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody) + # NOTE: remove register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody) async def on_shutdown() -> None: with log_context(_logger, logging.INFO, "Shutting down Celery"):