diff --git a/services/api-server/src/simcore_service_api_server/_service_function_jobs_task_client.py b/services/api-server/src/simcore_service_api_server/_service_function_jobs_task_client.py index f4a2709a80d..031e1933777 100644 --- a/services/api-server/src/simcore_service_api_server/_service_function_jobs_task_client.py +++ b/services/api-server/src/simcore_service_api_server/_service_function_jobs_task_client.py @@ -1,8 +1,9 @@ # pylint: disable=too-many-instance-attributes import contextlib +import logging from dataclasses import dataclass -from typing import Final +from celery_library.errors import TaskNotFoundError from common_library.exclude import as_dict_exclude_none from models_library.functions import ( FunctionClass, @@ -32,6 +33,10 @@ from models_library.users import UserID from servicelib.celery.models import TaskMetadata, TasksQueue, TaskUUID from servicelib.celery.task_manager import TaskManager +from servicelib.logging_errors import create_troubleshooting_log_kwargs +from simcore_service_api_server.models.schemas.functions import ( + FunctionJobCreationTaskStatus, +) from sqlalchemy.ext.asyncio import AsyncEngine from ._service_function_jobs import FunctionJobService @@ -43,15 +48,13 @@ ) from .models.api_resources import JobLinks from .models.domain.celery_models import ApiWorkerTaskFilter +from .models.schemas.functions import FunctionJobCreationTaskStatus from .models.schemas.jobs import JobInputs, JobPricingSpecification from .services_http.webserver import AuthSession from .services_rpc.storage import StorageService from .services_rpc.wb_api_server import WbApiRpcClient -_JOB_CREATION_TASK_STATUS_PREFIX: Final[str] = "JOB_CREATION_TASK_STATUS_" -_JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS: Final[str] = ( - f"{_JOB_CREATION_TASK_STATUS_PREFIX}NOT_YET_SCHEDULED" -) +_logger = logging.getLogger(__name__) def join_inputs( @@ -73,17 +76,34 @@ async def _celery_task_status( task_manager: TaskManager, user_id: UserID, product_name: ProductName, -) -> str: +) -> FunctionJobCreationTaskStatus: if job_creation_task_id is None: - return _JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS + return FunctionJobCreationTaskStatus.NOT_YET_SCHEDULED task_filter = ApiWorkerTaskFilter( user_id=user_id, product_name=product_name, ) - task_status = await task_manager.get_task_status( - task_uuid=TaskUUID(job_creation_task_id), task_filter=task_filter - ) - return f"{_JOB_CREATION_TASK_STATUS_PREFIX}{task_status.task_state}" + try: + task_status = await task_manager.get_task_status( + task_uuid=TaskUUID(job_creation_task_id), task_filter=task_filter + ) + return FunctionJobCreationTaskStatus[task_status.task_state] + except TaskNotFoundError as err: + user_msg = f"Job creation task not found for task_uuid={TaskUUID(job_creation_task_id)}" + _logger.exception( + **create_troubleshooting_log_kwargs( + user_msg, + error=err, + error_context={ + "task_uuid": TaskUUID(job_creation_task_id), + "task_filter": task_filter, + "user_id": user_id, + "product_name": product_name, + }, + tip="This probably means the celery task failed, because the task should have created the project_id.", + ) + ) + return FunctionJobCreationTaskStatus.ERROR @dataclass(frozen=True, kw_only=True) @@ -367,7 +387,7 @@ async def create_function_job_creation_task( task_uuid = await self._celery_task_manager.submit_task( TaskMetadata( name="run_function", - ephemeral=True, + ephemeral=False, queue=TasksQueue.API_WORKER_QUEUE, ), task_filter=task_filter, diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/models_schemas_function_filters.py b/services/api-server/src/simcore_service_api_server/api/dependencies/models_schemas_function_filters.py index 7a5b3ef75c0..1c2a9a791ce 100644 --- a/services/api-server/src/simcore_service_api_server/api/dependencies/models_schemas_function_filters.py +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/models_schemas_function_filters.py @@ -8,7 +8,7 @@ FunctionJobCollectionsListFilters, FunctionJobID, ) -from simcore_service_api_server.models.schemas.functions_filters import ( +from simcore_service_api_server.models.schemas.functions import ( FunctionJobsListFilters, ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py index 4e023dd583c..1977284b1c6 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py @@ -29,7 +29,7 @@ from ...models.domain.functions import PageRegisteredFunctionJobWithorWithoutStatus from ...models.pagination import PaginationParams from ...models.schemas.errors import ErrorGet -from ...models.schemas.functions_filters import FunctionJobsListFilters +from ...models.schemas.functions import FunctionJobsListFilters from ...services_rpc.wb_api_server import WbApiRpcClient from ..dependencies.authentication import get_current_user_id, get_product_name from ..dependencies.functions import ( diff --git a/services/api-server/src/simcore_service_api_server/models/schemas/functions.py b/services/api-server/src/simcore_service_api_server/models/schemas/functions.py new file mode 100644 index 00000000000..d31f2e5af47 --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/models/schemas/functions.py @@ -0,0 +1,54 @@ +# pylint: disable=protected-access + +from enum import StrEnum +from typing import Annotated, Final + +from models_library.functions import FunctionID, FunctionJobCollectionID, FunctionJobID +from pydantic import BaseModel, ConfigDict, Field +from servicelib.celery.models import TaskState + +_JOB_TASK_RUN_STATUS_PREFIX: Final[str] = "JOB_TASK_RUN_STATUS_" + + +class FunctionJobsListFilters(BaseModel): + """Filters for listing function jobs""" + + function_id: Annotated[ + FunctionID | None, + Field( + description="Filter by function ID pattern", + ), + ] = None + + function_job_ids: Annotated[ + list[FunctionJobID] | None, + Field( + description="Filter by function job IDs", + ), + ] = None + + function_job_collection_id: Annotated[ + FunctionJobCollectionID | None, + Field( + description="Filter by function job collection ID", + ), + ] = None + + model_config = ConfigDict( + extra="ignore", + ) + + +class FunctionJobCreationTaskStatus(StrEnum): + PENDING = f"{_JOB_TASK_RUN_STATUS_PREFIX}PENDING" + STARTED = f"{_JOB_TASK_RUN_STATUS_PREFIX}STARTED" + RETRY = f"{_JOB_TASK_RUN_STATUS_PREFIX}RETRY" + SUCCESS = f"{_JOB_TASK_RUN_STATUS_PREFIX}SUCCESS" + FAILURE = f"{_JOB_TASK_RUN_STATUS_PREFIX}FAILURE" + NOT_YET_SCHEDULED = "JOB_TASK_NOT_YET_SCHEDULED" # api-server custom status + ERROR = "JOB_TASK_CREATION_FAILURE" # api-server custom status + + +assert {elm._name_ for elm in TaskState}.union({"NOT_YET_SCHEDULED", "ERROR"}) == { + elm._name_ for elm in FunctionJobCreationTaskStatus +} diff --git a/services/api-server/src/simcore_service_api_server/models/schemas/functions_filters.py b/services/api-server/src/simcore_service_api_server/models/schemas/functions_filters.py deleted file mode 100644 index d5cd6963464..00000000000 --- a/services/api-server/src/simcore_service_api_server/models/schemas/functions_filters.py +++ /dev/null @@ -1,33 +0,0 @@ -from typing import Annotated - -from models_library.functions import FunctionID, FunctionJobCollectionID, FunctionJobID -from pydantic import BaseModel, ConfigDict, Field - - -class FunctionJobsListFilters(BaseModel): - """Filters for listing function jobs""" - - function_id: Annotated[ - FunctionID | None, - Field( - description="Filter by function ID pattern", - ), - ] = None - - function_job_ids: Annotated[ - list[FunctionJobID] | None, - Field( - description="Filter by function job IDs", - ), - ] = None - - function_job_collection_id: Annotated[ - FunctionJobCollectionID | None, - Field( - description="Filter by function job collection ID", - ), - ] = None - - model_config = ConfigDict( - extra="ignore", - ) diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py index 540920ea755..be67fd38fce 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py @@ -36,11 +36,12 @@ from servicelib.celery.models import TaskFilter, TaskState, TaskStatus, TaskUUID from simcore_service_api_server._meta import API_VTAG from simcore_service_api_server._service_function_jobs_task_client import ( - _JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS, - _JOB_CREATION_TASK_STATUS_PREFIX, FunctionJobTaskClientService, ) from simcore_service_api_server.api.dependencies import services as service_dependencies +from simcore_service_api_server.models.schemas.functions import ( + FunctionJobCreationTaskStatus, +) from simcore_service_api_server.models.schemas.jobs import JobStatus _faker = Faker() @@ -368,11 +369,9 @@ async def _update_function_job_status_side_effect(*args, **kwargs): ): assert data["status"] == job_status elif project_job_id is None and job_creation_task_id is None: - assert data["status"] == _JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS + assert data["status"] == FunctionJobCreationTaskStatus.NOT_YET_SCHEDULED elif project_job_id is None and job_creation_task_id is not None: - assert ( - data["status"] == f"{_JOB_CREATION_TASK_STATUS_PREFIX}{celery_task_state}" - ) + assert data["status"] == FunctionJobCreationTaskStatus[celery_task_state.name] else: pytest.fail("Unexpected combination of parameters") diff --git a/services/api-server/tests/unit/service/test_service_function_jobs_task_client.py b/services/api-server/tests/unit/service/test_service_function_jobs_task_client.py new file mode 100644 index 00000000000..0a5adc695ea --- /dev/null +++ b/services/api-server/tests/unit/service/test_service_function_jobs_task_client.py @@ -0,0 +1,83 @@ +# pylint: disable=redefined-outer-name + +from collections.abc import Callable + +import pytest +from celery_library.errors import TaskNotFoundError +from faker import Faker +from models_library.products import ProductName +from models_library.progress_bar import ProgressReport +from models_library.users import UserID +from pytest_mock import MockerFixture, MockType +from servicelib.celery.models import TaskState, TaskStatus, TaskUUID +from servicelib.celery.task_manager import TaskManager +from simcore_service_api_server._service_function_jobs_task_client import ( + _celery_task_status, +) +from simcore_service_api_server.models.schemas.functions import ( + FunctionJobCreationTaskStatus, +) + +_faker = Faker() + + +@pytest.fixture +async def create_mock_task_manager( + mocker: MockerFixture, +) -> Callable[[TaskStatus | Exception], MockType]: + + def _(status_or_exception: TaskStatus | Exception) -> MockType: + mock_task_manager = mocker.Mock(spec=TaskManager) + if isinstance(status_or_exception, Exception): + + async def _raise(*args, **kwargs): + raise status_or_exception + + mock_task_manager.get_task_status.side_effect = _raise + else: + mock_task_manager.get_task_status.return_value = status_or_exception + return mock_task_manager + + return _ + + +@pytest.mark.parametrize( + "status_or_exception", + [ + TaskStatus( + task_uuid=TaskUUID(_faker.uuid4()), + task_state=state, + progress_report=ProgressReport(actual_value=3.14), + ) + for state in list(TaskState) + ] + + [TaskNotFoundError(task_id=_faker.uuid4())], +) +@pytest.mark.parametrize("job_creation_task_id", [_faker.uuid4(), None]) +async def test_celery_status_conversion( + status_or_exception: TaskStatus | Exception, + job_creation_task_id: str | None, + create_mock_task_manager: Callable[[TaskStatus | Exception], MockType], + user_id: UserID, + product_name: ProductName, +): + + mock_task_manager = create_mock_task_manager(status_or_exception) + + status = await _celery_task_status( + job_creation_task_id=job_creation_task_id, + task_manager=mock_task_manager, + user_id=user_id, + product_name=product_name, + ) + + if job_creation_task_id is None: + assert status == FunctionJobCreationTaskStatus.NOT_YET_SCHEDULED + elif isinstance(status_or_exception, TaskNotFoundError): + assert status == FunctionJobCreationTaskStatus.ERROR + elif isinstance(status_or_exception, TaskStatus): + assert ( + status == FunctionJobCreationTaskStatus[status_or_exception.task_state.name] + ) + else: + pytest.fail("Unexpected test input")