diff --git a/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py b/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py index 7399b6ff303d..67c0dd5353c6 100644 --- a/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py +++ b/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py @@ -6,7 +6,7 @@ class BaseAsyncjobRpcError(OsparcErrorMixin, RuntimeError): class JobSchedulerError(BaseAsyncjobRpcError): - msg_template: str = "Celery exception: {exc}" + msg_template: str = "Async job scheduler exception: {exc}" class JobMissingError(BaseAsyncjobRpcError): diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/async_jobs_server.py b/packages/pytest-simcore/src/pytest_simcore/helpers/async_jobs_server.py new file mode 100644 index 000000000000..369396153efe --- /dev/null +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/async_jobs_server.py @@ -0,0 +1,88 @@ +# pylint: disable=unused-argument + +from dataclasses import dataclass + +from models_library.api_schemas_rpc_async_jobs.async_jobs import ( + AsyncJobGet, + AsyncJobId, + AsyncJobNameData, + AsyncJobResult, + AsyncJobStatus, +) +from models_library.api_schemas_rpc_async_jobs.exceptions import BaseAsyncjobRpcError +from models_library.progress_bar import ProgressReport +from models_library.rabbitmq_basic_types import RPCNamespace +from pydantic import validate_call +from pytest_mock import MockType +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient + + +@dataclass +class AsyncJobSideEffects: + exception: BaseAsyncjobRpcError | None = None + + @validate_call(config={"arbitrary_types_allowed": True}) + async def cancel( + self, + rabbitmq_rpc_client: RabbitMQRPCClient | MockType, + *, + rpc_namespace: RPCNamespace, + job_id: AsyncJobId, + job_id_data: AsyncJobNameData, + ) -> None: + if self.exception is not None: + raise self.exception + return None + + 
@validate_call(config={"arbitrary_types_allowed": True}) + async def status( + self, + rabbitmq_rpc_client: RabbitMQRPCClient | MockType, + *, + rpc_namespace: RPCNamespace, + job_id: AsyncJobId, + job_id_data: AsyncJobNameData, + ) -> AsyncJobStatus: + if self.exception is not None: + raise self.exception + + return AsyncJobStatus( + job_id=job_id, + progress=ProgressReport( + actual_value=50.0, + total=100.0, + attempt=1, + ), + done=False, + ) + + @validate_call(config={"arbitrary_types_allowed": True}) + async def result( + self, + rabbitmq_rpc_client: RabbitMQRPCClient | MockType, + *, + rpc_namespace: RPCNamespace, + job_id: AsyncJobId, + job_id_data: AsyncJobNameData, + ) -> AsyncJobResult: + if self.exception is not None: + raise self.exception + return AsyncJobResult(result="Success") + + @validate_call(config={"arbitrary_types_allowed": True}) + async def list_jobs( + self, + rabbitmq_rpc_client: RabbitMQRPCClient | MockType, + *, + rpc_namespace: RPCNamespace, + job_id_data: AsyncJobNameData, + filter_: str = "", + ) -> list[AsyncJobGet]: + if self.exception is not None: + raise self.exception + return [ + AsyncJobGet( + job_id=AsyncJobId("123e4567-e89b-12d3-a456-426614174000"), + job_name="Example Job", + ) + ] diff --git a/services/api-server/VERSION b/services/api-server/VERSION index ac39a106c485..f374f6662e9a 100644 --- a/services/api-server/VERSION +++ b/services/api-server/VERSION @@ -1 +1 @@ -0.9.0 +0.9.1 diff --git a/services/api-server/openapi.json b/services/api-server/openapi.json index f35f4723d923..6d57f716398a 100644 --- a/services/api-server/openapi.json +++ b/services/api-server/openapi.json @@ -3,7 +3,7 @@ "info": { "title": "osparc.io public API", "description": "osparc-simcore public API specifications", - "version": "0.9.0" + "version": "0.9.1" }, "paths": { "/v0/meta": { @@ -7977,6 +7977,18 @@ "items": {}, "type": "array", "title": "Errors" + }, + "support_id": { + "anyOf": [ + { + "type": "string", + "pattern": 
"OEC:([a-fA-F0-9]{12})-(\\d{13,14})" + }, + { + "type": "null" + } + ], + "title": "Support Id" } }, "type": "object", diff --git a/services/api-server/setup.cfg b/services/api-server/setup.cfg index 21f72b9aecc1..6f7fe90adca5 100644 --- a/services/api-server/setup.cfg +++ b/services/api-server/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.9.0 +current_version = 0.9.1 commit = True message = services/api-server version: {current_version} → {new_version} tag = False diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/tasks.py b/services/api-server/src/simcore_service_api_server/api/dependencies/tasks.py new file mode 100644 index 000000000000..645aba6a8ff4 --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/tasks.py @@ -0,0 +1,13 @@ +from typing import Annotated + +from fastapi import Depends +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient + +from ...services_rpc.async_jobs import AsyncJobClient +from .rabbitmq import get_rabbitmq_rpc_client + + +def get_async_jobs_client( + rabbitmq_rpc_client: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_client)], +) -> AsyncJobClient: + return AsyncJobClient(_rabbitmq_rpc_client=rabbitmq_rpc_client) diff --git a/services/api-server/src/simcore_service_api_server/api/root.py b/services/api-server/src/simcore_service_api_server/api/root.py index 8c6dbc6de1a4..b5a295ee7283 100644 --- a/services/api-server/src/simcore_service_api_server/api/root.py +++ b/services/api-server/src/simcore_service_api_server/api/root.py @@ -18,6 +18,7 @@ solvers_jobs_read, studies, studies_jobs, + tasks, users, wallets, ) @@ -65,6 +66,7 @@ def create_router(settings: ApplicationSettings): router.include_router( functions_routes.function_router, tags=["functions"], prefix=_FUNCTIONS_PREFIX ) + router.include_router(tasks.router, tags=["tasks"], prefix="/tasks") # NOTE: multiple-files upload is currently disabled # Web form to upload files at 
http://localhost:8000/v0/upload-form-view diff --git a/services/api-server/src/simcore_service_api_server/api/routes/tasks.py b/services/api-server/src/simcore_service_api_server/api/routes/tasks.py new file mode 100644 index 000000000000..2c3f6b2f66cd --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/api/routes/tasks.py @@ -0,0 +1,173 @@ +import logging +from typing import Annotated, Any + +from fastapi import APIRouter, Depends, FastAPI, status +from models_library.api_schemas_long_running_tasks.base import TaskProgress +from models_library.api_schemas_long_running_tasks.tasks import ( + TaskGet, + TaskResult, + TaskStatus, +) +from models_library.api_schemas_rpc_async_jobs.async_jobs import ( + AsyncJobId, + AsyncJobNameData, +) +from models_library.products import ProductName +from models_library.users import UserID +from servicelib.fastapi.dependencies import get_app + +from ...models.schemas.base import ApiServerEnvelope +from ...models.schemas.errors import ErrorGet +from ...services_rpc.async_jobs import AsyncJobClient +from ..dependencies.authentication import get_current_user_id, get_product_name +from ..dependencies.tasks import get_async_jobs_client +from ._constants import ( + FMSG_CHANGELOG_NEW_IN_VERSION, + create_route_description, +) + +router = APIRouter() +_logger = logging.getLogger(__name__) + + +def _get_job_id_data(user_id: UserID, product_name: ProductName) -> AsyncJobNameData: + return AsyncJobNameData(user_id=user_id, product_name=product_name) + + +_DEFAULT_TASK_STATUS_CODES: dict[int | str, dict[str, Any]] = { + status.HTTP_500_INTERNAL_SERVER_ERROR: { + "description": "Internal server error", + "model": ErrorGet, + }, +} + + +@router.get( + "", + response_model=ApiServerEnvelope[list[TaskGet]], + responses=_DEFAULT_TASK_STATUS_CODES, + description=create_route_description( + base="List all tasks", + changelog=[ + FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"), + ], + ), + include_in_schema=False, # TO BE RELEASED 
in 0.10-rc1 +) +async def list_tasks( + app: Annotated[FastAPI, Depends(get_app)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], + async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)], +): + user_async_jobs = await async_jobs.list_jobs( + job_id_data=_get_job_id_data(user_id, product_name), + filter_="", + ) + app_router = app.router + data = [ + TaskGet( + task_id=f"{job.job_id}", + task_name=job.job_name, + status_href=app_router.url_path_for( + "get_task_status", task_id=f"{job.job_id}" + ), + abort_href=app_router.url_path_for("cancel_task", task_id=f"{job.job_id}"), + result_href=app_router.url_path_for( + "get_task_result", task_id=f"{job.job_id}" + ), + ) + for job in user_async_jobs + ] + return ApiServerEnvelope(data=data) + + +@router.get( + "/{task_id}", + response_model=TaskStatus, + responses=_DEFAULT_TASK_STATUS_CODES, + description=create_route_description( + base="Get task status", + changelog=[ + FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"), + ], + ), + include_in_schema=False, # TO BE RELEASED in 0.10-rc1 +) +async def get_task_status( + task_id: AsyncJobId, + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], + async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)], +): + async_job_rpc_status = await async_jobs.status( + job_id=task_id, + job_id_data=_get_job_id_data(user_id, product_name), + ) + _task_id = f"{async_job_rpc_status.job_id}" + return TaskStatus( + task_progress=TaskProgress( + task_id=_task_id, percent=async_job_rpc_status.progress.percent_value + ), + done=async_job_rpc_status.done, + started=None, + ) + + +@router.post( + "/{task_id}:cancel", + status_code=status.HTTP_204_NO_CONTENT, + responses=_DEFAULT_TASK_STATUS_CODES, + description=create_route_description( + base="Cancel task", + changelog=[ + 
FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"), + ], + ), + include_in_schema=False, # TO BE RELEASED in 0.10-rc1 +) +async def cancel_task( + task_id: AsyncJobId, + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], + async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)], +): + await async_jobs.cancel( + job_id=task_id, + job_id_data=_get_job_id_data(user_id, product_name), + ) + + +@router.get( + "/{task_id}/result", + response_model=TaskResult, + responses={ + status.HTTP_404_NOT_FOUND: { + "description": "Task result not found", + "model": ErrorGet, + }, + status.HTTP_409_CONFLICT: { + "description": "Task is cancelled", + "model": ErrorGet, + }, + **_DEFAULT_TASK_STATUS_CODES, + }, + description=create_route_description( + base="Get task result", + changelog=[ + FMSG_CHANGELOG_NEW_IN_VERSION.format("0.10-rc1"), + ], + ), + include_in_schema=False, # TO BE RELEASED in 0.10-rc1 +) +async def get_task_result( + task_id: AsyncJobId, + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], + async_jobs: Annotated[AsyncJobClient, Depends(get_async_jobs_client)], +): + async_job_rpc_result = await async_jobs.result( + job_id=task_id, + job_id_data=_get_job_id_data(user_id, product_name), + ) + return TaskResult(result=async_job_rpc_result.result, error=None) diff --git a/services/api-server/src/simcore_service_api_server/exceptions/handlers/_handlers_backend_errors.py b/services/api-server/src/simcore_service_api_server/exceptions/handlers/_handlers_backend_errors.py index ca335deceacc..9d852b472ad0 100644 --- a/services/api-server/src/simcore_service_api_server/exceptions/handlers/_handlers_backend_errors.py +++ b/services/api-server/src/simcore_service_api_server/exceptions/handlers/_handlers_backend_errors.py @@ -1,12 +1,32 @@ +import logging + +from common_library.error_codes import create_error_code 
+from servicelib.logging_errors import create_troubleshootting_log_kwargs +from servicelib.status_codes_utils import is_5xx_server_error from starlette.requests import Request from starlette.responses import JSONResponse from ...exceptions.backend_errors import BaseBackEndError from ._utils import create_error_json_response +_logger = logging.getLogger(__name__) + async def backend_error_handler(request: Request, exc: Exception) -> JSONResponse: assert request # nosec assert isinstance(exc, BaseBackEndError) - - return create_error_json_response(f"{exc}", status_code=exc.status_code) + user_error_msg = f"{exc}" + support_id = None + if is_5xx_server_error(exc.status_code): + support_id = create_error_code(exc) + _logger.exception( + **create_troubleshootting_log_kwargs( + user_error_msg, + error=exc, + error_code=support_id, + tip="Unexpected error", + ) + ) + return create_error_json_response( + user_error_msg, status_code=exc.status_code, support_id=support_id + ) diff --git a/services/api-server/src/simcore_service_api_server/exceptions/handlers/_utils.py b/services/api-server/src/simcore_service_api_server/exceptions/handlers/_utils.py index da741fdb8b45..2c0024476cc9 100644 --- a/services/api-server/src/simcore_service_api_server/exceptions/handlers/_utils.py +++ b/services/api-server/src/simcore_service_api_server/exceptions/handlers/_utils.py @@ -1,6 +1,7 @@ from collections.abc import Awaitable, Callable from typing import Any, TypeAlias +from common_library.error_codes import ErrorCodeStr from fastapi.encoders import jsonable_encoder from fastapi.requests import Request from fastapi.responses import JSONResponse @@ -13,13 +14,13 @@ def create_error_json_response( - *errors: Any, status_code: int, **kwargs + *errors: Any, status_code: int, support_id: ErrorCodeStr | None = None, **kwargs ) -> JSONResponse: """ Converts errors to Error response model defined in the OAS """ - error_model = ErrorGet(errors=list(errors)) + error_model = 
ErrorGet(errors=list(errors), support_id=support_id, **kwargs) return JSONResponse( content=jsonable_encoder(error_model), status_code=status_code, diff --git a/services/api-server/src/simcore_service_api_server/exceptions/task_errors.py b/services/api-server/src/simcore_service_api_server/exceptions/task_errors.py new file mode 100644 index 000000000000..a12282404823 --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/exceptions/task_errors.py @@ -0,0 +1,28 @@ +from fastapi import status + +from .backend_errors import BaseBackEndError + + +class TaskSchedulerError(BaseBackEndError): + msg_template: str = "TaskScheduler error" + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR + + +class TaskMissingError(BaseBackEndError): + msg_template: str = "Task {job_id} does not exist" + status_code = status.HTTP_404_NOT_FOUND + + +class TaskResultMissingError(BaseBackEndError): + msg_template: str = "Task {job_id} is not done" + status_code = status.HTTP_404_NOT_FOUND + + +class TaskCancelledError(BaseBackEndError): + msg_template: str = "Task {job_id} is cancelled" + status_code = status.HTTP_409_CONFLICT + + +class TaskError(BaseBackEndError): + msg_template: str = "Task '{job_id}' failed" + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR diff --git a/services/api-server/src/simcore_service_api_server/models/schemas/_base.py b/services/api-server/src/simcore_service_api_server/models/schemas/base.py similarity index 93% rename from services/api-server/src/simcore_service_api_server/models/schemas/_base.py rename to services/api-server/src/simcore_service_api_server/models/schemas/base.py index 07144ba5b766..6abe72ca9f14 100644 --- a/services/api-server/src/simcore_service_api_server/models/schemas/_base.py +++ b/services/api-server/src/simcore_service_api_server/models/schemas/base.py @@ -1,12 +1,12 @@ import urllib.parse -from typing import Annotated +from typing import Annotated, Generic, TypeVar import packaging.version from 
models_library.utils.change_case import camel_to_snake from models_library.utils.common_validators import trim_string_before from pydantic import BaseModel, ConfigDict, Field, HttpUrl, StringConstraints -from ...models._utils_pydantic import UriSchema +from .._utils_pydantic import UriSchema from ..basic_types import VersionStr @@ -83,3 +83,10 @@ def name(self) -> str: @classmethod def compose_resource_name(cls, key: str, version: str) -> str: raise NotImplementedError("Subclasses must implement this method") + + +DataT = TypeVar("DataT") + + +class ApiServerEnvelope(BaseModel, Generic[DataT]): + data: DataT diff --git a/services/api-server/src/simcore_service_api_server/models/schemas/errors.py b/services/api-server/src/simcore_service_api_server/models/schemas/errors.py index 3243f5e44b98..69d6f7d67003 100644 --- a/services/api-server/src/simcore_service_api_server/models/schemas/errors.py +++ b/services/api-server/src/simcore_service_api_server/models/schemas/errors.py @@ -1,5 +1,6 @@ from typing import Any +from common_library.error_codes import ErrorCodeStr from pydantic import BaseModel, ConfigDict @@ -10,6 +11,7 @@ class ErrorGet(BaseModel): # - https://github.com/ITISFoundation/osparc-simcore/issues/2520 # - https://github.com/ITISFoundation/osparc-simcore/issues/2446 errors: list[Any] + support_id: ErrorCodeStr | None = None model_config = ConfigDict( json_schema_extra={ diff --git a/services/api-server/src/simcore_service_api_server/models/schemas/files.py b/services/api-server/src/simcore_service_api_server/models/schemas/files.py index ebfee726adbc..5c6ea3b3bd89 100644 --- a/services/api-server/src/simcore_service_api_server/models/schemas/files.py +++ b/services/api-server/src/simcore_service_api_server/models/schemas/files.py @@ -18,7 +18,7 @@ from .._utils_pydantic import UriSchema from ..domain.files import File as DomainFile from ..domain.files import FileName -from ._base import ApiServerInputSchema, ApiServerOutputSchema +from .base import 
ApiServerInputSchema, ApiServerOutputSchema class UserFile(ApiServerInputSchema): diff --git a/services/api-server/src/simcore_service_api_server/models/schemas/jobs.py b/services/api-server/src/simcore_service_api_server/models/schemas/jobs.py index 86abb0a87414..08bfe901bbc4 100644 --- a/services/api-server/src/simcore_service_api_server/models/schemas/jobs.py +++ b/services/api-server/src/simcore_service_api_server/models/schemas/jobs.py @@ -39,7 +39,7 @@ from ..domain.files import File as DomainFile from ..domain.files import FileInProgramJobData from ..schemas.files import UserFile -from ._base import ApiServerInputSchema +from .base import ApiServerInputSchema # JOB SUB-RESOURCES ---------- # diff --git a/services/api-server/src/simcore_service_api_server/models/schemas/programs.py b/services/api-server/src/simcore_service_api_server/models/schemas/programs.py index 305f9eb28cf7..ca3e0c564ddd 100644 --- a/services/api-server/src/simcore_service_api_server/models/schemas/programs.py +++ b/services/api-server/src/simcore_service_api_server/models/schemas/programs.py @@ -9,7 +9,7 @@ from ..api_resources import compose_resource_name from ..basic_types import VersionStr -from ._base import ( +from .base import ( ApiServerOutputSchema, BaseService, ) diff --git a/services/api-server/src/simcore_service_api_server/models/schemas/solvers.py b/services/api-server/src/simcore_service_api_server/models/schemas/solvers.py index 63a383c4f2b1..d9b70e68f84c 100644 --- a/services/api-server/src/simcore_service_api_server/models/schemas/solvers.py +++ b/services/api-server/src/simcore_service_api_server/models/schemas/solvers.py @@ -13,8 +13,8 @@ from models_library.services_types import ServiceKey from pydantic import BaseModel, ConfigDict, Field, StringConstraints -from ...models.schemas._base import BaseService from ..api_resources import compose_resource_name +from .base import BaseService # NOTE: # - API does NOT impose prefix (simcore)/(services)/comp because does not 
know anything about registry deployed. This constraint diff --git a/services/api-server/src/simcore_service_api_server/services_rpc/async_jobs.py b/services/api-server/src/simcore_service_api_server/services_rpc/async_jobs.py new file mode 100644 index 000000000000..9e263d755057 --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/services_rpc/async_jobs.py @@ -0,0 +1,99 @@ +import functools +from dataclasses import dataclass + +from models_library.api_schemas_rpc_async_jobs.async_jobs import ( + AsyncJobGet, + AsyncJobId, + AsyncJobNameData, + AsyncJobResult, + AsyncJobStatus, +) +from models_library.api_schemas_rpc_async_jobs.exceptions import ( + JobAbortedError, + JobError, + JobNotDoneError, + JobSchedulerError, +) +from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs + +from ..exceptions.service_errors_utils import service_exception_mapper +from ..exceptions.task_errors import ( + TaskCancelledError, + TaskError, + TaskResultMissingError, + TaskSchedulerError, +) + +_exception_mapper = functools.partial( + service_exception_mapper, service_name="Async jobs" +) + + +@dataclass +class AsyncJobClient: + _rabbitmq_rpc_client: RabbitMQRPCClient + + @_exception_mapper( + rpc_exception_map={ + JobSchedulerError: TaskSchedulerError, + } + ) + async def cancel( + self, *, job_id: AsyncJobId, job_id_data: AsyncJobNameData + ) -> None: + return await async_jobs.cancel( + self._rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + job_id=job_id, + job_id_data=job_id_data, + ) + + @_exception_mapper( + rpc_exception_map={ + JobSchedulerError: TaskSchedulerError, + } + ) + async def status( + self, *, job_id: AsyncJobId, job_id_data: AsyncJobNameData + ) -> AsyncJobStatus: + return await async_jobs.status( + self._rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + job_id=job_id, + 
job_id_data=job_id_data, + ) + + @_exception_mapper( + rpc_exception_map={ + JobSchedulerError: TaskSchedulerError, + JobNotDoneError: TaskResultMissingError, + JobAbortedError: TaskCancelledError, + JobError: TaskError, + } + ) + async def result( + self, *, job_id: AsyncJobId, job_id_data: AsyncJobNameData + ) -> AsyncJobResult: + return await async_jobs.result( + self._rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + job_id=job_id, + job_id_data=job_id_data, + ) + + @_exception_mapper( + rpc_exception_map={ + JobSchedulerError: TaskSchedulerError, + } + ) + async def list_jobs( + self, *, filter_: str, job_id_data: AsyncJobNameData + ) -> list[AsyncJobGet]: + return await async_jobs.list_jobs( + self._rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + filter_=filter_, + job_id_data=job_id_data, + ) diff --git a/services/api-server/tests/unit/test_tasks.py b/services/api-server/tests/unit/test_tasks.py new file mode 100644 index 000000000000..40f64eb31c44 --- /dev/null +++ b/services/api-server/tests/unit/test_tasks.py @@ -0,0 +1,189 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument + +from typing import Any + +import pytest +from faker import Faker +from fastapi import status +from httpx import AsyncClient, BasicAuth +from models_library.api_schemas_long_running_tasks.tasks import TaskGet, TaskStatus +from models_library.api_schemas_rpc_async_jobs.exceptions import ( + BaseAsyncjobRpcError, + JobAbortedError, + JobError, + JobNotDoneError, + JobSchedulerError, +) +from pytest_mock import MockerFixture, MockType +from pytest_simcore.helpers.async_jobs_server import AsyncJobSideEffects +from simcore_service_api_server.models.schemas.base import ApiServerEnvelope + +_faker = Faker() + + +@pytest.fixture +async def async_jobs_rpc_side_effects( + async_job_error: BaseAsyncjobRpcError | None, +) -> Any: + return AsyncJobSideEffects(exception=async_job_error) + + +@pytest.fixture +def mocked_async_jobs_rpc_api( + 
mocker: MockerFixture, +    async_jobs_rpc_side_effects: Any, +    mocked_app_dependencies: None, +) -> dict[str, MockType]: +    """ +    Mocks the async jobs RPC API for testing purposes. +    """ +    from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs + +    mocks = {} + +    # Get all callable methods from the side effects class that are not built-ins +    side_effect_methods = [ +        method_name +        for method_name in dir(async_jobs_rpc_side_effects) +        if not method_name.startswith("_") +        and callable(getattr(async_jobs_rpc_side_effects, method_name)) +    ] + +    # Create mocks for each method in async_jobs that has a corresponding side effect +    for method_name in side_effect_methods: +        assert hasattr(async_jobs, method_name) +        mocks[method_name] = mocker.patch.object( +            async_jobs, +            method_name, +            autospec=True, +            side_effect=getattr(async_jobs_rpc_side_effects, method_name), +        ) + +    return mocks + + +@pytest.mark.parametrize( +    "async_job_error, expected_status_code", +    [ +        (None, status.HTTP_200_OK), +        ( +            JobSchedulerError( +                exc=Exception("A very rare exception raised by the scheduler") +            ), +            status.HTTP_500_INTERNAL_SERVER_ERROR, +        ), +    ], +) +async def test_get_async_jobs( +    client: AsyncClient, +    mocked_async_jobs_rpc_api: dict[str, MockType], +    auth: BasicAuth, +    expected_status_code: int, +): + +    response = await client.get("/v0/tasks", auth=auth) +    assert mocked_async_jobs_rpc_api["list_jobs"].called +    assert response.status_code == expected_status_code + +    if response.status_code == status.HTTP_200_OK: +        result = ApiServerEnvelope[list[TaskGet]].model_validate_json(response.text) +        assert len(result.data) > 0 +        assert all(isinstance(task, TaskGet) for task in result.data) +        task = result.data[0] +        assert task.abort_href == f"/v0/tasks/{task.task_id}:cancel" +        assert task.result_href == f"/v0/tasks/{task.task_id}/result" +        assert task.status_href == f"/v0/tasks/{task.task_id}" + + +@pytest.mark.parametrize( +    "async_job_error, expected_status_code",
+ [ + (None, status.HTTP_200_OK), + ( + JobSchedulerError( + exc=Exception("A very rare exception raised by the scheduler") + ), + status.HTTP_500_INTERNAL_SERVER_ERROR, + ), + ], +) +async def test_get_async_jobs_status( + client: AsyncClient, + mocked_async_jobs_rpc_api: dict[str, MockType], + auth: BasicAuth, + expected_status_code: int, +): + task_id = f"{_faker.uuid4()}" + response = await client.get(f"/v0/tasks/{task_id}", auth=auth) + assert mocked_async_jobs_rpc_api["status"].called + assert f"{mocked_async_jobs_rpc_api['status'].call_args[1]['job_id']}" == task_id + assert response.status_code == expected_status_code + if response.status_code == status.HTTP_200_OK: + TaskStatus.model_validate_json(response.text) + + +@pytest.mark.parametrize( + "async_job_error, expected_status_code", + [ + (None, status.HTTP_204_NO_CONTENT), + ( + JobSchedulerError( + exc=Exception("A very rare exception raised by the scheduler") + ), + status.HTTP_500_INTERNAL_SERVER_ERROR, + ), + ], +) +async def test_cancel_async_job( + client: AsyncClient, + mocked_async_jobs_rpc_api: dict[str, MockType], + auth: BasicAuth, + expected_status_code: int, +): + task_id = f"{_faker.uuid4()}" + response = await client.post(f"/v0/tasks/{task_id}:cancel", auth=auth) + assert mocked_async_jobs_rpc_api["cancel"].called + assert f"{mocked_async_jobs_rpc_api['cancel'].call_args[1]['job_id']}" == task_id + assert response.status_code == expected_status_code + + +@pytest.mark.parametrize( + "async_job_error, expected_status_code", + [ + (None, status.HTTP_200_OK), + ( + JobError( + job_id=_faker.uuid4(), + exc_type=Exception, + exc_message="An exception from inside the async job", + ), + status.HTTP_500_INTERNAL_SERVER_ERROR, + ), + ( + JobNotDoneError(job_id=_faker.uuid4()), + status.HTTP_404_NOT_FOUND, + ), + ( + JobAbortedError(job_id=_faker.uuid4()), + status.HTTP_409_CONFLICT, + ), + ( + JobSchedulerError( + exc=Exception("A very rare exception raised by the scheduler") + ), + 
status.HTTP_500_INTERNAL_SERVER_ERROR, + ), + ], +) +async def test_get_async_job_result( + client: AsyncClient, + mocked_async_jobs_rpc_api: dict[str, MockType], + auth: BasicAuth, + expected_status_code: int, +): + task_id = f"{_faker.uuid4()}" + response = await client.get(f"/v0/tasks/{task_id}/result", auth=auth) + assert response.status_code == expected_status_code + assert mocked_async_jobs_rpc_api["result"].called + assert f"{mocked_async_jobs_rpc_api['result'].call_args[1]['job_id']}" == task_id