diff --git a/api/specs/web-server/_long_running_tasks.py b/api/specs/web-server/_long_running_tasks.py
index 859bd470d294..0dacf42a03bc 100644
--- a/api/specs/web-server/_long_running_tasks.py
+++ b/api/specs/web-server/_long_running_tasks.py
@@ -4,13 +4,17 @@
 # pylint: disable=too-many-arguments
 
 
-from typing import Annotated
+from typing import Annotated, Any
 
 from fastapi import APIRouter, Depends, status
 from models_library.generics import Envelope
+from models_library.rest_error import EnvelopedError
 from servicelib.aiohttp.long_running_tasks._routes import _PathParam
 from servicelib.long_running_tasks._models import TaskGet, TaskStatus
 from simcore_service_webserver._meta import API_VTAG
+from simcore_service_webserver.tasks._exception_handlers import (
+    _TO_HTTP_ERROR_MAP as data_export_http_error_map,
+)
 
 router = APIRouter(
     prefix=f"/{API_VTAG}",
@@ -19,37 +23,52 @@
     ],
 )
 
+_data_export_responses: dict[int | str, dict[str, Any]] = {
+    i.status_code: {"model": EnvelopedError}
+    for i in data_export_http_error_map.values()
+}
+
 
 @router.get(
     "/tasks",
     response_model=Envelope[list[TaskGet]],
+    name="list_tasks",
+    description="Lists all long running tasks",
+    responses=_data_export_responses,
 )
-def list_tasks():
-    ...
+def get_async_jobs(): ...
 
 
 @router.get(
     "/tasks/{task_id}",
     response_model=Envelope[TaskStatus],
+    name="get_task_status",
+    description="Retrieves the status of a task",
+    responses=_data_export_responses,
 )
-def get_task_status(
+def get_async_job_status(
     _path_params: Annotated[_PathParam, Depends()],
-):
-    ...
+): ...
 
 
 @router.delete(
     "/tasks/{task_id}",
+    name="cancel_and_delete_task",
+    description="Cancels and deletes a task",
+    responses=_data_export_responses,
     status_code=status.HTTP_204_NO_CONTENT,
 )
-def cancel_and_delete_task(
+def abort_async_job(
     _path_params: Annotated[_PathParam, Depends()],
-):
-    ...
+): ...
 
 
-@router.get("/tasks/{task_id}/result")
-def get_task_result(
+@router.get(
+    "/tasks/{task_id}/result",
+    name="get_task_result",
+    description="Retrieves the result of a task",
+    responses=_data_export_responses,
+)
+def get_async_job_result(
     _path_params: Annotated[_PathParam, Depends()],
-):
-    ...
+): ...
diff --git a/api/specs/web-server/_storage.py b/api/specs/web-server/_storage.py
index 7eae53bf742e..56a175d75521 100644
--- a/api/specs/web-server/_storage.py
+++ b/api/specs/web-server/_storage.py
@@ -4,10 +4,12 @@
 # pylint: disable=too-many-arguments
 
 
-from typing import Annotated, TypeAlias
-from uuid import UUID
+from typing import Annotated, Any, TypeAlias
 
 from fastapi import APIRouter, Depends, Query, status
+from models_library.api_schemas_long_running_tasks.tasks import (
+    TaskGet,
+)
 from models_library.api_schemas_storage.storage_schemas import (
     FileLocation,
     FileMetaDataGet,
@@ -22,19 +24,19 @@
 from models_library.api_schemas_webserver.storage import (
     DataExportPost,
     ListPathsQueryParams,
-    StorageAsyncJobGet,
-    StorageAsyncJobResult,
-    StorageAsyncJobStatus,
     StorageLocationPathParams,
     StoragePathComputeSizeParams,
 )
 from models_library.generics import Envelope
 from models_library.projects_nodes_io import LocationID
-from models_library.users import UserID
+from models_library.rest_error import EnvelopedError
 from pydantic import AnyUrl, ByteSize
 from servicelib.fastapi.rest_pagination import CustomizedPathsCursorPage
 from simcore_service_webserver._meta import API_VTAG
 from simcore_service_webserver.storage.schemas import DatasetMetaData, FileMetaData
+from simcore_service_webserver.tasks._exception_handlers import (
+    _TO_HTTP_ERROR_MAP as data_export_http_error_map,
+)
 
 router = APIRouter(
     prefix=f"/{API_VTAG}",
@@ -71,7 +73,7 @@ async def list_storage_paths(
 
 @router.post(
     "/storage/locations/{location_id}/paths/{path}:size",
-    response_model=Envelope[StorageAsyncJobGet],
+    response_model=Envelope[TaskGet],
     status_code=status.HTTP_202_ACCEPTED,
 )
 async def compute_path_size(_path: Annotated[StoragePathComputeSizeParams, Depends()]):
@@ -205,46 +207,18 @@ async def is_completed_upload_file(
 
 
 # data export
+_data_export_responses: dict[int | str, dict[str, Any]] = {
+    i.status_code: {"model": EnvelopedError}
+    for i in data_export_http_error_map.values()
+}
+
+
 @router.post(
     "/storage/locations/{location_id}/export-data",
-    response_model=Envelope[StorageAsyncJobGet],
+    response_model=Envelope[TaskGet],
     name="export_data",
     description="Export data",
+    responses=_data_export_responses,
 )
 async def export_data(data_export: DataExportPost, location_id: LocationID):
     """Trigger data export. Returns async job id for getting status and results"""
-
-
-@router.get(
-    "/storage/async-jobs/{job_id}/status",
-    response_model=Envelope[StorageAsyncJobStatus],
-    name="get_async_job_status",
-)
-async def get_async_job_status(job_id: UUID):
-    """Get async job status"""
-
-
-@router.post(
-    "/storage/async-jobs/{job_id}:abort",
-    name="abort_async_job",
-)
-async def abort_async_job(job_id: UUID):
-    """aborts execution of an async job"""
-
-
-@router.get(
-    "/storage/async-jobs/{job_id}/result",
-    response_model=Envelope[StorageAsyncJobResult],
-    name="get_async_job_result",
-)
-async def get_async_job_result(job_id: UUID):
-    """Get the result of the async job"""
-
-
-@router.get(
-    "/storage/async-jobs",
-    response_model=Envelope[list[StorageAsyncJobGet]],
-    name="get_async_jobs",
-)
-async def get_async_jobs(user_id: UserID):
-    """Retrunsa list of async jobs for the user"""
diff --git a/api/specs/web-server/_tasks.py b/api/specs/web-server/_tasks.py
deleted file mode 100644
index a2c6fbe14022..000000000000
--- a/api/specs/web-server/_tasks.py
+++ /dev/null
@@ -1,43 +0,0 @@
-from fastapi import APIRouter
-from simcore_service_webserver._meta import API_VTAG
-
-router = APIRouter(
-    prefix=f"/{API_VTAG}",
-    tags=[
-        "tasks",
-    ],
-)
-
-
-@router.get("/tasks", response_model=List[TasksGetResponse])
-def list_tasks() -> List[TasksGetResponse]:
-    pass
-
-
-@router.get(
-    "/tasks/{task_id}",
-    response_model=TasksTaskIdGetResponse,
-    responses={"default": {"model": TasksTaskIdGetResponse1}},
-)
-def get_task_status(
-    task_id: str,
-) -> Union[TasksTaskIdGetResponse, TasksTaskIdGetResponse1]:
-    pass
-
-
-@router.delete(
-    "/tasks/{task_id}",
-    response_model=None,
-    responses={"default": {"model": TasksTaskIdDeleteResponse}},
-)
-def cancel_and_delete_task(task_id: str) -> Union[None, TasksTaskIdDeleteResponse]:
-    pass
-
-
-@router.get(
-    "/tasks/{task_id}/result",
-    response_model=None,
-    responses={"default": {"model": TasksTaskIdResultGetResponse}},
-)
-def get_task_result(task_id: str) -> Union[None, TasksTaskIdResultGetResponse]:
-    pass
diff --git a/packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py b/packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py
index b5a8d8443b93..acd73831b22f 100644
--- a/packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py
+++ b/packages/models-library/src/models_library/api_schemas_long_running_tasks/tasks.py
@@ -10,7 +10,7 @@
 class TaskStatus(BaseModel):
     task_progress: TaskProgress
     done: bool
-    started: datetime
+    started: datetime | None
 
 
 class TaskResult(BaseModel):
diff --git a/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py b/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py
index 953cd1f819cc..3fb24ae952dc 100644
--- a/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py
+++ b/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py
@@ -16,8 +16,7 @@ class AsyncJobStatus(BaseModel):
 
 
 class AsyncJobResult(BaseModel):
-    result: Any | None
-    error: Any | None
+    result: Any
 
 
 class AsyncJobGet(BaseModel):
diff --git a/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py b/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py
index 5902bf317387..8403bdd2ff03 100644
--- a/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py
+++ b/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/exceptions.py
@@ -5,9 +5,27 @@ class BaseAsyncjobRpcError(OsparcErrorMixin, RuntimeError):
     pass
 
 
-class StatusError(BaseAsyncjobRpcError):
+class JobSchedulerError(BaseAsyncjobRpcError):
+    msg_template: str = "Celery exception: {exc}"
+
+
+class JobMissingError(BaseAsyncjobRpcError):
+    msg_template: str = "Job {job_id} does not exist"
+
+
+class JobStatusError(BaseAsyncjobRpcError):
     msg_template: str = "Could not get status of job {job_id}"
 
 
-class ResultError(BaseAsyncjobRpcError):
-    msg_template: str = "Could not get results of job {job_id}"
+class JobNotDoneError(BaseAsyncjobRpcError):
+    msg_template: str = "Job {job_id} not done"
+
+
+class JobAbortedError(BaseAsyncjobRpcError):
+    msg_template: str = "Job {job_id} aborted"
+
+
+class JobError(BaseAsyncjobRpcError):
+    msg_template: str = (
+        "Job {job_id} failed with exception type {exc_type} and message {exc_msg}"
+    )
diff --git a/packages/models-library/src/models_library/api_schemas_storage/data_export_async_jobs.py b/packages/models-library/src/models_library/api_schemas_storage/data_export_async_jobs.py
index a3db991452f9..57a39c34ecbe 100644
--- a/packages/models-library/src/models_library/api_schemas_storage/data_export_async_jobs.py
+++ b/packages/models-library/src/models_library/api_schemas_storage/data_export_async_jobs.py
@@ -17,17 +17,11 @@ class StorageRpcBaseError(OsparcErrorMixin, RuntimeError):
     pass
 
 
-class InvalidLocationIdError(StorageRpcBaseError):
-    msg_template: str = "Invalid location_id {location_id}"
-
-
 class InvalidFileIdentifierError(StorageRpcBaseError):
     msg_template: str = "Could not find the file {file_id}"
 
 
 class AccessRightError(StorageRpcBaseError):
-    msg_template: str = "User {user_id} does not have access to file {file_id} with location {location_id}"
-
-
-class DataExportError(StorageRpcBaseError):
-    msg_template: str = "Could not complete data export job with id {job_id}"
+    msg_template: str = (
+        "User {user_id} does not have access to file {file_id} with location {location_id}"
+    )
diff --git a/packages/models-library/src/models_library/api_schemas_webserver/storage.py b/packages/models-library/src/models_library/api_schemas_webserver/storage.py
index 07659121a426..3049bf4d0bdc 100644
--- a/packages/models-library/src/models_library/api_schemas_webserver/storage.py
+++ b/packages/models-library/src/models_library/api_schemas_webserver/storage.py
@@ -1,25 +1,18 @@
 from pathlib import Path
-from typing import Annotated, Any
+from typing import Annotated
 
 from pydantic import BaseModel, Field
 
-from ..api_schemas_rpc_async_jobs.async_jobs import (
-    AsyncJobGet,
-    AsyncJobId,
-    AsyncJobResult,
-    AsyncJobStatus,
-)
 from ..api_schemas_storage.data_export_async_jobs import DataExportTaskStartInput
 from ..api_schemas_storage.storage_schemas import (
     DEFAULT_NUMBER_OF_PATHS_PER_PAGE,
     MAX_NUMBER_OF_PATHS_PER_PAGE,
 )
-from ..progress_bar import ProgressReport
 from ..projects_nodes_io import LocationID, StorageFileID
 from ..rest_pagination import (
     CursorQueryParameters,
 )
-from ._base import InputSchema, OutputSchema
+from ._base import InputSchema
 
 
 class StorageLocationPathParams(BaseModel):
@@ -51,40 +44,3 @@ def to_rpc_schema(self, location_id: LocationID) -> DataExportTaskStartInput:
             file_and_folder_ids=self.paths,
             location_id=location_id,
         )
-
-
-class StorageAsyncJobGet(OutputSchema):
-    job_id: AsyncJobId
-
-    @classmethod
-    def from_rpc_schema(cls, async_job_rpc_get: AsyncJobGet) -> "StorageAsyncJobGet":
-        return StorageAsyncJobGet(job_id=async_job_rpc_get.job_id)
-
-
-class StorageAsyncJobStatus(OutputSchema):
-    job_id: AsyncJobId
-    progress: ProgressReport
-    done: bool
-
-    @classmethod
-    def from_rpc_schema(
-        cls, async_job_rpc_status: AsyncJobStatus
-    ) -> "StorageAsyncJobStatus":
-        return StorageAsyncJobStatus(
-            job_id=async_job_rpc_status.job_id,
-            progress=async_job_rpc_status.progress,
-            done=async_job_rpc_status.done,
-        )
-
-
-class StorageAsyncJobResult(OutputSchema):
-    result: Any | None
-    error: Any | None
-
-    @classmethod
-    def from_rpc_schema(
-        cls, async_job_rpc_result: AsyncJobResult
-    ) -> "StorageAsyncJobResult":
-        return StorageAsyncJobResult(
-            result=async_job_rpc_result.result, error=async_job_rpc_result.error
-        )
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py
index 022da445b839..db81b8d9f58d 100644
--- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py
+++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py
@@ -1,7 +1,6 @@
 from typing import Final
 
 from models_library.api_schemas_rpc_async_jobs.async_jobs import (
-    AsyncJobAbort,
     AsyncJobGet,
     AsyncJobId,
     AsyncJobNameData,
@@ -18,58 +17,56 @@
 _RPC_METHOD_NAME_ADAPTER = TypeAdapter(RPCMethodName)
 
 
-async def abort(
+async def cancel(
     rabbitmq_rpc_client: RabbitMQRPCClient,
     *,
     rpc_namespace: RPCNamespace,
     job_id: AsyncJobId,
     job_id_data: AsyncJobNameData,
-) -> AsyncJobAbort:
-    result = await rabbitmq_rpc_client.request(
+) -> None:
+    await rabbitmq_rpc_client.request(
         rpc_namespace,
-        _RPC_METHOD_NAME_ADAPTER.validate_python("abort"),
+        _RPC_METHOD_NAME_ADAPTER.validate_python("cancel"),
         job_id=job_id,
         job_id_data=job_id_data,
         timeout_s=_DEFAULT_TIMEOUT_S,
     )
-    assert isinstance(result, AsyncJobAbort)
-    return result
 
 
-async def get_status(
+async def status(
     rabbitmq_rpc_client: RabbitMQRPCClient,
     *,
     rpc_namespace: RPCNamespace,
     job_id: AsyncJobId,
     job_id_data: AsyncJobNameData,
 ) -> AsyncJobStatus:
-    result = await rabbitmq_rpc_client.request(
+    _result = await rabbitmq_rpc_client.request(
         rpc_namespace,
-        _RPC_METHOD_NAME_ADAPTER.validate_python("get_status"),
+        _RPC_METHOD_NAME_ADAPTER.validate_python("status"),
         job_id=job_id,
         job_id_data=job_id_data,
         timeout_s=_DEFAULT_TIMEOUT_S,
     )
-    assert isinstance(result, AsyncJobStatus)
-    return result
+    assert isinstance(_result, AsyncJobStatus)
+    return _result
 
 
-async def get_result(
+async def result(
     rabbitmq_rpc_client: RabbitMQRPCClient,
     *,
     rpc_namespace: RPCNamespace,
     job_id: AsyncJobId,
     job_id_data: AsyncJobNameData,
 ) -> AsyncJobResult:
-    result = await rabbitmq_rpc_client.request(
+    _result = await rabbitmq_rpc_client.request(
         rpc_namespace,
-        _RPC_METHOD_NAME_ADAPTER.validate_python("get_result"),
+        _RPC_METHOD_NAME_ADAPTER.validate_python("result"),
         job_id=job_id,
         job_id_data=job_id_data,
         timeout_s=_DEFAULT_TIMEOUT_S,
     )
-    assert isinstance(result, AsyncJobResult)
-    return result
+    assert isinstance(_result, AsyncJobResult)
+    return _result
 
 
 async def list_jobs(
@@ -79,17 +76,17 @@
     rabbitmq_rpc_client: RabbitMQRPCClient,
     *,
     rpc_namespace: RPCNamespace,
     filter_: str,
     job_id_data: AsyncJobNameData,
 ) -> list[AsyncJobGet]:
-    result: list[AsyncJobGet] = await rabbitmq_rpc_client.request(
+    _result: list[AsyncJobGet] = await rabbitmq_rpc_client.request(
         rpc_namespace,
         _RPC_METHOD_NAME_ADAPTER.validate_python("list_jobs"),
         filter_=filter_,
         job_id_data=job_id_data,
         timeout_s=_DEFAULT_TIMEOUT_S,
     )
-    return result
+    return _result
 
 
-async def submit_job(
+async def submit(
     rabbitmq_rpc_client: RabbitMQRPCClient,
     *,
     rpc_namespace: RPCNamespace,
@@ -97,12 +94,12 @@
     method_name: RPCMethodName,
     job_id_data: AsyncJobNameData,
     **kwargs,
 ) -> AsyncJobGet:
-    result = await rabbitmq_rpc_client.request(
+    _result = await rabbitmq_rpc_client.request(
         rpc_namespace,
         _RPC_METHOD_NAME_ADAPTER.validate_python(method_name),
         job_id_data=job_id_data,
         **kwargs,
         timeout_s=_DEFAULT_TIMEOUT_S,
     )
-    assert isinstance(result, AsyncJobGet)  # nosec
-    return result
+    assert isinstance(_result, AsyncJobGet)  # nosec
+    return _result
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/data_export.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/data_export.py
new file mode 100644
index 000000000000..cd9770cb688d
--- /dev/null
+++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/data_export.py
@@ -0,0 +1,22 @@
+from models_library.api_schemas_rpc_async_jobs.async_jobs import (
+    AsyncJobGet,
+    AsyncJobNameData,
+)
+from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
+from models_library.rabbitmq_basic_types import RPCMethodName
+from pydantic import TypeAdapter
+
+from ... import RabbitMQRPCClient
+from ..async_jobs.async_jobs import submit
+
+
+async def start_data_export(
+    rabbitmq_rpc_client: RabbitMQRPCClient, *, job_id_data: AsyncJobNameData, **kwargs
+) -> AsyncJobGet:
+    return await submit(
+        rabbitmq_rpc_client,
+        rpc_namespace=STORAGE_RPC_NAMESPACE,
+        method_name=TypeAdapter(RPCMethodName).validate_python("start_data_export"),
+        job_id_data=job_id_data,
+        **kwargs,
+    )
diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py
index d924a94fbe7a..a549b8fcffc2 100644
--- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py
+++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/storage/paths.py
@@ -1,16 +1,16 @@
 from pathlib import Path
 
 from models_library.api_schemas_rpc_async_jobs.async_jobs import (
+    AsyncJobGet,
     AsyncJobNameData,
 )
 from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
-from models_library.api_schemas_webserver.storage import StorageAsyncJobGet
 from models_library.projects_nodes_io import LocationID
 from models_library.rabbitmq_basic_types import RPCMethodName
 from models_library.users import UserID
 
 from ..._client_rpc import RabbitMQRPCClient
-from ..async_jobs.async_jobs import submit_job
+from ..async_jobs.async_jobs import submit
 
 
 async def compute_path_size(
@@ -20,9 +20,9 @@
     product_name: str,
     location_id: LocationID,
     path: Path,
-) -> tuple[StorageAsyncJobGet, AsyncJobNameData]:
+) -> tuple[AsyncJobGet, AsyncJobNameData]:
     job_id_data = AsyncJobNameData(user_id=user_id, product_name=product_name)
-    async_job_rpc_get = await submit_job(
+    async_job_rpc_get = await submit(
         rabbitmq_rpc_client=client,
         rpc_namespace=STORAGE_RPC_NAMESPACE,
         method_name=RPCMethodName("compute_path_size"),
@@ -30,4 +30,4 @@
         location_id=location_id,
         path=path,
     )
-    return StorageAsyncJobGet.from_rpc_schema(async_job_rpc_get), job_id_data
+    return async_job_rpc_get, job_id_data
diff --git a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
index f901502dda8d..ba7c920f8aa9 100644
--- a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
+++ b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py
@@ -1,8 +1,10 @@
 # pylint: disable=unused-argument
 
+import logging
+
+from celery.exceptions import CeleryError  # type: ignore[import-untyped]
 from fastapi import FastAPI
 from models_library.api_schemas_rpc_async_jobs.async_jobs import (
-    AsyncJobAbort,
     AsyncJobGet,
     AsyncJobId,
     AsyncJobNameData,
@@ -10,37 +12,71 @@
     AsyncJobStatus,
 )
 from models_library.api_schemas_rpc_async_jobs.exceptions import (
-    ResultError,
-    StatusError,
+    JobAbortedError,
+    JobError,
+    JobMissingError,
+    JobNotDoneError,
+    JobSchedulerError,
 )
+from servicelib.logging_utils import log_catch
 from servicelib.rabbitmq import RPCRouter
 
 from ...modules.celery import get_celery_client
-from ...modules.celery.models import TaskStatus
+from ...modules.celery.client import CeleryTaskQueueClient
+from ...modules.celery.models import TaskError, TaskState
 
+_logger = logging.getLogger(__name__)
 
 router = RPCRouter()
 
 
-@router.expose()
-async def abort(
-    app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
-) -> AsyncJobAbort:
-    assert app  # nosec
-    assert job_id_data  # nosec
-    return AsyncJobAbort(result=True, job_id=job_id)
+async def _assert_job_exists(
+    *,
+    job_id: AsyncJobId,
+    job_id_data: AsyncJobNameData,
+    celery_client: CeleryTaskQueueClient,
+) -> None:
+    """Raises JobMissingError if job doesn't exist"""
+    job_ids = await celery_client.get_task_uuids(
+        task_context=job_id_data.model_dump(),
+    )
+    if job_id not in job_ids:
+        raise JobMissingError(job_id=f"{job_id}")
 
 
-@router.expose(reraise_if_error_type=(StatusError,))
-async def get_status(
+@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
+async def cancel(app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData):
+    assert app  # nosec
+    assert job_id_data  # nosec
+    try:
+        await _assert_job_exists(
+            job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
+        )
+        await get_celery_client(app).abort_task(
+            task_context=job_id_data.model_dump(),
+            task_uuid=job_id,
+        )
+    except CeleryError as exc:
+        raise JobSchedulerError(exc=f"{exc}") from exc
+
+
+@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
+async def status(
     app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
 ) -> AsyncJobStatus:
     assert app  # nosec
     assert job_id_data  # nosec
-    task_status: TaskStatus = await get_celery_client(app).get_task_status(
-        task_context=job_id_data.model_dump(),
-        task_uuid=job_id,
-    )
+    try:
+        await _assert_job_exists(
+            job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
+        )
+        task_status = await get_celery_client(app).get_task_status(
+            task_context=job_id_data.model_dump(),
+            task_uuid=job_id,
+        )
+    except CeleryError as exc:
+        raise JobSchedulerError(exc=f"{exc}") from exc
+
     return AsyncJobStatus(
         job_id=job_id,
         progress=task_status.progress_report,
@@ -48,30 +84,64 @@
     )
 
 
-@router.expose(reraise_if_error_type=(ResultError,))
-async def get_result(
+@router.expose(
+    reraise_if_error_type=(
+        JobError,
+        JobNotDoneError,
+        JobAbortedError,
+        JobSchedulerError,
+        JobMissingError,
+    )
+)
+async def result(
     app: FastAPI, job_id: AsyncJobId, job_id_data: AsyncJobNameData
 ) -> AsyncJobResult:
     assert app  # nosec
     assert job_id  # nosec
     assert job_id_data  # nosec
-    result = await get_celery_client(app).get_task_result(
-        task_context=job_id_data.model_dump(),
-        task_uuid=job_id,
-    )
-
-    return AsyncJobResult(result=result, error=None)
-
-
-@router.expose()
+    try:
+        await _assert_job_exists(
+            job_id=job_id, job_id_data=job_id_data, celery_client=get_celery_client(app)
+        )
+        _status = await get_celery_client(app).get_task_status(
+            task_context=job_id_data.model_dump(),
+            task_uuid=job_id,
+        )
+        if not _status.is_done:
+            raise JobNotDoneError(job_id=job_id)
+        _result = await get_celery_client(app).get_task_result(
+            task_context=job_id_data.model_dump(),
+            task_uuid=job_id,
+        )
+    except CeleryError as exc:
+        raise JobSchedulerError(exc=f"{exc}") from exc
+
+    if _status.task_state == TaskState.ABORTED:
+        raise JobAbortedError(job_id=job_id)
+    if _status.task_state == TaskState.ERROR:
+        exc_type = ""
+        exc_msg = ""
+        with log_catch(logger=_logger, reraise=False):
+            task_error = TaskError.model_validate_json(_result)
+            exc_type = task_error.exc_type
+            exc_msg = task_error.exc_msg
+        raise JobError(job_id=job_id, exc_type=exc_type, exc_msg=exc_msg)
+
+    return AsyncJobResult(result=_result)
+
+
+@router.expose(reraise_if_error_type=(JobSchedulerError,))
 async def list_jobs(
     app: FastAPI, filter_: str, job_id_data: AsyncJobNameData
 ) -> list[AsyncJobGet]:
     assert app  # nosec
-    task_uuids = await get_celery_client(app).get_task_uuids(
-        task_context=job_id_data.model_dump(),
-    )
+    try:
+        task_uuids = await get_celery_client(app).get_task_uuids(
+            task_context=job_id_data.model_dump(),
+        )
+    except CeleryError as exc:
+        raise JobSchedulerError(exc=f"{exc}") from exc
 
     return [AsyncJobGet(job_id=task_uuid) for task_uuid in task_uuids]
diff --git a/services/storage/src/simcore_service_storage/api/rpc/_data_export.py b/services/storage/src/simcore_service_storage/api/rpc/_data_export.py
index aab9d7339f62..7fe6612e5e39 100644
--- a/services/storage/src/simcore_service_storage/api/rpc/_data_export.py
+++ b/services/storage/src/simcore_service_storage/api/rpc/_data_export.py
@@ -1,11 +1,12 @@
+from celery.exceptions import CeleryError  # type: ignore[import-untyped]
 from fastapi import FastAPI
 from models_library.api_schemas_rpc_async_jobs.async_jobs import (
     AsyncJobGet,
     AsyncJobNameData,
 )
+from models_library.api_schemas_rpc_async_jobs.exceptions import JobSchedulerError
 from models_library.api_schemas_storage.data_export_async_jobs import (
     AccessRightError,
-    DataExportError,
     DataExportTaskStartInput,
     InvalidFileIdentifierError,
 )
@@ -25,7 +26,7 @@
     reraise_if_error_type=(
         InvalidFileIdentifierError,
         AccessRightError,
-        DataExportError,
+        JobSchedulerError,
     )
 )
 async def start_data_export(
@@ -51,12 +52,14 @@
             location_id=data_export_start.location_id,
         ) from err
 
-    task_uuid = await get_celery_client(app).send_task(
-        "export_data",
-        task_context=job_id_data.model_dump(),
-        files=data_export_start.file_and_folder_ids,  # ANE: adapt here your signature
-    )
-
+    try:
+        task_uuid = await get_celery_client(app).send_task(
+            "export_data",
+            task_context=job_id_data.model_dump(),
+            files=data_export_start.file_and_folder_ids,  # ANE: adapt here your signature
+        )
+    except CeleryError as exc:
+        raise JobSchedulerError(exc=f"{exc}") from exc
     return AsyncJobGet(
         job_id=task_uuid,
     )
diff --git a/services/storage/src/simcore_service_storage/core/settings.py b/services/storage/src/simcore_service_storage/core/settings.py
index 66142a82b3ee..4d246a89eeb0 100644
--- a/services/storage/src/simcore_service_storage/core/settings.py
+++ b/services/storage/src/simcore_service_storage/core/settings.py
@@ -78,7 +78,7 @@ class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
         RabbitSettings | None,
         Field(
             json_schema_extra={"auto_default_from_env": True},
-        )
+        ),
     ]
 
     STORAGE_S3_CLIENT_MAX_TRANSFER_CONCURRENCY: Annotated[
diff --git a/services/storage/tests/unit/test_data_export.py b/services/storage/tests/unit/test_data_export.py
new file mode 100644
index 000000000000..05c0f99a176a
--- /dev/null
+++ b/services/storage/tests/unit/test_data_export.py
@@ -0,0 +1,601 @@
+# pylint: disable=W0621
+# pylint: disable=W0613
+# pylint: disable=R6301
+from collections.abc import Awaitable, Callable
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Literal, NamedTuple
+from uuid import UUID
+
+import pytest
+from celery.exceptions import CeleryError
+from faker import Faker
+from fastapi import FastAPI
+from models_library.api_schemas_long_running_tasks.tasks import TaskResult
+from models_library.api_schemas_rpc_async_jobs.async_jobs import (
+    AsyncJobGet,
+    AsyncJobId,
+    AsyncJobNameData,
+    AsyncJobResult,
+    AsyncJobStatus,
+)
+from models_library.api_schemas_rpc_async_jobs.exceptions import (
+    JobAbortedError,
+    JobError,
+    JobMissingError,
+    JobNotDoneError,
+    JobSchedulerError,
+)
+from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
+from models_library.api_schemas_storage.data_export_async_jobs import (
+    DataExportTaskStartInput,
+)
+from models_library.progress_bar import ProgressReport
+from models_library.projects_nodes_io import NodeID, SimcoreS3FileID
+from models_library.users import UserID
+from pydantic import ByteSize, TypeAdapter
+from pytest_mock import MockerFixture
+from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
+from pytest_simcore.helpers.storage_utils import FileIDDict, ProjectWithFilesParams
+from pytest_simcore.helpers.typing_env import EnvVarsDict
+from servicelib.rabbitmq import RabbitMQRPCClient
+from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
+from servicelib.rabbitmq.rpc_interfaces.storage.data_export import start_data_export
+from settings_library.rabbit import RabbitSettings
+from simcore_service_storage.api.rpc._data_export import AccessRightError
+from simcore_service_storage.core.settings import ApplicationSettings
+from simcore_service_storage.modules.celery.client import TaskUUID
+from simcore_service_storage.modules.celery.models import TaskState, TaskStatus
+from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
+
+pytest_plugins = [
+    "pytest_simcore.rabbit_service",
+]
+
+
+pytest_simcore_core_services_selection = [
+    "rabbit",
+    "postgres",
+]
+
+_faker = Faker()
+
+
+@dataclass
+class _MockCeleryClient:
+    send_task_object: UUID | Exception | None = None
+    get_task_status_object: TaskStatus | Exception | None = None
+    get_task_result_object: TaskResult | Exception | None = None
+    get_task_uuids_object: set[UUID] | Exception | None = None
+    abort_task_object: Exception | None = None
+
+    async def send_task(self, *args, **kwargs) -> TaskUUID:
+        assert self.send_task_object is not None
+        if isinstance(self.send_task_object, Exception):
+            raise self.send_task_object
+        return self.send_task_object
+
+    async def get_task_status(self, *args, **kwargs) -> TaskStatus:
+        assert self.get_task_status_object is not None
+        if isinstance(self.get_task_status_object, Exception):
+            raise self.get_task_status_object
+        return self.get_task_status_object
+
+    async def get_task_result(self, *args, **kwargs) -> Any:
+        assert self.get_task_result_object is not None
+        if isinstance(self.get_task_result_object, Exception):
+            raise self.get_task_result_object
+        return self.get_task_result_object
+
+    async def get_task_uuids(self, *args, **kwargs) -> set[TaskUUID]:
+        assert self.get_task_uuids_object is not None
+        if isinstance(self.get_task_uuids_object, Exception):
+            raise self.get_task_uuids_object
+        return self.get_task_uuids_object
+
+    async def abort_task(self, *args, **kwargs) -> None:
+        if isinstance(self.abort_task_object, Exception):
+            raise self.abort_task_object
+        return self.abort_task_object
+
+
+@pytest.fixture
+async def mock_celery_client(
+    mocker: MockerFixture,
+    request: pytest.FixtureRequest,
+) -> _MockCeleryClient:
+    params = request.param if hasattr(request, "param") else {}
+    _celery_client = _MockCeleryClient(
+        send_task_object=params.get("send_task_object", None),
+        get_task_status_object=params.get("get_task_status_object", None),
+        get_task_result_object=params.get("get_task_result_object", None),
+        get_task_uuids_object=params.get("get_task_uuids_object", None),
+        abort_task_object=params.get("abort_task_object", None),
+    )
+    mocker.patch(
+        "simcore_service_storage.api.rpc._async_jobs.get_celery_client",
+        return_value=_celery_client,
+    )
+    mocker.patch(
+        "simcore_service_storage.api.rpc._data_export.get_celery_client",
+        return_value=_celery_client,
+    )
+    return _celery_client
+
+
+@pytest.fixture
+async def app_environment(
+    app_environment: EnvVarsDict,
+    rabbit_service: RabbitSettings,
+    monkeypatch: pytest.MonkeyPatch,
+):
+    new_envs = setenvs_from_dict(
+        monkeypatch,
+        {
+            **app_environment,
+            "RABBIT_HOST": rabbit_service.RABBIT_HOST,
+            "RABBIT_PORT": f"{rabbit_service.RABBIT_PORT}",
+            "RABBIT_USER": rabbit_service.RABBIT_USER,
+            "RABBIT_SECURE": f"{rabbit_service.RABBIT_SECURE}",
+            "RABBIT_PASSWORD": rabbit_service.RABBIT_PASSWORD.get_secret_value(),
+        },
+    )
+
+    settings = ApplicationSettings.create_from_envs()
+    assert settings.STORAGE_RABBITMQ
+
+    return new_envs
+
+
+@pytest.fixture
+async def rpc_client(
+    initialized_app: FastAPI,
+    rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
+) -> RabbitMQRPCClient:
+    return await rabbitmq_rpc_client("client")
+
+
+class UserWithFile(NamedTuple):
+    user: UserID
+    file: Path
+
+
+@pytest.mark.parametrize(
+    "location_id",
+    [SimcoreS3DataManager.get_location_id()],
+    ids=[SimcoreS3DataManager.get_location_name()],
+    indirect=True,
+)
+@pytest.mark.parametrize(
+    "project_params,selection_type",
+    [
+        (
+            ProjectWithFilesParams(
+                num_nodes=1,
+                allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
+                workspace_files_count=10,
+            ),
+            "file",
+        ),
+        (
+            ProjectWithFilesParams(
+                num_nodes=1,
+                allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
+                workspace_files_count=10,
+            ),
+            "folder",
+        ),
+    ],
+    ids=str,
+)
+@pytest.mark.parametrize(
+    "mock_celery_client",
+    [
+        {"send_task_object": TaskUUID(_faker.uuid4())},
+    ],
+    indirect=True,
+)
+async def test_start_data_export_success(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: _MockCeleryClient,
+    with_random_project_with_files: tuple[
+        dict[str, Any],
+        dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],
+    ],
+    user_id: UserID,
+    selection_type: Literal["file", "folder"],
+):
+    _, list_of_files = with_random_project_with_files
+    workspace_files = [
+        p for p in next(iter(list_of_files.values())) if "/workspace/" in p
+    ]
+    assert len(workspace_files) > 0
+    file_or_folder_id: SimcoreS3FileID
+    if selection_type == "file":
+        file_or_folder_id = workspace_files[0]
+    elif selection_type == "folder":
+        parts = Path(workspace_files[0]).parts
+        parts = parts[0 : parts.index("workspace") + 1]
+        assert len(parts) > 0
+        folder = Path(*parts)
+        assert folder.name == "workspace"
+        file_or_folder_id = f"{folder}"
+    else:
+        pytest.fail(f"invalid parameter: {selection_type=}")
+
+    result = await start_data_export(
+        rpc_client,
+        job_id_data=AsyncJobNameData(user_id=user_id, product_name="osparc"),
+        data_export_start=DataExportTaskStartInput(
+            location_id=0,
+            file_and_folder_ids=[file_or_folder_id],
+        ),
+    )
+    assert isinstance(result, AsyncJobGet)
+
+
+@pytest.mark.parametrize(
+    "location_id",
+    [SimcoreS3DataManager.get_location_id()],
+    ids=[SimcoreS3DataManager.get_location_name()],
+    indirect=True,
+)
+@pytest.mark.parametrize(
+    "project_params",
+    [
+        ProjectWithFilesParams(
+            num_nodes=1,
+            allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
+            workspace_files_count=10,
+        ),
+    ],
+    ids=str,
+)
+@pytest.mark.parametrize(
+    "mock_celery_client",
+    [
+        {"send_task_object": CeleryError("error")},
+    ],
+    indirect=True,
+)
+async def test_start_data_export_scheduler_error(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: _MockCeleryClient,
+    with_random_project_with_files: tuple[
+        dict[str, Any],
+        dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],
+    ],
+    user_id: UserID,
+):
+
+    _, list_of_files = with_random_project_with_files
+    workspace_files = [
+        p for p in list(list_of_files.values())[0].keys() if "/workspace/" in p
+    ]
+    assert len(workspace_files) > 0
+    file_or_folder_id = workspace_files[0]
+
+    with pytest.raises(JobSchedulerError):
+        _ = await start_data_export(
+            rpc_client,
+            job_id_data=AsyncJobNameData(user_id=user_id, product_name="osparc"),
+            data_export_start=DataExportTaskStartInput(
+                location_id=0,
+                file_and_folder_ids=[file_or_folder_id],
+            ),
+        )
+
+
+@pytest.mark.parametrize(
+    "mock_celery_client",
+    [
+        {"send_task_object": TaskUUID(_faker.uuid4())},
+    ],
+    indirect=True,
+)
+async def test_start_data_export_access_error(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: _MockCeleryClient,
+    user_id: UserID,
+    faker: Faker,
+):
+    with pytest.raises(AccessRightError):
+        _ = await async_jobs.submit(
+            rpc_client,
+            rpc_namespace=STORAGE_RPC_NAMESPACE,
+            method_name="start_data_export",
+            job_id_data=AsyncJobNameData(user_id=user_id, product_name="osparc"),
+            data_export_start=DataExportTaskStartInput(
+                location_id=0,
+                file_and_folder_ids=[
+                    f"{faker.uuid4()}/{faker.uuid4()}/{faker.file_name()}"
+                ],
+            ),
+        )
+
+
+@pytest.mark.parametrize(
+    "mock_celery_client",
+    [
+        {
+            "abort_task_object": None,
+            "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
+        },
+    ],
+    indirect=True,
+)
+async def test_abort_data_export_success(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: _MockCeleryClient,
+):
+    assert mock_celery_client.get_task_uuids_object is not None
+    assert not isinstance(mock_celery_client.get_task_uuids_object, Exception)
+    await async_jobs.cancel(
+        rpc_client,
+        rpc_namespace=STORAGE_RPC_NAMESPACE,
+        job_id_data=AsyncJobNameData(
+            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
+        ),
+        job_id=next(iter(mock_celery_client.get_task_uuids_object)),
+    )
+
+
+@pytest.mark.parametrize(
+    "mock_celery_client, expected_exception_type",
+    [
+        ({"abort_task_object": None, "get_task_uuids_object": []}, JobMissingError),
+        (
+            {
+                "abort_task_object": CeleryError("error"),
+                "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
+            },
+            JobSchedulerError,
+        ),
+    ],
+    indirect=["mock_celery_client"],
+)
+async def test_abort_data_export_error(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: _MockCeleryClient,
+    expected_exception_type: type[Exception],
+):
+    job_ids = mock_celery_client.get_task_uuids_object
+    assert job_ids is not None
+    assert not isinstance(job_ids, Exception)
+    _job_id = next(iter(job_ids)) if len(job_ids) > 0 else AsyncJobId(_faker.uuid4())
+    with pytest.raises(expected_exception_type):
+        await async_jobs.cancel(
+            rpc_client,
+            rpc_namespace=STORAGE_RPC_NAMESPACE,
+            job_id_data=AsyncJobNameData(
+                user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
+            ),
+            job_id=_job_id,
+        )
+
+
+@pytest.mark.parametrize(
+    "mock_celery_client",
+    [
+        {
+            "get_task_status_object": TaskStatus(
+                task_uuid=TaskUUID(_faker.uuid4()),
+                task_state=TaskState.RUNNING,
+                progress_report=ProgressReport(actual_value=0),
+            ),
+            "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
+        },
+    ],
+    indirect=True,
+)
+async def test_get_data_export_status(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: _MockCeleryClient,
+):
+    job_ids = mock_celery_client.get_task_uuids_object
+    assert job_ids is not None
+    assert not isinstance(job_ids, Exception)
+    _job_id = next(iter(job_ids)) if len(job_ids) > 0 else AsyncJobId(_faker.uuid4())
+    result = await async_jobs.status(
+        rpc_client,
+        rpc_namespace=STORAGE_RPC_NAMESPACE,
+        job_id=_job_id,
+        job_id_data=AsyncJobNameData(
+            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
+        ),
+    )
+    assert isinstance(result, AsyncJobStatus)
+    assert result.job_id == _job_id
+
+
+@pytest.mark.parametrize(
+    "mock_celery_client, expected_exception_type",
+    [
+        (
+            {"get_task_status_object": None, "get_task_uuids_object": []},
+            JobMissingError,
+        ),
+        (
+            {
+                "get_task_status_object": CeleryError("error"),
+                "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
+            },
+            JobSchedulerError,
+        ),
+    ],
+    indirect=["mock_celery_client"],
+)
+async def test_get_data_export_status_error(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: _MockCeleryClient,
+    expected_exception_type: type[Exception],
+):
+    job_ids = mock_celery_client.get_task_uuids_object
+    assert job_ids is not None
+    assert not isinstance(job_ids, Exception)
+    _job_id = next(iter(job_ids)) if len(job_ids) > 0 else AsyncJobId(_faker.uuid4())
+    with pytest.raises(expected_exception_type):
+        _ = await async_jobs.status(
+            rpc_client,
+            rpc_namespace=STORAGE_RPC_NAMESPACE,
+            job_id=_job_id,
+            job_id_data=AsyncJobNameData(
+                user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
+            ),
+        )
+
+
+@pytest.mark.parametrize(
+    "mock_celery_client",
+    [
+        {
+            "get_task_status_object": TaskStatus(
+                task_uuid=TaskUUID(_faker.uuid4()),
+                task_state=TaskState.SUCCESS,
+                progress_report=ProgressReport(actual_value=100),
+            ),
+            "get_task_result_object": "result",
+            "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
+        },
+    ],
+    indirect=True,
+)
+async def test_get_data_export_result_success(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: _MockCeleryClient,
+):
+    job_ids = mock_celery_client.get_task_uuids_object
+    assert job_ids is not None
+    assert not isinstance(job_ids, Exception)
+    _job_id = next(iter(job_ids)) if len(job_ids) > 0 else AsyncJobId(_faker.uuid4())
+    result = await async_jobs.result(
+        rpc_client,
+        rpc_namespace=STORAGE_RPC_NAMESPACE,
+        job_id=_job_id,
+        job_id_data=AsyncJobNameData(
+            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
+        ),
+    )
+    assert isinstance(result, AsyncJobResult)
+
+
+@pytest.mark.parametrize(
+    "mock_celery_client, expected_exception",
+    [
+        (
+            {
+                "get_task_status_object": TaskStatus(
+                    task_uuid=TaskUUID(_faker.uuid4()),
+                    task_state=TaskState.RUNNING,
+                    progress_report=ProgressReport(actual_value=50),
+                ),
+                "get_task_result_object": _faker.text(),
+                "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
+            },
+            JobNotDoneError,
+        ),
+        (
+            {
+                "get_task_status_object": TaskStatus(
+                    task_uuid=TaskUUID(_faker.uuid4()),
+                    task_state=TaskState.ABORTED,
+                    progress_report=ProgressReport(actual_value=100),
+                ),
+                "get_task_result_object": _faker.text(),
+                "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
+            },
+            JobAbortedError,
+        ),
+        (
+            {
+                "get_task_status_object": TaskStatus(
+                    task_uuid=TaskUUID(_faker.uuid4()),
+                    task_state=TaskState.ERROR,
+                    progress_report=ProgressReport(actual_value=100),
+                ),
+                "get_task_result_object": _faker.text(),
+                "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
+            },
+            JobError,
+        ),
+        (
+            {
+                "get_task_status_object": CeleryError("error"),
+                "get_task_result_object": _faker.text(),
+                "get_task_uuids_object": [AsyncJobId(_faker.uuid4())],
+            },
+            JobSchedulerError,
+        ),
+        (
+            {
+                "get_task_uuids_object": [],
+            },
+            JobMissingError,
+        ),
+    ],
+    indirect=["mock_celery_client"],
+)
+async def test_get_data_export_result_error(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: _MockCeleryClient,
+    expected_exception: type[Exception],
+):
+    job_ids = mock_celery_client.get_task_uuids_object
+    assert job_ids is not None
+    assert not isinstance(job_ids, Exception)
+    _job_id = next(iter(job_ids)) if len(job_ids) > 0 else AsyncJobId(_faker.uuid4())
+
+    with pytest.raises(expected_exception):
+        _ = await async_jobs.result(
+            rpc_client,
+            rpc_namespace=STORAGE_RPC_NAMESPACE,
+            job_id=_job_id,
+            job_id_data=AsyncJobNameData(
+                user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
+            ),
+        )
+
+
+@pytest.mark.parametrize(
+    "mock_celery_client",
+    [
+        {"get_task_uuids_object": [_faker.uuid4() for _ in range(_faker.pyint(1, 10))]},
+    ],
+    indirect=True,
+)
+async def test_list_jobs_success(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: MockerFixture,
+):
+    result = await async_jobs.list_jobs(
+        rpc_client,
+        rpc_namespace=STORAGE_RPC_NAMESPACE,
+        job_id_data=AsyncJobNameData(
+            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
+        ),
+        filter_="",
+    )
+    assert isinstance(result, list)
+    assert all(isinstance(elm, AsyncJobGet) for elm in result)
+
+
+@pytest.mark.parametrize(
+    "mock_celery_client",
+    [
+        {"get_task_uuids_object": CeleryError("error")},
+    ],
+    indirect=True,
+)
+async def test_list_jobs_error(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: MockerFixture,
+):
+    with pytest.raises(JobSchedulerError):
+        _ = await async_jobs.list_jobs(
+            rpc_client,
+            rpc_namespace=STORAGE_RPC_NAMESPACE,
+            job_id_data=AsyncJobNameData(
+                user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
+            ),
+            filter_="",
+        )
diff --git a/services/storage/tests/unit/test_db_data_export.py b/services/storage/tests/unit/test_db_data_export.py
deleted file mode 100644
index 0bf9db1a2edb..000000000000
--- a/services/storage/tests/unit/test_db_data_export.py
+++ /dev/null
@@ -1,281 +0,0 @@
-# pylint: disable=W0621
-# pylint: disable=W0613
-# pylint: disable=R6301
-from collections.abc import Awaitable, Callable
-from pathlib import Path
-from typing import Any, Literal, NamedTuple
-
-import pytest
-from faker import Faker
-from fastapi import FastAPI
-from models_library.api_schemas_rpc_async_jobs.async_jobs import (
-    AsyncJobAbort,
-    AsyncJobGet,
-    AsyncJobId,
-    AsyncJobResult,
-    AsyncJobStatus,
-)
-from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
-from models_library.api_schemas_storage.data_export_async_jobs import (
-    DataExportTaskStartInput,
-)
-from models_library.progress_bar import ProgressReport
-from models_library.projects_nodes_io import NodeID, SimcoreS3FileID
-from models_library.users import UserID
-from pydantic import ByteSize, TypeAdapter
-from pytest_mock import MockerFixture
-from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
-from pytest_simcore.helpers.storage_utils import FileIDDict, ProjectWithFilesParams
-from pytest_simcore.helpers.typing_env import EnvVarsDict
-from servicelib.rabbitmq import RabbitMQRPCClient
-from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
-from settings_library.rabbit import RabbitSettings
-from simcore_service_storage.api.rpc._async_jobs import AsyncJobNameData, TaskStatus
-from simcore_service_storage.api.rpc._data_export import AccessRightError
-from simcore_service_storage.core.settings import ApplicationSettings
-from simcore_service_storage.modules.celery.client import TaskUUID
-from simcore_service_storage.modules.celery.models import TaskState
-from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
-
-pytest_plugins = [
-    "pytest_simcore.rabbit_service",
-]
-
-
-pytest_simcore_core_services_selection = [
-    "rabbit",
-    "postgres",
-]
-
-_faker = Faker()
-
-
-@pytest.fixture
-async def mock_rabbit_setup(mocker: MockerFixture):
-    # fixture to avoid mocking the rabbit
-    pass
-
-
-class _MockCeleryClient:
-    async def send_task(self, *args, **kwargs) -> TaskUUID:
-        return _faker.uuid4()
-
-    async def get_task_status(self, *args, **kwargs) -> TaskStatus:
-        return TaskStatus(
-            task_uuid=_faker.uuid4(),
-            task_state=TaskState.RUNNING,
-            progress_report=ProgressReport(actual_value=42.0),
-        )
-
-    async def get_task_result(self, *args, **kwargs) -> Any:
-        return {}
-
-    async def get_task_uuids(self, *args, **kwargs) -> set[TaskUUID]:
-        return {_faker.uuid4()}
-
-
-@pytest.fixture
-async def mock_celery_client(mocker: MockerFixture) -> MockerFixture:
-    _celery_client = _MockCeleryClient()
-    mocker.patch(
-        "simcore_service_storage.api.rpc._async_jobs.get_celery_client",
-        return_value=_celery_client,
-    )
-    mocker.patch(
-        "simcore_service_storage.api.rpc._data_export.get_celery_client",
-        return_value=_celery_client,
-    )
-    return mocker
-
-
-@pytest.fixture
-async def app_environment(
-    app_environment: EnvVarsDict,
-    rabbit_service: RabbitSettings,
-    monkeypatch: pytest.MonkeyPatch,
-):
-    new_envs = setenvs_from_dict(
-        monkeypatch,
-        {
-            **app_environment,
-            "RABBIT_HOST": rabbit_service.RABBIT_HOST,
-            "RABBIT_PORT": f"{rabbit_service.RABBIT_PORT}",
-            "RABBIT_USER": rabbit_service.RABBIT_USER,
-            "RABBIT_SECURE": f"{rabbit_service.RABBIT_SECURE}",
-            "RABBIT_PASSWORD": rabbit_service.RABBIT_PASSWORD.get_secret_value(),
-        },
-    )
-
-    settings = ApplicationSettings.create_from_envs()
-    assert settings.STORAGE_RABBITMQ
-
-    return new_envs
-
-
-@pytest.fixture
-async def rpc_client(
-    initialized_app: FastAPI,
-    rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
-) -> RabbitMQRPCClient:
-    return await rabbitmq_rpc_client("client")
-
-
-class UserWithFile(NamedTuple):
-    user: UserID
-    file: Path
-
-
-@pytest.mark.parametrize(
-    "location_id",
-    [SimcoreS3DataManager.get_location_id()],
-    ids=[SimcoreS3DataManager.get_location_name()],
-    indirect=True,
-)
-@pytest.mark.parametrize(
-    "project_params,_type",
-    [
-        (
-            ProjectWithFilesParams(
-                num_nodes=1,
-                allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
-                workspace_files_count=10,
-            ),
-            "file",
-        ),
-        (
-            ProjectWithFilesParams(
-                num_nodes=1,
-                allowed_file_sizes=(TypeAdapter(ByteSize).validate_python("1b"),),
-                workspace_files_count=10,
-            ),
-            "folder",
-        ),
-    ],
-    ids=str,
-)
-async def test_start_data_export_success(
-    rpc_client: RabbitMQRPCClient,
-    mock_celery_client: MockerFixture,
-    with_random_project_with_files: tuple[
-        dict[str, Any],
-        dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],
-    ],
-    user_id: UserID,
-    _type: Literal["file", "folder"],
-):
-    _, list_of_files = with_random_project_with_files
-    workspace_files = [
-        p for p in next(iter(list_of_files.values())) if "/workspace/" in p
-    ]
-    assert len(workspace_files) > 0
-    file_or_folder_id: SimcoreS3FileID
-    if _type == "file":
-        file_or_folder_id = workspace_files[0]
-    elif _type == "folder":
-        parts = Path(workspace_files[0]).parts
-        parts = parts[0 : parts.index("workspace") + 1]
-        assert len(parts) > 0
-        folder = Path(*parts)
-        assert folder.name == "workspace"
-        file_or_folder_id = f"{folder}"
-    else:
-        pytest.fail("invalid parameter: to_check")
-
-    result = await async_jobs.submit_job(
-        rpc_client,
-        rpc_namespace=STORAGE_RPC_NAMESPACE,
-        method_name="start_data_export",
-        job_id_data=AsyncJobNameData(user_id=user_id, product_name="osparc"),
-        data_export_start=DataExportTaskStartInput(
-            location_id=0,
-            file_and_folder_ids=[file_or_folder_id],
-        ),
-    )
-    assert isinstance(result, AsyncJobGet)
-
-
-async def test_start_data_export_fail(
-    rpc_client: RabbitMQRPCClient,
-    mock_celery_client: MockerFixture,
-    user_id: UserID,
-    faker: Faker,
-):
-    with pytest.raises(AccessRightError):
-        _ = await async_jobs.submit_job(
-            rpc_client,
-            rpc_namespace=STORAGE_RPC_NAMESPACE,
-            method_name="start_data_export",
-            job_id_data=AsyncJobNameData(user_id=user_id, product_name="osparc"),
-            data_export_start=DataExportTaskStartInput(
-                location_id=0,
-                file_and_folder_ids=[
-                    f"{faker.uuid4()}/{faker.uuid4()}/{faker.file_name()}"
-                ],
-            ),
-        )
-
-
-async def test_abort_data_export(
-    rpc_client: RabbitMQRPCClient,
-    mock_celery_client: MockerFixture,
-):
-    _job_id = AsyncJobId(_faker.uuid4())
-    result = await async_jobs.abort(
-        rpc_client,
-        rpc_namespace=STORAGE_RPC_NAMESPACE,
-        job_id_data=AsyncJobNameData(
-            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
-        ),
-        job_id=_job_id,
-    )
-    assert isinstance(result, AsyncJobAbort)
-    assert result.job_id == _job_id
-
-
-async def test_get_data_export_status(
-    rpc_client: RabbitMQRPCClient,
-    mock_celery_client: MockerFixture,
-):
-    _job_id = AsyncJobId(_faker.uuid4())
-    result = await async_jobs.get_status(
-        rpc_client,
-        rpc_namespace=STORAGE_RPC_NAMESPACE,
-        job_id=_job_id,
-        job_id_data=AsyncJobNameData(
-            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
-        ),
-    )
-    assert isinstance(result, AsyncJobStatus)
-    assert result.job_id == _job_id
-
-
-async def test_get_data_export_result(
-    rpc_client: RabbitMQRPCClient,
-    mock_celery_client: MockerFixture,
-):
-    _job_id = AsyncJobId(_faker.uuid4())
-    result = await async_jobs.get_result(
-        rpc_client,
-        rpc_namespace=STORAGE_RPC_NAMESPACE,
-        job_id=_job_id,
-        job_id_data=AsyncJobNameData(
-            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
-        ),
-    )
-    assert isinstance(result, AsyncJobResult)
-
-
-async def test_list_jobs(
-    rpc_client: RabbitMQRPCClient,
-    mock_celery_client: MockerFixture,
-):
-    result = await async_jobs.list_jobs(
-        rpc_client,
-        rpc_namespace=STORAGE_RPC_NAMESPACE,
-        job_id_data=AsyncJobNameData(
-            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
-        ),
-        filter_="",
-    )
-    assert isinstance(result, list)
-    assert all(isinstance(elm, AsyncJobGet) for elm in result)
diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
index 17490658b23b..79d94f09c73b 100644
--- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
+++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
@@ -2979,7 +2979,8 @@
       tags:
       - long-running-tasks
       summary: List Tasks
-      operationId: list_tasks
+      description: Lists all long running tasks
+      operationId: get_async_jobs
       responses:
         '200':
           description: Successful Response
@@ -2987,12 +2988,37 @@
           content:
             application/json:
              schema:
                $ref: '#/components/schemas/Envelope_list_TaskGet__'
+        '404':
+          description: Not Found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+        '403':
+          description: Forbidden
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+        '410':
+          description: Gone
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+        '500':
+          description: Internal Server Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
   /v0/tasks/{task_id}:
     get:
       tags:
      - long-running-tasks
       summary: Get Task Status
-      operationId: get_task_status
+      description: Retrieves the status of a task
+      operationId: get_async_job_status
       parameters:
       - name: task_id
        in: path
        required: true
        schema:
          type: string
          title: Task Id
       responses:
         '200':
           description: Successful Response
@@ -3007,11 +3033,36 @@
           content:
             application/json:
              schema:
                $ref: '#/components/schemas/Envelope_TaskStatus_'
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Not Found
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Forbidden
+        '410':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Gone
+        '500':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Internal Server Error
    delete:
      tags:
      - long-running-tasks
      summary: Cancel And Delete Task
-      operationId: cancel_and_delete_task
+      description: Cancels and deletes a task
+      operationId: abort_async_job
      parameters:
      - name: task_id
        in: path
@@ -3022,12 +3073,37 @@
       responses:
         '204':
           description: Successful Response
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Not Found
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Forbidden
+        '410':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Gone
+        '500':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Internal Server Error
  /v0/tasks/{task_id}/result:
    get:
      tags:
      - long-running-tasks
      summary: Get Task Result
-      operationId: get_task_result
+      description: Retrieves the result of a task
+      operationId: get_async_job_result
      parameters:
      - name: task_id
        in: path
@@ -3041,6 +3117,30 @@
           content:
             application/json:
              schema: {}
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Not Found
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Forbidden
+        '410':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Gone
+        '500':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Internal Server Error
  /v0/catalog/licensed-items:
    get:
      tags:
@@ -6083,7 +6183,7 @@
         content:
           application/json:
             schema:
-              $ref: '#/components/schemas/Envelope_StorageAsyncJobGet_'
+              $ref: '#/components/schemas/Envelope_TaskGet_'
  /v0/storage/locations/{location_id}/datasets:
    get:
      tags:
@@ -6437,95 +6537,31 @@
         content:
           application/json:
             schema:
-              $ref: '#/components/schemas/Envelope_StorageAsyncJobGet_'
-  /v0/storage/async-jobs/{job_id}/status:
-    get:
-      tags:
-      - storage
-      summary: Get Async Job Status
-      description: Get async job status
-      operationId: get_async_job_status
-      parameters:
-      - name: job_id
-        in: path
-        required: true
-        schema:
-          type: string
-          format: uuid
-          title: Job Id
-      responses:
-        '200':
-          description: Successful Response
+              $ref: '#/components/schemas/Envelope_TaskGet_'
+        '404':
          content:
            application/json:
              schema:
-                $ref: '#/components/schemas/Envelope_StorageAsyncJobStatus_'
-  /v0/storage/async-jobs/{job_id}:abort:
-    post:
-      tags:
-      - storage
-      summary: Abort Async Job
-      description: aborts execution of an async job
-      operationId: abort_async_job
-      parameters:
-      - name: job_id
-        in: path
-        required: true
-        schema:
-          type: string
-          format: uuid
-          title: Job Id
-      responses:
-        '200':
-          description: Successful Response
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Not Found
+        '403':
          content:
            application/json:
-              schema: {}
-  /v0/storage/async-jobs/{job_id}/result:
-    get:
-      tags:
-      - storage
-      summary: Get Async Job Result
-      description: Get the result of the async job
-      operationId: get_async_job_result
-      parameters:
-      - name: job_id
-        in: path
-        required: true
-        schema:
-          type: string
-          format: uuid
-          title: Job Id
-      responses:
-        '200':
-          description: Successful Response
+              schema:
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Forbidden
+        '410':
          content:
            application/json:
              schema:
-                $ref: '#/components/schemas/Envelope_StorageAsyncJobResult_'
-  /v0/storage/async-jobs:
-    get:
-      tags:
-      - storage
-      summary: Get Async Jobs
-      description: Retrunsa list of async jobs for the user
-      operationId: get_async_jobs
-      parameters:
-      - name: user_id
-        in: query
-        required: true
-        schema:
-          type: integer
-          exclusiveMinimum: true
-          title: User Id
-          minimum: 0
-      responses:
-        '200':
-          description: Successful Response
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Gone
+        '500':
          content:
            application/json:
              schema:
-                $ref: '#/components/schemas/Envelope_list_StorageAsyncJobGet__'
+                $ref: '#/components/schemas/EnvelopedError'
+          description: Internal Server Error
  /v0/trash:empty:
    post:
      tags:
@@ -9467,45 +9503,6 @@
           title: Error
       type: object
       title: Envelope[StatusDiagnosticsGet]
-    Envelope_StorageAsyncJobGet_:
-      properties:
-        data:
-          anyOf:
-          - $ref: '#/components/schemas/StorageAsyncJobGet'
-          - type: 'null'
-        error:
-          anyOf:
-          - {}
-          - type: 'null'
-          title: Error
-      type: object
-      title: Envelope[StorageAsyncJobGet]
-    Envelope_StorageAsyncJobResult_:
-      properties:
-        data:
-          anyOf:
-          - $ref: '#/components/schemas/StorageAsyncJobResult'
-          - type: 'null'
-        error:
-          anyOf:
-          - {}
-          - type: 'null'
-          title: Error
-      type: object
-      title: Envelope[StorageAsyncJobResult]
-    Envelope_StorageAsyncJobStatus_:
-      properties:
-        data:
-          anyOf:
-          - $ref: '#/components/schemas/StorageAsyncJobStatus'
-          - type: 'null'
-        error:
-          anyOf:
-          - {}
-          - type: 'null'
-          title: Error
-      type: object
-      title: Envelope[StorageAsyncJobStatus]
     Envelope_TagGet_:
       properties:
         data:
@@ -10125,22 +10122,6 @@
           title: Error
       type: object
       title: Envelope[list[ServiceOutputGet]]
-    Envelope_list_StorageAsyncJobGet__:
-      properties:
-        data:
-          anyOf:
-          - items:
-              $ref: '#/components/schemas/StorageAsyncJobGet'
-            type: array
-          - type: 'null'
-          title: Data
-        error:
-          anyOf:
-          - {}
-          - type: 'null'
-          title: Error
-      type: object
-      title: Envelope[list[StorageAsyncJobGet]]
     Envelope_list_TagGet__:
       properties:
         data:
@@ -13321,59 +13302,6 @@
       - productName
      - ui
       title: ProductUIGet
-    ProgressReport:
-      properties:
-        actual_value:
-          type: number
-          title: Actual Value
-        total:
-          type: number
-          title: Total
-          default: 1.0
-        attempt:
-          type: integer
-          title: Attempt
-          default: 0
-        unit:
-          anyOf:
-          - type: string
-            const: Byte
-          - type: 'null'
-          title: Unit
-        message:
-          anyOf:
-          - $ref: '#/components/schemas/ProgressStructuredMessage'
-          - type: 'null'
-      type: object
-      required:
-      - actual_value
-      title: ProgressReport
-    ProgressStructuredMessage:
-      properties:
-        description:
-          type: string
-          title: Description
-        current:
-          type: number
-          title: Current
-        total:
-          type: integer
-          title: Total
-        unit:
-          anyOf:
-          - type: string
-          - type: 'null'
-          title: Unit
-        sub:
-          anyOf:
-          - $ref: '#/components/schemas/ProgressStructuredMessage'
-          - type: 'null'
-      type: object
-      required:
-      - description
-      - current
-      - total
-      title: ProgressStructuredMessage
     ProjectCopyOverride:
       properties:
         name:
@@ -15026,50 +14954,6 @@
       - loop_tasks
       - top_tracemalloc
       title: StatusDiagnosticsGet
-    StorageAsyncJobGet:
-      properties:
-        jobId:
-          type: string
-          format: uuid
-          title: Jobid
-      type: object
-      required:
-      - jobId
-      title: StorageAsyncJobGet
-    StorageAsyncJobResult:
-      properties:
-        result:
-          anyOf:
-          - {}
-          - type: 'null'
-          title: Result
-        error:
-          anyOf:
-          - {}
-          - type: 'null'
-          title: Error
-      type: object
-      required:
-      - result
-      - error
-      title: StorageAsyncJobResult
-    StorageAsyncJobStatus:
-      properties:
-        jobId:
-          type: string
-          format: uuid
-          title: Jobid
-        progress:
-          $ref: '#/components/schemas/ProgressReport'
-        done:
-          type: boolean
-          title: Done
-      type: object
-      required:
-      - jobId
-      - progress
-      - done
-      title: StorageAsyncJobStatus
     Structure:
       properties:
         key:
@@ -15374,8 +15258,10 @@
           type: boolean
           title: Done
         started:
-          type: string
-          format: date-time
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
           title: Started
       type: object
       required:
diff --git a/services/web/server/src/simcore_service_webserver/application.py b/services/web/server/src/simcore_service_webserver/application.py
index abb11c29ba16..073ebb0c08b1 100644
--- a/services/web/server/src/simcore_service_webserver/application.py
+++ b/services/web/server/src/simcore_service_webserver/application.py
@@ -1,6 +1,4 @@
-""" Main application
-
-"""
+"""Main application"""
 
 import logging
 from pprint import pformat
@@ -8,6 +6,7 @@
 from aiohttp import web
 from servicelib.aiohttp.application import create_safe_application
+from simcore_service_webserver.tasks.plugin import setup_tasks
 
 from ._meta import WELCOME_DB_LISTENER_MSG, WELCOME_GC_MSG, WELCOME_MSG, info
 from .activity.plugin import setup_activity
@@ -121,6 +120,7 @@ def create_application() -> web.Application:
     setup_director_v2(app)
     setup_dynamic_scheduler(app)
     setup_storage(app)
+    setup_tasks(app)
     setup_catalog(app)
 
     # resource management
diff --git a/services/web/server/src/simcore_service_webserver/long_running_tasks.py b/services/web/server/src/simcore_service_webserver/long_running_tasks.py
index 29dd8d7caec9..c2f842eab7a4 100644
--- a/services/web/server/src/simcore_service_webserver/long_running_tasks.py
+++ b/services/web/server/src/simcore_service_webserver/long_running_tasks.py
@@ -29,7 +29,7 @@ async def _test_task_context_decorator(
 def setup_long_running_tasks(app: web.Application) -> None:
     setup(
         app,
-        router_prefix=f"/{API_VTAG}/tasks",
+        router_prefix=f"/{API_VTAG}/tasks-legacy",
         handler_check_decorator=login_required,
         task_request_context_decorator=_webserver_request_context_decorator,
     )
diff --git a/services/web/server/src/simcore_service_webserver/storage/_exception_handlers.py b/services/web/server/src/simcore_service_webserver/storage/_exception_handlers.py
deleted file mode 100644
index 4c68394b6a20..000000000000
--- a/services/web/server/src/simcore_service_webserver/storage/_exception_handlers.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from models_library.api_schemas_storage.data_export_async_jobs import (
-    AccessRightError,
-    DataExportError,
-    InvalidFileIdentifierError,
-)
-from servicelib.aiohttp import status
-
-from ..exception_handling import (
-    ExceptionToHttpErrorMap,
-    HttpErrorInfo,
-    exception_handling_decorator,
-    to_exceptions_handlers_map,
-)
-
-_TO_HTTP_ERROR_MAP: ExceptionToHttpErrorMap = {
-    InvalidFileIdentifierError: HttpErrorInfo(
-        status.HTTP_404_NOT_FOUND,
-        "Could not find file.",
-    ),
-    AccessRightError: HttpErrorInfo(
-        status.HTTP_403_FORBIDDEN,
-        "Accessright error.",
-    ),
-    DataExportError: HttpErrorInfo(
-        status.HTTP_500_INTERNAL_SERVER_ERROR,
-        "Could not export data.",
-    ),
-}
-
-
-handle_data_export_exceptions = exception_handling_decorator(
-    to_exceptions_handlers_map(_TO_HTTP_ERROR_MAP)
-)
diff --git a/services/web/server/src/simcore_service_webserver/storage/_rest.py b/services/web/server/src/simcore_service_webserver/storage/_rest.py
index 6e244cef4c01..fbc419d90153 100644
--- a/services/web/server/src/simcore_service_webserver/storage/_rest.py
+++ b/services/web/server/src/simcore_service_webserver/storage/_rest.py
@@ -7,11 +7,14 @@
 import urllib.parse
 from typing import Any, Final, NamedTuple
 from urllib.parse import quote, unquote
-from uuid import UUID
 
 from aiohttp import ClientTimeout, web
-from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobNameData
-from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
+from models_library.api_schemas_long_running_tasks.tasks import (
+    TaskGet,
+)
+from models_library.api_schemas_rpc_async_jobs.async_jobs import (
+    AsyncJobNameData,
+)
 from models_library.api_schemas_storage.storage_schemas import (
     FileUploadCompleteResponse,
     FileUploadCompletionBody,
@@ -20,9 +23,6 @@
 )
 from models_library.api_schemas_webserver.storage import (
     DataExportPost,
-    StorageAsyncJobGet,
-    StorageAsyncJobResult,
-    StorageAsyncJobStatus,
     StoragePathComputeSizeParams,
 )
 from models_library.projects_nodes_io import LocationID
@@ -38,13 +38,7 @@
 )
 from servicelib.aiohttp.rest_responses import create_data_response
 from servicelib.common_headers import X_FORWARDED_PROTO
-from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import (
-    abort,
-    get_result,
-    get_status,
-    list_jobs,
-    submit_job,
-)
+from
servicelib.rabbitmq.rpc_interfaces.storage.data_export import start_data_export from servicelib.rabbitmq.rpc_interfaces.storage.paths import ( compute_path_size as remote_compute_path_size, ) @@ -57,7 +51,7 @@ from ..models import RequestContext from ..rabbitmq import get_rabbitmq_rpc_client from ..security.decorators import permission_required -from ._exception_handlers import handle_data_export_exceptions +from ..tasks._exception_handlers import handle_data_export_exceptions from .schemas import StorageFileIDStr from .settings import StorageSettings, get_plugin_settings @@ -199,8 +193,15 @@ async def compute_path_size(request: web.Request) -> web.Response: path=path_params.path, ) + _job_id = f"{async_job.job_id}" return create_data_response( - async_job, + TaskGet( + task_id=_job_id, + task_name=_job_id, + status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=_job_id)))}", + abort_href=f"{request.url.with_path(str(request.app.router['abort_async_job'].url_for(task_id=_job_id)))}", + result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=_job_id)))}", + ), status=status.HTTP_202_ACCEPTED, ) @@ -452,10 +453,8 @@ class _PathParams(BaseModel): data_export_post = await parse_request_body_as( model_schema_cls=DataExportPost, request=request ) - async_job_rpc_get = await submit_job( + async_job_rpc_get = await start_data_export( rabbitmq_rpc_client=rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - method_name="start_data_export", job_id_data=AsyncJobNameData( user_id=_req_ctx.user_id, product_name=_req_ctx.product_name ), @@ -463,123 +462,14 @@ class _PathParams(BaseModel): location_id=_path_params.location_id, ), ) + _job_id = f"{async_job_rpc_get.job_id}" return create_data_response( - StorageAsyncJobGet.from_rpc_schema(async_job_rpc_get), - status=status.HTTP_202_ACCEPTED, - ) - - -@routes.get( - _storage_prefix + "/async-jobs", - name="get_async_jobs", -) -@login_required -@permission_required("storage.files.*") -@handle_data_export_exceptions -async def get_async_jobs(request: web.Request) -> web.Response: - _req_ctx = RequestContext.model_validate(request) - - rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) - - user_async_jobs = await list_jobs( - rabbitmq_rpc_client=rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - job_id_data=AsyncJobNameData( - user_id=_req_ctx.user_id, product_name=_req_ctx.product_name - ), - filter_="", - ) - return create_data_response( - [StorageAsyncJobGet.from_rpc_schema(job) for job in user_async_jobs], - status=status.HTTP_200_OK, - ) - - -@routes.get( - _storage_prefix + "/async-jobs/{job_id}/status", - name="get_async_job_status", -) -@login_required -@permission_required("storage.files.*") -@handle_data_export_exceptions -async def get_async_job_status(request: web.Request) -> web.Response: - class _PathParams(BaseModel): - job_id: UUID - - _req_ctx = RequestContext.model_validate(request) - rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) - - async_job_get = parse_request_path_parameters_as(_PathParams, request) - async_job_rpc_status = await get_status( - rabbitmq_rpc_client=rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - job_id=async_job_get.job_id, - job_id_data=AsyncJobNameData( - user_id=_req_ctx.user_id, product_name=_req_ctx.product_name - ), - ) - return create_data_response( - StorageAsyncJobStatus.from_rpc_schema(async_job_rpc_status), - status=status.HTTP_200_OK, - ) - - -@routes.post( - 
_storage_prefix + "/async-jobs/{job_id}:abort", - name="abort_async_job", -) -@login_required -@permission_required("storage.files.*") -@handle_data_export_exceptions -async def abort_async_job(request: web.Request) -> web.Response: - class _PathParams(BaseModel): - job_id: UUID - - _req_ctx = RequestContext.model_validate(request) - - rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) - async_job_get = parse_request_path_parameters_as(_PathParams, request) - async_job_rpc_abort = await abort( - rabbitmq_rpc_client=rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - job_id=async_job_get.job_id, - job_id_data=AsyncJobNameData( - user_id=_req_ctx.user_id, product_name=_req_ctx.product_name - ), - ) - return web.Response( - status=( - status.HTTP_200_OK - if async_job_rpc_abort.result - else status.HTTP_500_INTERNAL_SERVER_ERROR - ) - ) - - -@routes.get( - _storage_prefix + "/async-jobs/{job_id}/result", - name="get_async_job_result", -) -@login_required -@permission_required("storage.files.*") -@handle_data_export_exceptions -async def get_async_job_result(request: web.Request) -> web.Response: - class _PathParams(BaseModel): - job_id: UUID - - _req_ctx = RequestContext.model_validate(request) - - rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) - async_job_get = parse_request_path_parameters_as(_PathParams, request) - async_job_rpc_result = await get_result( - rabbitmq_rpc_client=rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - job_id=async_job_get.job_id, - job_id_data=AsyncJobNameData( - user_id=_req_ctx.user_id, product_name=_req_ctx.product_name + TaskGet( + task_id=_job_id, + task_name=_job_id, + status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=_job_id)))}", + abort_href=f"{request.url.with_path(str(request.app.router['abort_async_job'].url_for(task_id=_job_id)))}", + result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=_job_id)))}", ), - ) - return create_data_response( - StorageAsyncJobResult.from_rpc_schema(async_job_rpc_result), - status=status.HTTP_200_OK, + status=status.HTTP_202_ACCEPTED, ) diff --git a/services/web/server/src/simcore_service_webserver/tasks/__init__.py b/services/web/server/src/simcore_service_webserver/tasks/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/services/web/server/src/simcore_service_webserver/tasks/_exception_handlers.py b/services/web/server/src/simcore_service_webserver/tasks/_exception_handlers.py new file mode 100644 index 000000000000..8e4f467cf474 --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/tasks/_exception_handlers.py @@ -0,0 +1,60 @@ +from models_library.api_schemas_rpc_async_jobs.exceptions import ( + JobAbortedError, + JobError, + JobMissingError, + JobNotDoneError, + JobSchedulerError, + JobStatusError, +) +from models_library.api_schemas_storage.data_export_async_jobs import ( + AccessRightError, + InvalidFileIdentifierError, +) +from servicelib.aiohttp import status + +from ..exception_handling import ( + ExceptionToHttpErrorMap, + HttpErrorInfo, + exception_handling_decorator, + to_exceptions_handlers_map, +) + +_TO_HTTP_ERROR_MAP: ExceptionToHttpErrorMap = { + InvalidFileIdentifierError: HttpErrorInfo( + status.HTTP_404_NOT_FOUND, + "Could not find file {file_id}", + ), + AccessRightError: HttpErrorInfo( + status.HTTP_403_FORBIDDEN, + "Accessright error: user {user_id} does not have access to file {file_id} with location {location_id}", + ), 
+ JobAbortedError: HttpErrorInfo( + status.HTTP_410_GONE, + "Task {job_id} is aborted", + ), + JobError: HttpErrorInfo( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "Task {job_id} failed with exception type {exc_type} and message {exc_msg}", + ), + JobNotDoneError: HttpErrorInfo( + status.HTTP_404_NOT_FOUND, + "Task {job_id} is not done yet", + ), + JobMissingError: HttpErrorInfo( + status.HTTP_404_NOT_FOUND, + "No task with id: {job_id}", + ), + JobSchedulerError: HttpErrorInfo( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "Encountered an error with the task scheduling system", + ), + JobStatusError: HttpErrorInfo( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "Encountered an error while getting the status of task {job_id}", + ), +} + + +handle_data_export_exceptions = exception_handling_decorator( + to_exceptions_handlers_map(_TO_HTTP_ERROR_MAP) +) diff --git a/services/web/server/src/simcore_service_webserver/tasks/_rest.py b/services/web/server/src/simcore_service_webserver/tasks/_rest.py new file mode 100644 index 000000000000..809870e594af --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/tasks/_rest.py @@ -0,0 +1,188 @@ +"""Handlers exposed by storage subsystem + +Mostly resolves and redirects to storage API +""" + +import logging +from typing import Final +from uuid import UUID + +from aiohttp import web +from models_library.api_schemas_long_running_tasks.base import TaskProgress +from models_library.api_schemas_long_running_tasks.tasks import ( + TaskGet, + TaskResult, + TaskStatus, +) +from models_library.api_schemas_rpc_async_jobs.async_jobs import ( + AsyncJobId, + AsyncJobNameData, +) +from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE +from models_library.generics import Envelope +from pydantic import BaseModel +from servicelib.aiohttp import status +from servicelib.aiohttp.client_session import get_client_session +from servicelib.aiohttp.requests_validation import ( + parse_request_path_parameters_as, +) +from servicelib.aiohttp.rest_responses import create_data_response +from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs + +from .._meta import API_VTAG +from ..login.decorators import login_required +from ..models import RequestContext +from ..rabbitmq import get_rabbitmq_rpc_client +from ..security.decorators import permission_required +from ._exception_handlers import handle_data_export_exceptions + +log = logging.getLogger(__name__) + + +routes = web.RouteTableDef() + +_task_prefix: Final[str] = f"/{API_VTAG}/tasks" + + +@routes.get( + _task_prefix, + name="get_async_jobs", +) +@login_required +@permission_required("storage.files.*") +@handle_data_export_exceptions +async def get_async_jobs(request: web.Request) -> web.Response: + session = get_client_session(request.app) + async with session.request( + "GET", + request.url.with_path(str(request.app.router["list_tasks"].url_for())), + cookies=request.cookies, + ) as resp: + if resp.status != status.HTTP_200_OK: + return web.Response( + status=resp.status, + body=await resp.read(), + content_type=resp.content_type, + ) + inprocess_tasks = ( + Envelope[list[TaskGet]].model_validate_json(await resp.text()).data + ) + assert inprocess_tasks is not None # nosec + + _req_ctx = RequestContext.model_validate(request) + + rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) + + user_async_jobs = await async_jobs.list_jobs( + rabbitmq_rpc_client=rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + job_id_data=AsyncJobNameData( + user_id=_req_ctx.user_id,
product_name=_req_ctx.product_name + ), + filter_="", + ) + return create_data_response( + [ + TaskGet( + task_id=f"{job.job_id}", + task_name=f"{job.job_id}", + status_href=f"{request.url.with_path(str(request.app.router['get_async_job_status'].url_for(task_id=str(job.job_id))))}", + abort_href=f"{request.url.with_path(str(request.app.router['abort_async_job'].url_for(task_id=str(job.job_id))))}", + result_href=f"{request.url.with_path(str(request.app.router['get_async_job_result'].url_for(task_id=str(job.job_id))))}", + ) + for job in user_async_jobs + ] + + inprocess_tasks, + status=status.HTTP_200_OK, + ) + + +class _StorageAsyncJobId(BaseModel): + task_id: AsyncJobId + + +@routes.get( + _task_prefix + "/{task_id}", + name="get_async_job_status", +) +@login_required +@handle_data_export_exceptions +async def get_async_job_status(request: web.Request) -> web.Response: + + _req_ctx = RequestContext.model_validate(request) + rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) + + async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request) + async_job_rpc_status = await async_jobs.status( + rabbitmq_rpc_client=rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + job_id=async_job_get.task_id, + job_id_data=AsyncJobNameData( + user_id=_req_ctx.user_id, product_name=_req_ctx.product_name + ), + ) + _task_id = f"{async_job_rpc_status.job_id}" + return create_data_response( + TaskStatus( + task_progress=TaskProgress( + task_id=_task_id, percent=async_job_rpc_status.progress.actual_value + ), + done=async_job_rpc_status.done, + started=None, + ), + status=status.HTTP_200_OK, + ) + + +@routes.delete( + _task_prefix + "/{task_id}", + name="abort_async_job", +) +@login_required +@permission_required("storage.files.*") +@handle_data_export_exceptions +async def abort_async_job(request: web.Request) -> web.Response: + + _req_ctx = RequestContext.model_validate(request) + + rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) + async_job_get = parse_request_path_parameters_as(_StorageAsyncJobId, request) + await async_jobs.cancel( + rabbitmq_rpc_client=rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + job_id=async_job_get.task_id, + job_id_data=AsyncJobNameData( + user_id=_req_ctx.user_id, product_name=_req_ctx.product_name + ), + ) + return web.Response(status=status.HTTP_204_NO_CONTENT) + + +@routes.get( + _task_prefix + "/{task_id}/result", + name="get_async_job_result", +) +@login_required +@permission_required("storage.files.*") +@handle_data_export_exceptions +async def get_async_job_result(request: web.Request) -> web.Response: + class _PathParams(BaseModel): + task_id: UUID + + _req_ctx = RequestContext.model_validate(request) + + rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app) + async_job_get = parse_request_path_parameters_as(_PathParams, request) + async_job_rpc_result = await async_jobs.result( + rabbitmq_rpc_client=rabbitmq_rpc_client, + rpc_namespace=STORAGE_RPC_NAMESPACE, + job_id=async_job_get.task_id, + job_id_data=AsyncJobNameData( + user_id=_req_ctx.user_id, product_name=_req_ctx.product_name + ), + ) + + return create_data_response( + TaskResult(result=async_job_rpc_result.result, error=None), + status=status.HTTP_200_OK, + ) diff --git a/services/web/server/src/simcore_service_webserver/tasks/plugin.py b/services/web/server/src/simcore_service_webserver/tasks/plugin.py new file mode 100644 index 000000000000..e9bfdeea222e --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/tasks/plugin.py @@ -0,0 +1,9 
@@ +from aiohttp import web + +from ..rest.plugin import setup_rest +from . import _rest + + +def setup_tasks(app: web.Application): + setup_rest(app) + app.router.add_routes(_rest.routes) diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py index 9eb5d42407f3..3ea1ec402306 100644 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py +++ b/services/web/server/tests/unit/with_dbs/01/storage/test_storage.py @@ -4,14 +4,37 @@ # pylint: disable=too-many-arguments from collections.abc import Callable -from typing import Any +from pathlib import Path +from typing import Any, Final from urllib.parse import quote import pytest from aiohttp.test_utils import TestClient from faker import Faker from fastapi_pagination.cursor import CursorPage -from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobGet, AsyncJobId +from models_library.api_schemas_long_running_tasks.tasks import ( + TaskGet, + TaskResult, + TaskStatus, +) +from models_library.api_schemas_rpc_async_jobs.async_jobs import ( + AsyncJobAbort, + AsyncJobGet, + AsyncJobId, + AsyncJobResult, + AsyncJobStatus, +) +from models_library.api_schemas_rpc_async_jobs.exceptions import ( + JobAbortedError, + JobError, + JobMissingError, + JobNotDoneError, + JobSchedulerError, +) +from models_library.api_schemas_storage.data_export_async_jobs import ( + AccessRightError, + InvalidFileIdentifierError, +) from models_library.api_schemas_storage.storage_schemas import ( DatasetMetaDataGet, FileLocation, @@ -19,22 +42,40 @@ FileUploadSchema, PathMetaDataGet, ) -from models_library.api_schemas_webserver.storage import StorageAsyncJobGet +from models_library.api_schemas_webserver._base import OutputSchema +from models_library.api_schemas_webserver.storage import ( + DataExportPost, +) +from models_library.generics import Envelope +from models_library.progress_bar import ProgressReport from models_library.projects_nodes_io import LocationID, StorageFileID from pydantic import TypeAdapter from pytest_mock import MockerFixture from pytest_simcore.helpers.assert_checks import assert_status +from pytest_simcore.helpers.webserver_login import UserInfoDict from servicelib.aiohttp import status +from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import ( - submit_job, + submit, ) +from servicelib.rabbitmq.rpc_interfaces.storage.data_export import start_data_export from simcore_postgres_database.models.users import UserRole +from yarl import URL API_VERSION = "v0" PREFIX = "/" + API_VERSION + "/storage" +_faker = Faker() +_user_roles: Final[list[UserRole]] = [ + UserRole.GUEST, + UserRole.USER, + UserRole.TESTER, + UserRole.PRODUCT_OWNER, + UserRole.ADMIN, +] + @pytest.mark.parametrize( "user_role,expected", @@ -133,7 +174,7 @@ async def test_compute_path_size( backend_result_or_exception: Any, ): create_storage_paths_rpc_client_mock( - submit_job.__name__, + submit.__name__, backend_result_or_exception, ) @@ -146,7 +187,7 @@ async def test_compute_path_size( resp = await client.post(f"{url}") data, error = await assert_status(resp, expected) if not error: - TypeAdapter(StorageAsyncJobGet).validate_python(data) + TypeAdapter(TaskGet).validate_python(data) @pytest.mark.parametrize( @@ -317,3 +358,285 @@ async def test_upload_file( data, error = await assert_status(resp, status.HTTP_204_NO_CONTENT) assert not error assert not data + + +@pytest.fixture +def 
create_storage_rpc_client_mock( + mocker: MockerFixture, +) -> Callable[[str, str, Any], None]: + def _(module: str, method: str, result_or_exception: Any): + def side_effect(*args, **kwargs): + if isinstance(result_or_exception, Exception): + raise result_or_exception + + return result_or_exception + + for fct in (f"{module}.{method}",): + mocker.patch(fct, side_effect=side_effect) + + return _ + + +@pytest.mark.parametrize("user_role", _user_roles) +@pytest.mark.parametrize( + "backend_result_or_exception, expected_status", + [ + (AsyncJobGet(job_id=AsyncJobId(f"{_faker.uuid4()}")), status.HTTP_202_ACCEPTED), + ( + InvalidFileIdentifierError(file_id=Path("/my/file")), + status.HTTP_404_NOT_FOUND, + ), + ( + AccessRightError( + user_id=_faker.pyint(min_value=0), file_id=Path("/my/file") + ), + status.HTTP_403_FORBIDDEN, + ), + (JobSchedulerError(exc=_faker.text()), status.HTTP_500_INTERNAL_SERVER_ERROR), + ], + ids=lambda x: type(x).__name__, +) +async def test_data_export( + user_role: UserRole, + logged_user: UserInfoDict, + client: TestClient, + create_storage_rpc_client_mock: Callable[[str, str, Any], None], + faker: Faker, + backend_result_or_exception: Any, + expected_status: int, +): + create_storage_rpc_client_mock( + "simcore_service_webserver.storage._rest", + start_data_export.__name__, + backend_result_or_exception, + ) + + _body = DataExportPost( + paths=[f"{faker.uuid4()}/{faker.uuid4()}/{faker.file_name()}"] + ) + response = await client.post( + f"/{API_VERSION}/storage/locations/0/export-data", data=_body.model_dump_json() + ) + assert response.status == expected_status + if response.status == status.HTTP_202_ACCEPTED: + Envelope[TaskGet].model_validate(await response.json()) + + +@pytest.mark.parametrize("user_role", _user_roles) +@pytest.mark.parametrize( + "backend_result_or_exception, expected_status", + [ + ( + AsyncJobStatus( + job_id=AsyncJobId(f"{_faker.uuid4()}"), + progress=ProgressReport(actual_value=0.5, total=1.0), + done=False, + ), + status.HTTP_200_OK, + ), + (JobSchedulerError(exc=_faker.text()), status.HTTP_500_INTERNAL_SERVER_ERROR), + (JobMissingError(job_id=_faker.uuid4()), status.HTTP_404_NOT_FOUND), + ], + ids=lambda x: type(x).__name__, +) +async def test_get_async_jobs_status( + user_role: UserRole, + logged_user: UserInfoDict, + client: TestClient, + create_storage_rpc_client_mock: Callable[[str, str, Any], None], + backend_result_or_exception: Any, + expected_status: int, +): + _job_id = AsyncJobId(_faker.uuid4()) + create_storage_rpc_client_mock( + "simcore_service_webserver.tasks._rest", + f"async_jobs.{async_jobs.status.__name__}", + backend_result_or_exception, + ) + + response = await client.get(f"/{API_VERSION}/tasks/{_job_id}") + assert response.status == expected_status + if response.status == status.HTTP_200_OK: + response_body_data = ( + Envelope[TaskStatus].model_validate(await response.json()).data + ) + assert response_body_data is not None + + +@pytest.mark.parametrize("user_role", _user_roles) +@pytest.mark.parametrize( + "backend_result_or_exception, expected_status", + [ + ( + AsyncJobAbort(result=True, job_id=AsyncJobId(_faker.uuid4())), + status.HTTP_204_NO_CONTENT, + ), + (JobSchedulerError(exc=_faker.text()), status.HTTP_500_INTERNAL_SERVER_ERROR), + (JobMissingError(job_id=_faker.uuid4()), status.HTTP_404_NOT_FOUND), + ], + ids=lambda x: type(x).__name__, +) +async def test_abort_async_jobs( + user_role: UserRole, + logged_user: UserInfoDict, + client: TestClient, + create_storage_rpc_client_mock: Callable[[str, str, Any], 
None], + faker: Faker, + backend_result_or_exception: Any, + expected_status: int, +): + _job_id = AsyncJobId(faker.uuid4()) + create_storage_rpc_client_mock( + "simcore_service_webserver.tasks._rest", + f"async_jobs.{async_jobs.cancel.__name__}", + backend_result_or_exception, + ) + + response = await client.delete(f"/{API_VERSION}/tasks/{_job_id}") + assert response.status == expected_status + + +@pytest.mark.parametrize("user_role", _user_roles) +@pytest.mark.parametrize( + "backend_result_or_exception, expected_status", + [ + (JobNotDoneError(job_id=_faker.uuid4()), status.HTTP_404_NOT_FOUND), + (AsyncJobResult(result=None), status.HTTP_200_OK), + (JobError(job_id=_faker.uuid4()), status.HTTP_500_INTERNAL_SERVER_ERROR), + (JobAbortedError(job_id=_faker.uuid4()), status.HTTP_410_GONE), + (JobSchedulerError(exc=_faker.text()), status.HTTP_500_INTERNAL_SERVER_ERROR), + (JobMissingError(job_id=_faker.uuid4()), status.HTTP_404_NOT_FOUND), + ], + ids=lambda x: type(x).__name__, +) +async def test_get_async_job_result( + user_role: UserRole, + logged_user: UserInfoDict, + client: TestClient, + create_storage_rpc_client_mock: Callable[[str, str, Any], None], + faker: Faker, + backend_result_or_exception: Any, + expected_status: int, +): + _job_id = AsyncJobId(faker.uuid4()) + create_storage_rpc_client_mock( + "simcore_service_webserver.tasks._rest", + f"async_jobs.{async_jobs.result.__name__}", + backend_result_or_exception, + ) + + response = await client.get(f"/{API_VERSION}/tasks/{_job_id}/result") + assert response.status == expected_status + + +@pytest.mark.parametrize("user_role", _user_roles) +@pytest.mark.parametrize( + "backend_result_or_exception, expected_status", + [ + ( + [ + AsyncJobGet( + job_id=AsyncJobId(_faker.uuid4()), + ) + ], + status.HTTP_200_OK, + ), + (JobSchedulerError(exc=_faker.text()), status.HTTP_500_INTERNAL_SERVER_ERROR), + ], + ids=lambda x: type(x).__name__, +) +async def test_get_user_async_jobs( + user_role: UserRole, + logged_user: UserInfoDict, + client: TestClient, + create_storage_rpc_client_mock: Callable[[str, str, Any], None], + backend_result_or_exception: Any, + expected_status: int, +): + create_storage_rpc_client_mock( + "simcore_service_webserver.tasks._rest", + f"async_jobs.{async_jobs.list_jobs.__name__}", + backend_result_or_exception, + ) + + response = await client.get(f"/{API_VERSION}/tasks") + assert response.status == expected_status + if response.status == status.HTTP_200_OK: + Envelope[list[TaskGet]].model_validate(await response.json()) + + +@pytest.mark.parametrize("user_role", _user_roles) +@pytest.mark.parametrize( + "http_method, href, backend_method, backend_object, return_status, return_schema", + [ + ( + "GET", + "status_href", + async_jobs.status.__name__, + AsyncJobStatus( + job_id=AsyncJobId(_faker.uuid4()), + progress=ProgressReport(actual_value=0.5, total=1.0), + done=False, + ), + status.HTTP_200_OK, + TaskStatus, + ), + ( + "DELETE", + "abort_href", + async_jobs.cancel.__name__, + AsyncJobAbort(result=True, job_id=AsyncJobId(_faker.uuid4())), + status.HTTP_204_NO_CONTENT, + None, + ), + ( + "GET", + "result_href", + async_jobs.result.__name__, + AsyncJobResult(result=None), + status.HTTP_200_OK, + TaskResult, + ), + ], +) +async def test_get_async_job_links( + user_role: UserRole, + logged_user: UserInfoDict, + client: TestClient, + create_storage_rpc_client_mock: Callable[[str, str, Any], None], + faker: Faker, + http_method: str, + href: str, + backend_method: str, + backend_object: Any, + return_status: int, + 
return_schema: OutputSchema | None, +): + create_storage_rpc_client_mock( + "simcore_service_webserver.storage._rest", + start_data_export.__name__, + AsyncJobGet(job_id=AsyncJobId(f"{_faker.uuid4()}")), + ) + + _body = DataExportPost( + paths=[f"{faker.uuid4()}/{faker.uuid4()}/{faker.file_name()}"] + ) + response = await client.post( + f"/{API_VERSION}/storage/locations/0/export-data", data=_body.model_dump_json() + ) + assert response.status == status.HTTP_202_ACCEPTED + response_body_data = Envelope[TaskGet].model_validate(await response.json()).data + assert response_body_data is not None + + # Call the different links and check the correct model and return status + create_storage_rpc_client_mock( + "simcore_service_webserver.tasks._rest", + f"async_jobs.{backend_method}", + backend_object, + ) + response = await client.request( + http_method, URL(getattr(response_body_data, href)).path + ) + assert response.status == return_status + if return_schema: + Envelope[return_schema].model_validate(await response.json()) diff --git a/services/web/server/tests/unit/with_dbs/01/storage/test_storage_rpc.py b/services/web/server/tests/unit/with_dbs/01/storage/test_storage_rpc.py deleted file mode 100644 index ba7b910b24d2..000000000000 --- a/services/web/server/tests/unit/with_dbs/01/storage/test_storage_rpc.py +++ /dev/null @@ -1,210 +0,0 @@ -# pylint: disable=redefined-outer-name -# pylint: disable=unused-argument -from collections.abc import Callable -from datetime import datetime -from pathlib import Path -from typing import Any - -import pytest -from aiohttp.test_utils import TestClient -from faker import Faker -from models_library.api_schemas_rpc_async_jobs.async_jobs import ( - AsyncJobAbort, - AsyncJobGet, - AsyncJobId, - AsyncJobResult, - AsyncJobStatus, -) -from models_library.api_schemas_rpc_async_jobs.exceptions import ( - ResultError, - StatusError, -) -from models_library.api_schemas_storage.data_export_async_jobs import ( - AccessRightError, - DataExportError, - InvalidFileIdentifierError, -) -from models_library.api_schemas_webserver.storage import ( - DataExportPost, - StorageAsyncJobGet, -) -from models_library.generics import Envelope -from models_library.progress_bar import ProgressReport -from pytest_mock import MockerFixture -from pytest_simcore.helpers.webserver_login import UserInfoDict -from servicelib.aiohttp import status -from servicelib.rabbitmq.rpc_interfaces.async_jobs.async_jobs import ( - abort, - get_result, - get_status, - list_jobs, - submit_job, -) -from simcore_postgres_database.models.users import UserRole - -_faker = Faker() - - -@pytest.fixture -def create_storage_rpc_client_mock(mocker: MockerFixture) -> Callable[[str, Any], None]: - def _(method: str, result_or_exception: Any): - def side_effect(*args, **kwargs): - if isinstance(result_or_exception, Exception): - raise result_or_exception - - return result_or_exception - - for fct in (f"simcore_service_webserver.storage._rest.{method}",): - mocker.patch(fct, side_effect=side_effect) - - return _ - - -@pytest.mark.parametrize("user_role", [UserRole.USER]) -@pytest.mark.parametrize( - "backend_result_or_exception", - [ - AsyncJobGet(job_id=AsyncJobId(f"{_faker.uuid4()}")), - InvalidFileIdentifierError(file_id=Path("/my/file")), - AccessRightError(user_id=_faker.pyint(min_value=0), file_id=Path("/my/file")), - DataExportError(job_id=_faker.pyint(min_value=0)), - ], - ids=lambda x: type(x).__name__, -) -async def test_data_export( - user_role: UserRole, - logged_user: UserInfoDict, - client: TestClient, - 
create_storage_rpc_client_mock: Callable[[str, Any], None], - faker: Faker, - backend_result_or_exception: Any, -): - create_storage_rpc_client_mock( - submit_job.__name__, - backend_result_or_exception, - ) - - _body = DataExportPost( - paths=[f"{faker.uuid4()}/{faker.uuid4()}/{faker.file_name()}"] - ) - response = await client.post( - "/v0/storage/locations/0/export-data", data=_body.model_dump_json() - ) - if isinstance(backend_result_or_exception, AsyncJobGet): - assert response.status == status.HTTP_202_ACCEPTED - Envelope[StorageAsyncJobGet].model_validate(await response.json()) - elif isinstance(backend_result_or_exception, InvalidFileIdentifierError): - assert response.status == status.HTTP_404_NOT_FOUND - elif isinstance(backend_result_or_exception, AccessRightError): - assert response.status == status.HTTP_403_FORBIDDEN - else: - assert isinstance(backend_result_or_exception, DataExportError) - assert response.status == status.HTTP_500_INTERNAL_SERVER_ERROR - - -@pytest.mark.parametrize("user_role", [UserRole.USER]) -@pytest.mark.parametrize( - "backend_result_or_exception", - [ - AsyncJobStatus( - job_id=f"{_faker.uuid4()}", - progress=ProgressReport(actual_value=0.5, total=1.0), - done=False, - started=datetime.now(), - stopped=None, - ), - StatusError(job_id=_faker.uuid4()), - ], - ids=lambda x: type(x).__name__, -) -async def test_get_async_jobs_status( - user_role: UserRole, - logged_user: UserInfoDict, - client: TestClient, - create_storage_rpc_client_mock: Callable[[str, Any], None], - backend_result_or_exception: Any, -): - _job_id = AsyncJobId(_faker.uuid4()) - create_storage_rpc_client_mock(get_status.__name__, backend_result_or_exception) - - response = await client.get(f"/v0/storage/async-jobs/{_job_id}/status") - if isinstance(backend_result_or_exception, AsyncJobStatus): - assert response.status == status.HTTP_200_OK - response_body_data = ( - Envelope[StorageAsyncJobGet].model_validate(await response.json()).data - ) - assert response_body_data is not None - elif isinstance(backend_result_or_exception, StatusError): - assert response.status == status.HTTP_500_INTERNAL_SERVER_ERROR - else: - pytest.fail("Incorrectly configured test") - - -@pytest.mark.parametrize("user_role", [UserRole.USER]) -@pytest.mark.parametrize("abort_success", [True, False]) -async def test_abort_async_jobs( - user_role: UserRole, - logged_user: UserInfoDict, - client: TestClient, - create_storage_rpc_client_mock: Callable[[str, Any], None], - faker: Faker, - abort_success: bool, -): - _job_id = AsyncJobId(faker.uuid4()) - create_storage_rpc_client_mock( - abort.__name__, AsyncJobAbort(result=abort_success, job_id=_job_id) - ) - - response = await client.post(f"/v0/storage/async-jobs/{_job_id}:abort") - - if abort_success: - assert response.status == status.HTTP_200_OK - else: - assert response.status == status.HTTP_500_INTERNAL_SERVER_ERROR - - -@pytest.mark.parametrize("user_role", [UserRole.USER]) -@pytest.mark.parametrize( - "backend_result_or_exception", - [ - AsyncJobResult(result=None, error=_faker.text()), - ResultError(job_id=_faker.uuid4()), - ], - ids=lambda x: type(x).__name__, -) -async def test_get_async_job_result( - user_role: UserRole, - logged_user: UserInfoDict, - client: TestClient, - create_storage_rpc_client_mock: Callable[[str, Any], None], - faker: Faker, - backend_result_or_exception: Any, -): - _job_id = AsyncJobId(faker.uuid4()) - create_storage_rpc_client_mock(get_result.__name__, backend_result_or_exception) - - response = await 
client.get(f"/v0/storage/async-jobs/{_job_id}/result") - - if isinstance(backend_result_or_exception, AsyncJobResult): - assert response.status == status.HTTP_200_OK - elif isinstance(backend_result_or_exception, ResultError): - assert response.status == status.HTTP_500_INTERNAL_SERVER_ERROR - else: - pytest.fail("Incorrectly configured test") - - -@pytest.mark.parametrize("user_role", [UserRole.USER]) -async def test_get_user_async_jobs( - user_role: UserRole, - logged_user: UserInfoDict, - client: TestClient, - create_storage_rpc_client_mock: Callable[[str, Any], None], -): - create_storage_rpc_client_mock( - list_jobs.__name__, [StorageAsyncJobGet(job_id=AsyncJobId(_faker.uuid4()))] - ) - - response = await client.get("/v0/storage/async-jobs") - - assert response.status == status.HTTP_200_OK - Envelope[list[StorageAsyncJobGet]].model_validate(await response.json()) diff --git a/services/web/server/tests/unit/with_dbs/01/test_long_running_tasks.py b/services/web/server/tests/unit/with_dbs/01/test_long_running_tasks.py index 4e3f10a9c4de..c6f58f29ee1a 100644 --- a/services/web/server/tests/unit/with_dbs/01/test_long_running_tasks.py +++ b/services/web/server/tests/unit/with_dbs/01/test_long_running_tasks.py @@ -2,11 +2,15 @@ # pylint: disable=too-many-arguments # pylint: disable=unused-argument # pylint: disable=unused-variable +# pylint: disable=no-self-use +# pylint: disable=no-self-argument from typing import Any import pytest from aiohttp.test_utils import TestClient +from faker import Faker +from pytest_mock import MockerFixture from pytest_simcore.helpers.assert_checks import assert_status from pytest_simcore.helpers.webserver_parametrizations import ( ExpectedResponse, @@ -63,3 +67,23 @@ async def test_listing_tasks_empty( assert not data return assert data == [] + + +@pytest.mark.parametrize("user_role", [UserRole.GUEST, UserRole.TESTER, UserRole.USER]) +async def test_listing_tasks_with_list_inprocess_tasks_error( + client: TestClient, logged_user, faker: Faker, mocker: MockerFixture +): + assert client.app + + class _DummyTaskManager: + def list_tasks(self, *args, **kwargs): + raise Exception() # pylint: disable=broad-exception-raised + + mocker.patch( + "servicelib.aiohttp.long_running_tasks._routes.get_tasks_manager", + return_value=_DummyTaskManager(), + ) + + _async_jobs_listing_path = client.app.router["get_async_jobs"].url_for() + resp = await client.request("GET", f"{_async_jobs_listing_path}") + assert resp.status == status.HTTP_500_INTERNAL_SERVER_ERROR diff --git a/services/web/server/tests/unit/with_dbs/03/test__openapi_specs.py b/services/web/server/tests/unit/with_dbs/03/test__openapi_specs.py index 886817bbf263..9c6ad78f4a8d 100644 --- a/services/web/server/tests/unit/with_dbs/03/test__openapi_specs.py +++ b/services/web/server/tests/unit/with_dbs/03/test__openapi_specs.py @@ -13,6 +13,7 @@ from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict from pytest_simcore.openapi_specs import Entrypoint +from simcore_service_webserver._meta import API_VTAG from simcore_service_webserver.application import create_application from simcore_service_webserver.application_settings import get_application_settings from simcore_service_webserver.rest._utils import get_openapi_specs_path @@ -75,7 +76,16 @@ def test_app_named_resources_against_openapi_specs( openapi_specs_entrypoints: set[Entrypoint], app_rest_entrypoints: set[Entrypoint], ): - assert app_rest_entrypoints == openapi_specs_entrypoints + # remove 
tasks-legacy routes. These should not be exposed. + # this test compares directly against the openapi specs. In the future it would be + # cleaner to compare against the fastapi app entry points in the specs and + # avoid including these endpoints there + required_entry_points = { + e + for e in app_rest_entrypoints + if not e.path.startswith(f"/{API_VTAG}/tasks-legacy") + } + assert required_entry_points == openapi_specs_entrypoints # NOTE: missing here is: # - input schemas (path, query and body)
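To make the unified flow concrete, below is a minimal client sketch of the intended usage after this change: trigger an export, then follow the hrefs embedded in the returned TaskGet envelope. This is an illustration rather than part of the diff; the host, the omitted login/cookie handling, and the 1-second polling interval are assumptions, while the endpoint paths, the envelope shape, and the 410/404 semantics come from the handlers and error map above.

import asyncio

import aiohttp


async def export_and_wait(base_url: str, location_id: int, paths: list[str]) -> dict:
    # NOTE: session cookies/auth are omitted here; all endpoints require a logged-in user
    async with aiohttp.ClientSession() as session:
        # 1. Trigger the export; the handler answers 202 with an enveloped TaskGet
        resp = await session.post(
            f"{base_url}/v0/storage/locations/{location_id}/export-data",
            json={"paths": paths},
        )
        resp.raise_for_status()
        task = (await resp.json())["data"]

        # 2. Poll status_href until done=True; the hrefs are absolute URLs built
        #    server-side with request.app.router[...].url_for(...)
        while True:
            status_resp = await session.get(task["status_href"])
            status_resp.raise_for_status()
            if (await status_resp.json())["data"]["done"]:
                break
            await asyncio.sleep(1)

        # 3. Fetch the result; per the error map, 410 means the task was aborted
        #    and 404 means it is missing or not done yet
        result_resp = await session.get(task["result_href"])
        result_resp.raise_for_status()
        return (await result_resp.json())["data"]

A client can likewise abort an in-flight export with DELETE on abort_href, which maps to the new abort_async_job handler and returns 204 No Content.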
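The per-endpoint error responses added to openapi.yaml are all derived from _TO_HTTP_ERROR_MAP via servicelib's exception-handling helpers. Below is a stripped-down sketch of that mechanism, assuming the decorator catches the mapped exception types and renders an enveloped JSON error; the single-entry map, the stand-in exception, and the wrapper name are illustrative, not the servicelib implementation.

from collections.abc import Awaitable, Callable
from dataclasses import dataclass

from aiohttp import web


@dataclass(frozen=True)
class HttpErrorInfo:
    status_code: int
    msg_template: str  # formatted with the exception's attributes


class JobMissingError(Exception):
    # Illustrative stand-in for the RPC-side exception
    def __init__(self, job_id: str) -> None:
        super().__init__(job_id)
        self.job_id = job_id


_ERROR_MAP: dict[type[Exception], HttpErrorInfo] = {
    JobMissingError: HttpErrorInfo(404, "No task with id: {job_id}"),
}

_Handler = Callable[[web.Request], Awaitable[web.StreamResponse]]


def handle_mapped_exceptions(handler: _Handler) -> _Handler:
    # Translates mapped exception types into enveloped JSON error responses,
    # mirroring what exception_handling_decorator does for the real map
    async def wrapper(request: web.Request) -> web.StreamResponse:
        try:
            return await handler(request)
        except tuple(_ERROR_MAP) as exc:
            info = _ERROR_MAP[type(exc)]
            message = info.msg_template.format(**vars(exc))
            return web.json_response(
                {"data": None, "error": {"message": message}},
                status=info.status_code,
            )

    return wrapper

Keeping the mapping as plain data is what lets both the aiohttp handlers and the FastAPI spec generator document the same status codes from a single source.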