diff --git a/services/api-server/src/simcore_service_api_server/_service_function_jobs.py b/services/api-server/src/simcore_service_api_server/_service_function_jobs.py index 9998e23cf5ff..1c04cc36e91e 100644 --- a/services/api-server/src/simcore_service_api_server/_service_function_jobs.py +++ b/services/api-server/src/simcore_service_api_server/_service_function_jobs.py @@ -1,17 +1,60 @@ from dataclasses import dataclass +import jsonschema from common_library.exclude import as_dict_exclude_none from models_library.functions import ( + FunctionClass, FunctionID, + FunctionInputs, + FunctionInputsList, + FunctionJobCollection, FunctionJobCollectionID, FunctionJobID, + FunctionJobStatus, + FunctionSchemaClass, + ProjectFunctionJob, + RegisteredFunction, RegisteredFunctionJob, + RegisteredFunctionJobCollection, + SolverFunctionJob, +) +from models_library.functions_errors import ( + FunctionExecuteAccessDeniedError, + FunctionInputsValidationError, + FunctionsExecuteApiAccessDeniedError, + UnsupportedFunctionClassError, + UnsupportedFunctionFunctionJobClassCombinationError, ) from models_library.products import ProductName +from models_library.projects import ProjectID +from models_library.projects_nodes_io import NodeID +from models_library.projects_state import RunningState from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt from models_library.rpc_pagination import PageLimitInt from models_library.users import UserID -from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient +from pydantic import ValidationError + +from ._service_jobs import JobService +from .models.api_resources import JobLinks +from .models.schemas.jobs import ( + JobInputs, + JobPricingSpecification, +) +from .services_rpc.wb_api_server import WbApiRpcClient + + +def join_inputs( + default_inputs: FunctionInputs | None, + function_inputs: FunctionInputs | None, +) -> FunctionInputs: + if default_inputs is None: + return function_inputs + + if function_inputs is None: + return default_inputs + + # last dict will override defaults + return {**default_inputs, **function_inputs} @dataclass(frozen=True, kw_only=True) @@ -19,6 +62,7 @@ class FunctionJobService: user_id: UserID product_name: ProductName _web_rpc_client: WbApiRpcClient + _job_service: JobService async def list_function_jobs( self, @@ -43,3 +87,229 @@ async def list_function_jobs( filter_by_function_job_collection_id=filter_by_function_job_collection_id, **pagination_kwargs, ) + + async def validate_function_inputs( + self, *, function_id: FunctionID, inputs: FunctionInputs + ) -> tuple[bool, str]: + function = await self._web_rpc_client.get_function( + function_id=function_id, + user_id=self.user_id, + product_name=self.product_name, + ) + + if ( + function.input_schema is None + or function.input_schema.schema_content is None + ): + return True, "No input schema defined for this function" + + if function.input_schema.schema_class == FunctionSchemaClass.json_schema: + try: + jsonschema.validate( + instance=inputs, schema=function.input_schema.schema_content + ) + except ValidationError as err: + return False, str(err) + return True, "Inputs are valid" + + return ( + False, + f"Unsupported function schema class {function.input_schema.schema_class}", + ) + + async def inspect_function_job( + self, function: RegisteredFunction, function_job: RegisteredFunctionJob + ) -> FunctionJobStatus: + + stored_job_status = await self._web_rpc_client.get_function_job_status( + function_job_id=function_job.uid, + user_id=self.user_id, + product_name=self.product_name, + ) + + if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED): + return stored_job_status + + if ( + function.function_class == FunctionClass.PROJECT + and function_job.function_class == FunctionClass.PROJECT + ): + job_status = await self._job_service.inspect_study_job( + job_id=function_job.project_job_id, + ) + elif (function.function_class == FunctionClass.SOLVER) and ( + function_job.function_class == FunctionClass.SOLVER + ): + job_status = await self._job_service.inspect_solver_job( + solver_key=function.solver_key, + version=function.solver_version, + job_id=function_job.solver_job_id, + ) + else: + raise UnsupportedFunctionFunctionJobClassCombinationError( + function_class=function.function_class, + function_job_class=function_job.function_class, + ) + + new_job_status = FunctionJobStatus(status=job_status.state) + + return await self._web_rpc_client.update_function_job_status( + function_job_id=function_job.uid, + user_id=self.user_id, + product_name=self.product_name, + job_status=new_job_status, + ) + + async def run_function( + self, + *, + function: RegisteredFunction, + function_inputs: FunctionInputs, + pricing_spec: JobPricingSpecification | None, + job_links: JobLinks, + x_simcore_parent_project_uuid: NodeID | None, + x_simcore_parent_node_id: NodeID | None, + ) -> RegisteredFunctionJob: + + user_api_access_rights = ( + await self._web_rpc_client.get_functions_user_api_access_rights( + user_id=self.user_id, product_name=self.product_name + ) + ) + if not user_api_access_rights.execute_functions: + raise FunctionsExecuteApiAccessDeniedError( + user_id=self.user_id, + function_id=function.uid, + ) + + user_permissions = await self._web_rpc_client.get_function_user_permissions( + function_id=function.uid, + user_id=self.user_id, + product_name=self.product_name, + ) + if not user_permissions.execute: + raise FunctionExecuteAccessDeniedError( + user_id=self.user_id, + function_id=function.uid, + ) + + joined_inputs = join_inputs( + function.default_inputs, + function_inputs, + ) + + if function.input_schema is not None: + is_valid, validation_str = await self.validate_function_inputs( + function_id=function.uid, + inputs=joined_inputs, + ) + if not is_valid: + raise FunctionInputsValidationError(error=validation_str) + + if cached_function_jobs := await self._web_rpc_client.find_cached_function_jobs( + function_id=function.uid, + inputs=joined_inputs, + user_id=self.user_id, + product_name=self.product_name, + ): + for cached_function_job in cached_function_jobs: + job_status = await self.inspect_function_job( + function=function, + function_job=cached_function_job, + ) + if job_status.status == RunningState.SUCCESS: + return cached_function_job + + if function.function_class == FunctionClass.PROJECT: + study_job = await self._job_service.create_studies_job( + study_id=function.project_id, + job_inputs=JobInputs(values=joined_inputs or {}), + hidden=True, + job_links=job_links, + x_simcore_parent_project_uuid=x_simcore_parent_project_uuid, + x_simcore_parent_node_id=x_simcore_parent_node_id, + ) + await self._job_service.start_study_job( + study_id=function.project_id, + job_id=study_job.id, + pricing_spec=pricing_spec, + ) + return await self._web_rpc_client.register_function_job( + function_job=ProjectFunctionJob( + function_uid=function.uid, + title=f"Function job of function {function.uid}", + description=function.description, + inputs=joined_inputs, + outputs=None, + project_job_id=study_job.id, + ), + user_id=self.user_id, + product_name=self.product_name, + ) + + if function.function_class == FunctionClass.SOLVER: + solver_job = await self._job_service.create_solver_job( + solver_key=function.solver_key, + version=function.solver_version, + inputs=JobInputs(values=joined_inputs or {}), + job_links=job_links, + hidden=True, + x_simcore_parent_project_uuid=x_simcore_parent_project_uuid, + x_simcore_parent_node_id=x_simcore_parent_node_id, + ) + await self._job_service.start_solver_job( + solver_key=function.solver_key, + version=function.solver_version, + job_id=solver_job.id, + pricing_spec=pricing_spec, + ) + return await self._web_rpc_client.register_function_job( + function_job=SolverFunctionJob( + function_uid=function.uid, + title=f"Function job of function {function.uid}", + description=function.description, + inputs=joined_inputs, + outputs=None, + solver_job_id=solver_job.id, + ), + user_id=self.user_id, + product_name=self.product_name, + ) + + raise UnsupportedFunctionClassError( + function_class=function.function_class, + ) + + async def map_function( + self, + *, + function: RegisteredFunction, + function_inputs_list: FunctionInputsList, + job_links: JobLinks, + pricing_spec: JobPricingSpecification | None, + x_simcore_parent_project_uuid: ProjectID | None, + x_simcore_parent_node_id: NodeID | None, + ) -> RegisteredFunctionJobCollection: + + function_jobs = [ + await self.run_function( + function=function, + function_inputs=function_inputs, + pricing_spec=pricing_spec, + job_links=job_links, + x_simcore_parent_project_uuid=x_simcore_parent_project_uuid, + x_simcore_parent_node_id=x_simcore_parent_node_id, + ) + for function_inputs in function_inputs_list + ] + + function_job_collection_description = f"Function job collection of map of function {function.uid} with {len(function_inputs_list)} inputs" + return await self._web_rpc_client.register_function_job_collection( + function_job_collection=FunctionJobCollection( + title="Function job collection of function map", + description=function_job_collection_description, + job_ids=[function_job.uid for function_job in function_jobs], + ), + user_id=self.user_id, + product_name=self.product_name, + ) diff --git a/services/api-server/src/simcore_service_api_server/_service_functions.py b/services/api-server/src/simcore_service_api_server/_service_functions.py index 0c2c85bd5f7b..4b6ba663bbe6 100644 --- a/services/api-server/src/simcore_service_api_server/_service_functions.py +++ b/services/api-server/src/simcore_service_api_server/_service_functions.py @@ -1,7 +1,11 @@ +# pylint: disable=no-self-use + +from collections.abc import Callable from dataclasses import dataclass from common_library.exclude import as_dict_exclude_none -from models_library.functions import RegisteredFunction +from models_library.functions import FunctionClass, RegisteredFunction +from models_library.functions_errors import UnsupportedFunctionClassError from models_library.products import ProductName from models_library.rest_pagination import ( MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE, @@ -10,7 +14,15 @@ ) from models_library.rpc_pagination import PageLimitInt from models_library.users import UserID -from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient + +from .models.api_resources import JobLinks +from .services_http.solver_job_models_converters import ( + get_solver_job_rest_interface_links, +) +from .services_http.study_job_models_converters import ( + get_study_job_rest_interface_links, +) +from .services_rpc.wb_api_server import WbApiRpcClient DEFAULT_PAGINATION_LIMIT = MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE - 1 @@ -38,3 +50,21 @@ async def list_functions( product_name=self.product_name, **pagination_kwargs, ) + + async def get_function_job_links( + self, function: RegisteredFunction, url_for: Callable + ) -> JobLinks: + if function.function_class == FunctionClass.SOLVER: + return get_solver_job_rest_interface_links( + url_for=url_for, + solver_key=function.solver_key, + version=function.solver_version, + ) + if function.function_class == FunctionClass.PROJECT: + return get_study_job_rest_interface_links( + url_for=url_for, + study_id=function.project_id, + ) + raise UnsupportedFunctionClassError( + function_class=function.function_class, + ) diff --git a/services/api-server/src/simcore_service_api_server/_service_jobs.py b/services/api-server/src/simcore_service_api_server/_service_jobs.py index 67aa11714337..0300665e74d7 100644 --- a/services/api-server/src/simcore_service_api_server/_service_jobs.py +++ b/services/api-server/src/simcore_service_api_server/_service_jobs.py @@ -1,7 +1,7 @@ import logging -from collections.abc import Callable from dataclasses import dataclass from pathlib import Path +from uuid import UUID from common_library.exclude import as_dict_exclude_none from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobGet @@ -10,8 +10,11 @@ ProjectGet, ProjectPatch, ) +from models_library.api_schemas_webserver.projects_nodes import NodeOutputs +from models_library.function_services_catalog.services import file_picker from models_library.products import ProductName from models_library.projects import ProjectID +from models_library.projects_nodes import InputID, InputTypes from models_library.projects_nodes_io import NodeID from models_library.rest_pagination import ( PageMetaInfoLimitOffset, @@ -20,20 +23,42 @@ from models_library.rpc.webserver.projects import ProjectJobRpcGet from models_library.rpc_pagination import PageLimitInt from models_library.users import UserID -from pydantic import HttpUrl from servicelib.logging_utils import log_context -from .models.api_resources import RelativeResourceName -from .models.basic_types import NameValueTuple -from .models.schemas.jobs import Job, JobID, JobInputs +from ._service_solvers import ( + SolverService, +) +from .exceptions.backend_errors import JobAssetsMissingError +from .exceptions.custom_errors import SolverServiceListJobsFiltersError +from .models.api_resources import ( + JobLinks, + RelativeResourceName, + compose_resource_name, +) +from .models.basic_types import NameValueTuple, VersionStr +from .models.schemas.jobs import ( + Job, + JobID, + JobInputs, + JobPricingSpecification, + JobStatus, +) from .models.schemas.programs import Program -from .models.schemas.solvers import Solver +from .models.schemas.solvers import Solver, SolverKeyId +from .models.schemas.studies import Study, StudyID +from .services_http.director_v2 import DirectorV2Api +from .services_http.jobs import start_project from .services_http.solver_job_models_converters import ( create_job_from_project, create_job_inputs_from_node_inputs, + create_jobstatus_from_task, create_new_project_for_job, ) from .services_http.storage import StorageApi +from .services_http.study_job_models_converters import ( + create_job_from_study, + get_project_and_file_inputs_from_job_inputs, +) from .services_http.webserver import AuthSession from .services_rpc.director_v2 import DirectorV2Service from .services_rpc.storage import StorageService @@ -42,17 +67,35 @@ _logger = logging.getLogger(__name__) +def compose_solver_job_resource_name(solver_key, solver_version, job_id) -> str: + """Creates a unique resource name for solver's jobs""" + return Job.compose_resource_name( + parent_name=Solver.compose_resource_name(solver_key, solver_version), + job_id=job_id, + ) + + +def compose_study_job_resource_name(study_key, job_id) -> str: + """Creates a unique resource name for study's jobs""" + return Job.compose_resource_name( + parent_name=Study.compose_resource_name(study_key), + job_id=job_id, + ) + + @dataclass(frozen=True, kw_only=True) class JobService: _web_rest_client: AuthSession _web_rpc_client: WbApiRpcClient _storage_rpc_client: StorageService + _director2_api: DirectorV2Api _storage_rest_client: StorageApi _directorv2_rpc_client: DirectorV2Service + _solver_service: SolverService user_id: UserID product_name: ProductName - async def list_jobs( + async def _list_jobs( self, job_parent_resource_name: str, *, @@ -105,14 +148,72 @@ async def list_jobs( return jobs, projects_page.meta - async def create_job( + async def list_solver_jobs( + self, + *, + pagination_offset: PageOffsetInt | None = None, + pagination_limit: PageLimitInt | None = None, + filter_by_solver_key: SolverKeyId | None = None, + filter_by_solver_version: VersionStr | None = None, + filter_any_custom_metadata: list[NameValueTuple] | None = None, + ) -> tuple[list[Job], PageMetaInfoLimitOffset]: + """Lists all solver jobs for a user with pagination""" + + # 1. Compose job parent resource name prefix + collection_or_resource_ids = [ + "solvers", # solver_id, "releases", solver_version, "jobs", + ] + if filter_by_solver_key: + collection_or_resource_ids.append(filter_by_solver_key) + if filter_by_solver_version: + collection_or_resource_ids.append("releases") + collection_or_resource_ids.append(filter_by_solver_version) + elif filter_by_solver_version: + raise SolverServiceListJobsFiltersError + + job_parent_resource_name = compose_resource_name(*collection_or_resource_ids) + + # 2. list jobs under job_parent_resource_name + return await self._list_jobs( + job_parent_resource_name=job_parent_resource_name, + filter_any_custom_metadata=filter_any_custom_metadata, + pagination_offset=pagination_offset, + pagination_limit=pagination_limit, + ) + + async def list_study_jobs( + self, + *, + filter_by_study_id: StudyID | None = None, + pagination_offset: PageOffsetInt | None = None, + pagination_limit: PageLimitInt | None = None, + ) -> tuple[list[Job], PageMetaInfoLimitOffset]: + """Lists all solver jobs for a user with pagination""" + + # 1. Compose job parent resource name prefix + collection_or_resource_ids: list[str] = [ + "study", # study_id, "jobs", + ] + if filter_by_study_id: + collection_or_resource_ids.append(f"{filter_by_study_id}") + + job_parent_resource_name = compose_resource_name(*collection_or_resource_ids) + + # 2. list jobs under job_parent_resource_name + return await self._list_jobs( + job_parent_resource_name=job_parent_resource_name, + pagination_offset=pagination_offset, + pagination_limit=pagination_limit, + ) + + async def create_project_marked_as_job( self, *, solver_or_program: Solver | Program, inputs: JobInputs, parent_project_uuid: ProjectID | None, parent_node_id: NodeID | None, - url_for: Callable[..., HttpUrl], + job_links: JobLinks, hidden: bool, project_name: str | None, description: str | None, @@ -153,7 +254,9 @@ async def create_job( # for consistency, it rebuild job job = create_job_from_project( - solver_or_program=solver_or_program, project=new_project, url_for=url_for + solver_or_program=solver_or_program, + project=new_project, + job_links=job_links, ) assert job.id == pre_job.id # nosec assert job.name == pre_job.name # nosec @@ -190,7 +293,7 @@ async def get_job( async def delete_job_assets( self, job_parent_resource_name: RelativeResourceName, job_id: JobID - ): + ) -> None: """Marks job project as hidden and deletes S3 assets associated it""" await self._web_rest_client.patch_project( project_id=job_id, patch_params=ProjectPatch(hidden=True) @@ -205,3 +308,175 @@ async def delete_job_assets( job_parent_resource_name=job_parent_resource_name, storage_assets_deleted=True, ) + + async def create_solver_job( + self, + *, + solver_key: SolverKeyId, + version: VersionStr, + inputs: JobInputs, + hidden: bool, + job_links: JobLinks, + x_simcore_parent_project_uuid: ProjectID | None, + x_simcore_parent_node_id: NodeID | None, + ) -> Job: + + solver = await self._solver_service.get_solver( + solver_key=solver_key, + solver_version=version, + ) + job, _ = await self.create_project_marked_as_job( + project_name=None, + description=None, + solver_or_program=solver, + inputs=inputs, + hidden=hidden, + parent_project_uuid=x_simcore_parent_project_uuid, + parent_node_id=x_simcore_parent_node_id, + job_links=job_links, + ) + + return job + + async def inspect_solver_job( + self, + *, + solver_key: SolverKeyId, + version: VersionStr, + job_id: JobID, + ) -> JobStatus: + assert solver_key # nosec + assert version # nosec + task = await self._director2_api.get_computation( + project_id=job_id, user_id=self.user_id + ) + job_status: JobStatus = create_jobstatus_from_task(task) + return job_status + + async def start_solver_job( + self, + *, + solver_key: SolverKeyId, + version: VersionStr, + job_id: JobID, + pricing_spec: JobPricingSpecification | None, + ) -> JobStatus: + """ + Raises ProjectAlreadyStartedError if the project is already started + """ + job_name = compose_solver_job_resource_name(solver_key, version, job_id) + _logger.debug("Start Job '%s'", job_name) + job_parent_resource_name = Solver.compose_resource_name(solver_key, version) + job = await self.get_job( + job_id=job_id, job_parent_resource_name=job_parent_resource_name + ) + if job.storage_assets_deleted: + raise JobAssetsMissingError(job_id=job_id) + await start_project( + pricing_spec=pricing_spec, + job_id=job_id, + expected_job_name=job_name, + webserver_api=self._web_rest_client, + ) + return await self.inspect_solver_job( + solver_key=solver_key, + version=version, + job_id=job_id, + ) + + async def create_studies_job( + self, + *, + study_id: StudyID, + job_inputs: JobInputs, + x_simcore_parent_project_uuid: ProjectID | None, + x_simcore_parent_node_id: NodeID | None, + job_links: JobLinks, + hidden: bool, + ) -> Job: + + project = await self._web_rest_client.clone_project( + project_id=study_id, + hidden=hidden, + parent_project_uuid=x_simcore_parent_project_uuid, + parent_node_id=x_simcore_parent_node_id, + ) + job = create_job_from_study( + study_key=study_id, + project=project, + job_inputs=job_inputs, + job_links=job_links, + ) + + await self._web_rest_client.patch_project( + project_id=job.id, + patch_params=ProjectPatch(name=job.name), + ) + + await self._web_rpc_client.mark_project_as_job( + product_name=self.product_name, + user_id=self.user_id, + project_uuid=job.id, + job_parent_resource_name=job.runner_name, + storage_assets_deleted=False, + ) + + project_inputs = await self._web_rest_client.get_project_inputs( + project_id=project.uuid + ) + + file_param_nodes = {} + for node_id, node in project.workbench.items(): + if ( + node.key == file_picker.META.key + and node.outputs is not None + and len(node.outputs) == 0 + ): + file_param_nodes[node.label] = node_id + + file_inputs: dict[InputID, InputTypes] = {} + + ( + new_project_inputs, + new_project_file_inputs, + ) = get_project_and_file_inputs_from_job_inputs( + project_inputs, file_inputs, job_inputs + ) + + for node_label, file_link in new_project_file_inputs.items(): + await self._web_rest_client.update_node_outputs( + project_id=project.uuid, + node_id=UUID(file_param_nodes[node_label]), + new_node_outputs=NodeOutputs(outputs={"outFile": file_link}), + ) + + if len(new_project_inputs) > 0: + await self._web_rest_client.update_project_inputs( + project_id=project.uuid, new_inputs=new_project_inputs + ) + return job + + async def inspect_study_job(self, *, job_id: JobID) -> JobStatus: + task = await self._director2_api.get_computation( + project_id=job_id, user_id=self.user_id + ) + job_status: JobStatus = create_jobstatus_from_task(task) + return job_status + + async def start_study_job( + self, + *, + job_id: JobID, + study_id: StudyID, + pricing_spec: JobPricingSpecification | None, + ): + job_name = compose_study_job_resource_name(study_id, job_id) + await start_project( + job_id=job_id, + expected_job_name=job_name, + webserver_api=self._web_rest_client, + pricing_spec=pricing_spec, + ) + return await self.inspect_study_job( + job_id=job_id, + ) diff --git a/services/api-server/src/simcore_service_api_server/_service_solvers.py b/services/api-server/src/simcore_service_api_server/_service_solvers.py index 3c0aac79721a..458f53432e1e 100644 --- a/services/api-server/src/simcore_service_api_server/_service_solvers.py +++ b/services/api-server/src/simcore_service_api_server/_service_solvers.py @@ -10,18 +10,11 @@ from models_library.rpc_pagination import PageLimitInt from models_library.services_enums import ServiceType from models_library.users import UserID -from simcore_service_api_server.models.basic_types import NameValueTuple -from ._service_jobs import JobService from ._service_utils import check_user_product_consistency from .exceptions.backend_errors import ( ProgramOrSolverOrStudyNotFoundError, ) -from .exceptions.custom_errors import ( - SolverServiceListJobsFiltersError, -) -from .models.api_resources import compose_resource_name -from .models.schemas.jobs import Job from .models.schemas.solvers import Solver, SolverKeyId from .services_rpc.catalog import CatalogService @@ -29,7 +22,6 @@ @dataclass(frozen=True, kw_only=True) class SolverService: catalog_service: CatalogService - job_service: JobService user_id: UserID product_name: ProductName @@ -41,13 +33,6 @@ def __post_init__(self): product_name=self.product_name, ) - check_user_product_consistency( - service_cls_name=self.__class__.__name__, - service_provider=self.job_service, - user_id=self.user_id, - product_name=self.product_name, - ) - async def get_solver( self, *, @@ -84,39 +69,6 @@ async def get_latest_release( return Solver.create_from_service(service) - async def list_jobs( - self, - *, - pagination_offset: PageOffsetInt | None = None, - pagination_limit: PageLimitInt | None = None, - filter_by_solver_key: SolverKeyId | None = None, - filter_by_solver_version: VersionStr | None = None, - filter_any_custom_metadata: list[NameValueTuple] | None = None, - ) -> tuple[list[Job], PageMetaInfoLimitOffset]: - """Lists all solver jobs for a user with pagination""" - - # 1. Compose job parent resource name prefix - collection_or_resource_ids = [ - "solvers", # solver_id, "releases", solver_version, "jobs", - ] - if filter_by_solver_key: - collection_or_resource_ids.append(filter_by_solver_key) - if filter_by_solver_version: - collection_or_resource_ids.append("releases") - collection_or_resource_ids.append(filter_by_solver_version) - elif filter_by_solver_version: - raise SolverServiceListJobsFiltersError - - job_parent_resource_name = compose_resource_name(*collection_or_resource_ids) - - # 2. list jobs under job_parent_resource_name - return await self.job_service.list_jobs( - job_parent_resource_name=job_parent_resource_name, - filter_any_custom_metadata=filter_any_custom_metadata, - pagination_offset=pagination_offset, - pagination_limit=pagination_limit, - ) - async def solver_release_history( self, *, diff --git a/services/api-server/src/simcore_service_api_server/_service_studies.py b/services/api-server/src/simcore_service_api_server/_service_studies.py index 733315f33305..89fa5196e341 100644 --- a/services/api-server/src/simcore_service_api_server/_service_studies.py +++ b/services/api-server/src/simcore_service_api_server/_service_studies.py @@ -3,17 +3,11 @@ from models_library.products import ProductName from models_library.rest_pagination import ( MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE, - PageMetaInfoLimitOffset, - PageOffsetInt, ) -from models_library.rpc_pagination import PageLimitInt from models_library.users import UserID from ._service_jobs import JobService from ._service_utils import check_user_product_consistency -from .models.api_resources import compose_resource_name -from .models.schemas.jobs import Job -from .models.schemas.studies import StudyID DEFAULT_PAGINATION_LIMIT = MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE - 1 @@ -31,28 +25,3 @@ def __post_init__(self): user_id=self.user_id, product_name=self.product_name, ) - - async def list_jobs( - self, - *, - filter_by_study_id: StudyID | None = None, - pagination_offset: PageOffsetInt | None = None, - pagination_limit: PageLimitInt | None = None, - ) -> tuple[list[Job], PageMetaInfoLimitOffset]: - """Lists all solver jobs for a user with pagination""" - - # 1. Compose job parent resource name prefix - collection_or_resource_ids: list[str] = [ - "study", # study_id, "jobs", - ] - if filter_by_study_id: - collection_or_resource_ids.append(f"{filter_by_study_id}") - - job_parent_resource_name = compose_resource_name(*collection_or_resource_ids) - - # 2. list jobs under job_parent_resource_name - return await self.job_service.list_jobs( - job_parent_resource_name=job_parent_resource_name, - pagination_offset=pagination_offset, - pagination_limit=pagination_limit, - ) diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/services.py b/services/api-server/src/simcore_service_api_server/api/dependencies/services.py index fe5c30ceb164..1b722ad182b2 100644 --- a/services/api-server/src/simcore_service_api_server/api/dependencies/services.py +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/services.py @@ -7,13 +7,13 @@ from models_library.products import ProductName from models_library.users import UserID from servicelib.rabbitmq import RabbitMQRPCClient -from simcore_service_api_server._service_function_jobs import FunctionJobService -from simcore_service_api_server._service_functions import FunctionService +from ..._service_function_jobs import FunctionJobService +from ..._service_functions import FunctionService from ..._service_jobs import JobService from ..._service_programs import ProgramService from ..._service_solvers import SolverService -from ..._service_studies import StudyService +from ...services_http.director_v2 import DirectorV2Api from ...services_http.storage import StorageApi from ...services_http.webserver import AuthSession from ...services_rpc.catalog import CatalogService @@ -84,33 +84,8 @@ def get_directorv2_service( return DirectorV2Service(_rpc_client=rpc_client) -def get_job_service( - web_rest_api: Annotated[AuthSession, Depends(get_webserver_session)], - web_rpc_api: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], - storage_rpc_client: Annotated[StorageService, Depends(get_storage_service)], - storage_rest_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))], - directorv2_service: Annotated[DirectorV2Service, Depends(get_directorv2_service)], - user_id: Annotated[UserID, Depends(get_current_user_id)], - product_name: Annotated[ProductName, Depends(get_product_name)], -) -> JobService: - """ - "Assembles" the JobsService layer to the underlying service and client interfaces - in the context of the rest controller (i.e. api/dependencies) - """ - return JobService( - _web_rest_client=web_rest_api, - _web_rpc_client=web_rpc_api, - _storage_rpc_client=storage_rpc_client, - _storage_rest_client=storage_rest_client, - _directorv2_rpc_client=directorv2_service, - user_id=user_id, - product_name=product_name, - ) - - def get_solver_service( catalog_service: Annotated[CatalogService, Depends(get_catalog_service)], - job_service: Annotated[JobService, Depends(get_job_service)], user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], ) -> SolverService: @@ -120,19 +95,6 @@ def get_solver_service( """ return SolverService( catalog_service=catalog_service, - job_service=job_service, - user_id=user_id, - product_name=product_name, - ) - - -def get_study_service( - job_service: Annotated[JobService, Depends(get_job_service)], - user_id: Annotated[UserID, Depends(get_current_user_id)], - product_name: Annotated[ProductName, Depends(get_product_name)], -) -> StudyService: - return StudyService( - job_service=job_service, user_id=user_id, product_name=product_name, ) @@ -146,6 +108,34 @@ def get_program_service( ) +def get_job_service( + web_rest_api: Annotated[AuthSession, Depends(get_webserver_session)], + director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], + storage_api: Annotated[StorageApi, Depends(get_api_client(StorageApi))], + web_rpc_api: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + storage_service: Annotated[StorageService, Depends(get_storage_service)], + directorv2_service: Annotated[DirectorV2Service, Depends(get_directorv2_service)], + user_id: Annotated[UserID, Depends(get_current_user_id)], + product_name: Annotated[ProductName, Depends(get_product_name)], + solver_service: Annotated[SolverService, Depends(get_solver_service)], +) -> JobService: + """ + "Assembles" the JobsService layer to the underlying service and client interfaces + in the context of the rest controller (i.e. api/dependencies) + """ + return JobService( + _web_rest_client=web_rest_api, + _web_rpc_client=web_rpc_api, + _storage_rpc_client=storage_service, + _directorv2_rpc_client=directorv2_service, + _director2_api=director2_api, + _storage_rest_client=storage_api, + _solver_service=solver_service, + user_id=user_id, + product_name=product_name, + ) + + def get_function_service( web_rpc_api: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], user_id: Annotated[UserID, Depends(get_current_user_id)], @@ -160,11 +150,13 @@ def get_function_service( def get_function_job_service( web_rpc_api: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + job_service: Annotated[JobService, Depends(get_job_service)], user_id: Annotated[UserID, Depends(get_current_user_id)], product_name: Annotated[ProductName, Depends(get_product_name)], ) -> FunctionJobService: return FunctionJobService( _web_rpc_client=web_rpc_api, + _job_service=job_service, user_id=user_id, product_name=product_name, ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py index 06097542f17f..c5667f8bce65 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py @@ -18,24 +18,24 @@ from ...models.pagination import Page, PaginationParams from ...models.schemas.errors import ErrorGet -from ...services_http.director_v2 import DirectorV2Api from ...services_rpc.wb_api_server import WbApiRpcClient from ..dependencies.authentication import get_current_user_id, get_product_name from ..dependencies.functions import ( get_function_from_functionjobid, - get_stored_job_status, ) from ..dependencies.models_schemas_function_filters import ( get_function_job_collections_filters, ) -from ..dependencies.services import get_api_client, get_function_job_service +from ..dependencies.services import ( + get_function_job_service, +) from ..dependencies.webserver_rpc import get_wb_api_rpc_client from ._constants import ( FMSG_CHANGELOG_ADDED_IN_VERSION, FMSG_CHANGELOG_NEW_IN_VERSION, create_route_description, ) -from .function_jobs_routes import function_job_status, get_function_job +from .function_jobs_routes import get_function_job # pylint: disable=too-many-arguments @@ -256,9 +256,11 @@ async def function_job_collection_list_function_jobs_list( async def function_job_collection_status( function_job_collection_id: FunctionJobCollectionID, wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], - director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], user_id: Annotated[UserID, Depends(get_current_user_id)], # Updated type product_name: Annotated[ProductName, Depends(get_product_name)], + function_job_service: Annotated[ + FunctionJobService, Depends(get_function_job_service) + ], ) -> FunctionJobCollectionStatus: function_job_collection = await get_function_job_collection( function_job_collection_id=function_job_collection_id, @@ -269,7 +271,7 @@ async def function_job_collection_status( job_statuses = await limited_gather( *[ - function_job_status( + function_job_service.inspect_function_job( function_job=await get_function_job( function_job_id=function_job_id, wb_api_rpc=wb_api_rpc, @@ -282,16 +284,6 @@ async def function_job_collection_status( user_id=user_id, product_name=product_name, ), - stored_job_status=await get_stored_job_status( - function_job_id=function_job_id, - wb_api_rpc=wb_api_rpc, - user_id=user_id, - product_name=product_name, - ), - wb_api_rpc=wb_api_rpc, - director2_api=director2_api, - user_id=user_id, - product_name=product_name, ) for function_job_id in function_job_collection.job_ids ] diff --git a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py index 170dfe526146..f1ee7cc4e573 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py @@ -15,10 +15,8 @@ from models_library.functions import RegisteredFunction from models_library.functions_errors import ( UnsupportedFunctionClassError, - UnsupportedFunctionFunctionJobClassCombinationError, ) from models_library.products import ProductName -from models_library.projects_state import RunningState from models_library.users import UserID from servicelib.fastapi.dependencies import get_app from simcore_service_api_server.models.schemas.functions_filters import ( @@ -30,7 +28,6 @@ from ..._service_jobs import JobService from ...models.pagination import Page, PaginationParams from ...models.schemas.errors import ErrorGet -from ...services_http.director_v2 import DirectorV2Api from ...services_http.storage import StorageApi from ...services_http.webserver import AuthSession from ...services_rpc.wb_api_server import WbApiRpcClient @@ -40,7 +37,6 @@ get_function_from_functionjob, get_function_job_dependency, get_stored_job_outputs, - get_stored_job_status, ) from ..dependencies.models_schemas_function_filters import get_function_jobs_filters from ..dependencies.services import ( @@ -50,7 +46,7 @@ ) from ..dependencies.webserver_http import get_webserver_session from ..dependencies.webserver_rpc import get_wb_api_rpc_client -from . import solvers_jobs, solvers_jobs_read, studies_jobs +from . import solvers_jobs_read, studies_jobs from ._constants import ( FMSG_CHANGELOG_ADDED_IN_VERSION, FMSG_CHANGELOG_NEW_IN_VERSION, @@ -204,49 +200,13 @@ async def function_job_status( RegisteredFunctionJob, Depends(get_function_job_dependency) ], function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)], - stored_job_status: Annotated[FunctionJobStatus, Depends(get_stored_job_status)], - director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], - user_id: Annotated[UserID, Depends(get_current_user_id)], - product_name: Annotated[ProductName, Depends(get_product_name)], - wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], + function_job_service: Annotated[ + FunctionJobService, Depends(get_function_job_service) + ], ) -> FunctionJobStatus: - if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED): - return stored_job_status - - if ( - function.function_class == FunctionClass.PROJECT - and function_job.function_class == FunctionClass.PROJECT - ): - job_status = await studies_jobs.inspect_study_job( - study_id=function.project_id, - job_id=function_job.project_job_id, - user_id=user_id, - director2_api=director2_api, - ) - elif (function.function_class == FunctionClass.SOLVER) and ( - function_job.function_class == FunctionClass.SOLVER - ): - job_status = await solvers_jobs.inspect_job( - solver_key=function.solver_key, - version=function.solver_version, - job_id=function_job.solver_job_id, - user_id=user_id, - director2_api=director2_api, - ) - else: - raise UnsupportedFunctionFunctionJobClassCombinationError( - function_class=function.function_class, - function_job_class=function_job.function_class, - ) - - new_job_status = FunctionJobStatus(status=job_status.state) - - return await wb_api_rpc.update_function_job_status( - function_job_id=function_job.uid, - user_id=user_id, - product_name=product_name, - job_status=new_job_status, + return await function_job_service.inspect_function_job( + function=function, function_job=function_job ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py b/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py index f3799a97c5b4..e1d05e129c3e 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py @@ -2,69 +2,43 @@ from collections.abc import Callable from typing import Annotated, Final, Literal -import jsonschema from fastapi import APIRouter, Depends, Header, Request, status from fastapi_pagination.api import create_page from fastapi_pagination.bases import AbstractPage -from jsonschema import ValidationError from models_library.api_schemas_api_server.functions import ( Function, - FunctionClass, FunctionID, FunctionInputs, FunctionInputSchema, FunctionInputsList, - FunctionJobCollection, FunctionOutputSchema, - FunctionSchemaClass, - ProjectFunctionJob, RegisteredFunction, RegisteredFunctionJob, RegisteredFunctionJobCollection, - SolverFunctionJob, -) -from models_library.functions import FunctionUserAccessRights -from models_library.functions_errors import ( - FunctionExecuteAccessDeniedError, - FunctionInputsValidationError, - FunctionsExecuteApiAccessDeniedError, - UnsupportedFunctionClassError, ) from models_library.products import ProductName from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID -from models_library.projects_state import RunningState from models_library.users import UserID from servicelib.fastapi.dependencies import get_reverse_url_mapper from ..._service_function_jobs import FunctionJobService from ..._service_functions import FunctionService -from ..._service_jobs import JobService -from ..._service_solvers import SolverService from ...models.pagination import Page, PaginationParams from ...models.schemas.errors import ErrorGet -from ...models.schemas.jobs import JobInputs -from ...services_http.director_v2 import DirectorV2Api -from ...services_http.webserver import AuthSession +from ...models.schemas.jobs import JobPricingSpecification from ...services_rpc.wb_api_server import WbApiRpcClient from ..dependencies.authentication import get_current_user_id, get_product_name -from ..dependencies.functions import get_stored_job_status from ..dependencies.services import ( - get_api_client, get_function_job_service, get_function_service, - get_job_service, - get_solver_service, ) -from ..dependencies.webserver_http import get_webserver_session from ..dependencies.webserver_rpc import get_wb_api_rpc_client -from . import solvers_jobs, studies_jobs from ._constants import ( FMSG_CHANGELOG_ADDED_IN_VERSION, FMSG_CHANGELOG_NEW_IN_VERSION, create_route_description, ) -from .function_jobs_routes import register_function_job # pylint: disable=too-many-arguments # pylint: disable=cyclic-import @@ -261,20 +235,6 @@ async def update_function_description( return returned_function -def _join_inputs( - default_inputs: FunctionInputs | None, - function_inputs: FunctionInputs | None, -) -> FunctionInputs: - if default_inputs is None: - return function_inputs - - if function_inputs is None: - return default_inputs - - # last dict will override defaults - return {**default_inputs, **function_inputs} - - @function_router.get( "/{function_id:uuid}/input_schema", response_model=FunctionInputSchema, @@ -332,29 +292,13 @@ async def get_function_outputschema( async def validate_function_inputs( function_id: FunctionID, inputs: FunctionInputs, - wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], - user_id: Annotated[UserID, Depends(get_current_user_id)], - product_name: Annotated[ProductName, Depends(get_product_name)], + function_job_service: Annotated[ + FunctionJobService, Depends(get_function_job_service) + ], ) -> tuple[bool, str]: - function = await wb_api_rpc.get_function( - function_id=function_id, user_id=user_id, product_name=product_name - ) - - if function.input_schema is None or function.input_schema.schema_content is None: - return True, "No input schema defined for this function" - - if function.input_schema.schema_class == FunctionSchemaClass.json_schema: - try: - jsonschema.validate( - instance=inputs, schema=function.input_schema.schema_content - ) - except ValidationError as err: - return False, str(err) - return True, "Inputs are valid" - - return ( - False, - f"Unsupported function schema class {function.input_schema.schema_class}", + return await function_job_service.validate_function_inputs( + function_id=function_id, + inputs=inputs, ) @@ -369,17 +313,13 @@ async def validate_function_inputs( ) async def run_function( # noqa: PLR0913 request: Request, - function_id: FunctionID, to_run_function: Annotated[RegisteredFunction, Depends(get_function)], - wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], - webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], - director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], function_inputs: FunctionInputs, - user_id: Annotated[UserID, Depends(get_current_user_id)], - product_name: Annotated[str, Depends(get_product_name)], - solver_service: Annotated[SolverService, Depends(get_solver_service)], - job_service: Annotated[JobService, Depends(get_job_service)], + function_service: Annotated[FunctionService, Depends(get_function_service)], + function_jobs_service: Annotated[ + FunctionJobService, Depends(get_function_job_service) + ], x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()], x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()], ) -> RegisteredFunctionJob: @@ -393,140 +333,16 @@ async def run_function( # noqa: PLR0913 if isinstance(x_simcore_parent_node_id, NodeID) else None ) - - user_api_access_rights = await wb_api_rpc.get_functions_user_api_access_rights( - user_id=user_id, product_name=product_name - ) - if not user_api_access_rights.execute_functions: - raise FunctionsExecuteApiAccessDeniedError( - user_id=user_id, - function_id=function_id, - ) - - user_permissions: FunctionUserAccessRights = ( - await wb_api_rpc.get_function_user_permissions( - function_id=function_id, user_id=user_id, product_name=product_name - ) - ) - if not user_permissions.execute: - raise FunctionExecuteAccessDeniedError( - user_id=user_id, - function_id=function_id, - ) - - from .function_jobs_routes import function_job_status - - joined_inputs = _join_inputs( - to_run_function.default_inputs, - function_inputs, - ) - - if to_run_function.input_schema is not None: - is_valid, validation_str = await validate_function_inputs( - function_id=to_run_function.uid, - inputs=joined_inputs, - wb_api_rpc=wb_api_rpc, - user_id=user_id, - product_name=product_name, - ) - if not is_valid: - raise FunctionInputsValidationError(error=validation_str) - - if cached_function_jobs := await wb_api_rpc.find_cached_function_jobs( - function_id=to_run_function.uid, - inputs=joined_inputs, - user_id=user_id, - product_name=product_name, - ): - for cached_function_job in cached_function_jobs: - job_status = await function_job_status( - function=to_run_function, - function_job=cached_function_job, - stored_job_status=await get_stored_job_status( - function_job_id=cached_function_job.uid, - user_id=user_id, - product_name=product_name, - wb_api_rpc=wb_api_rpc, - ), - wb_api_rpc=wb_api_rpc, - user_id=user_id, - director2_api=director2_api, - product_name=product_name, - ) - if job_status.status == RunningState.SUCCESS: - return cached_function_job - - if to_run_function.function_class == FunctionClass.PROJECT: - study_job = await studies_jobs.create_study_job( - study_id=to_run_function.project_id, - job_inputs=JobInputs(values=joined_inputs or {}), - webserver_api=webserver_api, - wb_api_rpc=wb_api_rpc, - url_for=url_for, - x_simcore_parent_project_uuid=parent_project_uuid, - x_simcore_parent_node_id=parent_node_id, - user_id=user_id, - product_name=product_name, - ) - await studies_jobs.start_study_job( - request=request, - study_id=to_run_function.project_id, - job_id=study_job.id, - user_id=user_id, - webserver_api=webserver_api, - director2_api=director2_api, - ) - return await register_function_job( - wb_api_rpc=wb_api_rpc, - function_job=ProjectFunctionJob( - function_uid=to_run_function.uid, - title=f"Function job of function {to_run_function.uid}", - description=to_run_function.description, - inputs=joined_inputs, - outputs=None, - project_job_id=study_job.id, - ), - user_id=user_id, - product_name=product_name, - ) - - if to_run_function.function_class == FunctionClass.SOLVER: - solver_job = await solvers_jobs.create_solver_job( - solver_key=to_run_function.solver_key, - version=to_run_function.solver_version, - inputs=JobInputs(values=joined_inputs or {}), - solver_service=solver_service, - job_service=job_service, - url_for=url_for, - x_simcore_parent_project_uuid=parent_project_uuid, - x_simcore_parent_node_id=parent_node_id, - ) - await solvers_jobs.start_job( - request=request, - solver_key=to_run_function.solver_key, - version=to_run_function.solver_version, - job_id=solver_job.id, - user_id=user_id, - webserver_api=webserver_api, - director2_api=director2_api, - job_service=job_service, - ) - return await register_function_job( - wb_api_rpc=wb_api_rpc, - function_job=SolverFunctionJob( - function_uid=to_run_function.uid, - title=f"Function job of function {to_run_function.uid}", - description=to_run_function.description, - inputs=joined_inputs, - outputs=None, - solver_job_id=solver_job.id, - ), - user_id=user_id, - product_name=product_name, - ) - - raise UnsupportedFunctionClassError( - function_class=to_run_function.function_class, + pricing_spec = JobPricingSpecification.create_from_headers(request.headers) + job_links = await function_service.get_function_job_links(to_run_function, url_for) + + return await function_jobs_service.run_function( + function=to_run_function, + function_inputs=function_inputs, + pricing_spec=pricing_spec, + job_links=job_links, + x_simcore_parent_project_uuid=parent_project_uuid, + x_simcore_parent_node_id=parent_node_id, ) @@ -569,51 +385,36 @@ async def delete_function( ) async def map_function( # noqa: PLR0913 request: Request, - function_id: FunctionID, to_run_function: Annotated[RegisteredFunction, Depends(get_function)], function_inputs_list: FunctionInputsList, - wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], - webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], - director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], - user_id: Annotated[UserID, Depends(get_current_user_id)], - product_name: Annotated[str, Depends(get_product_name)], - solver_service: Annotated[SolverService, Depends(get_solver_service)], - job_service: Annotated[JobService, Depends(get_job_service)], + function_jobs_service: Annotated[ + FunctionJobService, Depends(get_function_job_service) + ], + function_service: Annotated[FunctionService, Depends(get_function_service)], x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()], x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()], ) -> RegisteredFunctionJobCollection: - function_jobs = [ - await run_function( - wb_api_rpc=wb_api_rpc, - function_id=function_id, - to_run_function=to_run_function, - function_inputs=function_inputs, - product_name=product_name, - user_id=user_id, - webserver_api=webserver_api, - url_for=url_for, - director2_api=director2_api, - request=request, - solver_service=solver_service, - job_service=job_service, - x_simcore_parent_project_uuid=x_simcore_parent_project_uuid, - x_simcore_parent_node_id=x_simcore_parent_node_id, - ) - for function_inputs in function_inputs_list - ] - function_job_collection_description = f"Function job collection of map of function {function_id} with {len(function_inputs_list)} inputs" - # Import here to avoid circular import - from .function_job_collections_routes import register_function_job_collection - - return await register_function_job_collection( - wb_api_rpc=wb_api_rpc, - function_job_collection=FunctionJobCollection( - title="Function job collection of function map", - description=function_job_collection_description, - job_ids=[function_job.uid for function_job in function_jobs], - ), - user_id=user_id, - product_name=product_name, + parent_project_uuid = ( + x_simcore_parent_project_uuid + if isinstance(x_simcore_parent_project_uuid, ProjectID) + else None + ) + parent_node_id = ( + x_simcore_parent_node_id + if isinstance(x_simcore_parent_node_id, NodeID) + else None + ) + pricing_spec = JobPricingSpecification.create_from_headers(request.headers) + + job_links = await function_service.get_function_job_links(to_run_function, url_for) + + return await function_jobs_service.map_function( + function=to_run_function, + function_inputs_list=function_inputs_list, + pricing_spec=pricing_spec, + job_links=job_links, + x_simcore_parent_project_uuid=parent_project_uuid, + x_simcore_parent_node_id=parent_node_id, ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/programs.py b/services/api-server/src/simcore_service_api_server/api/routes/programs.py index 2ccd9d5ed7b3..86910bf754a8 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/programs.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/programs.py @@ -16,6 +16,7 @@ complete_file_upload, get_upload_links_from_s3, ) +from simcore_service_api_server.models.api_resources import JobLinks from ..._service_jobs import JobService from ..._service_programs import ProgramService @@ -153,7 +154,6 @@ async def create_program_job( user_id: Annotated[PositiveInt, Depends(get_current_user_id)], program_service: Annotated[ProgramService, Depends(get_program_service)], job_service: Annotated[JobService, Depends(get_job_service)], - url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], x_simcore_parent_project_uuid: Annotated[ProjectID | None, Header()] = None, x_simcore_parent_node_id: Annotated[NodeID | None, Header()] = None, name: Annotated[ @@ -171,15 +171,20 @@ async def create_program_job( name=program_key, version=version, ) + job_rest_interface_links = JobLinks( + url_template=None, + runner_url_template=None, + outputs_url_template=None, + ) - job, project = await job_service.create_job( + job, project = await job_service.create_project_marked_as_job( project_name=name, description=description, solver_or_program=program, inputs=inputs, parent_project_uuid=x_simcore_parent_project_uuid, parent_node_id=x_simcore_parent_node_id, - url_for=url_for, + job_links=job_rest_interface_links, hidden=False, ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py index 7e8bd40eb1ce..1bca0e13f826 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py @@ -4,16 +4,14 @@ from collections.abc import Callable from typing import Annotated, Any -from fastapi import APIRouter, Depends, Header, HTTPException, Query, Request, status -from fastapi.encoders import jsonable_encoder +from fastapi import APIRouter, Depends, Header, Query, Request, status from fastapi.responses import JSONResponse from models_library.clusters import ClusterID from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from pydantic.types import PositiveInt -from ..._service_jobs import JobService -from ..._service_solvers import SolverService +from ..._service_jobs import JobService, compose_solver_job_resource_name from ...exceptions.backend_errors import ProjectAlreadyStartedError from ...exceptions.service_errors_utils import DEFAULT_BACKEND_SERVICE_STATUS_CODES from ...models.basic_types import VersionStr @@ -24,17 +22,18 @@ JobInputs, JobMetadata, JobMetadataUpdate, + JobPricingSpecification, JobStatus, ) from ...models.schemas.solvers import Solver, SolverKeyId from ...services_http.director_v2 import DirectorV2Api -from ...services_http.jobs import replace_custom_metadata, start_project, stop_project +from ...services_http.jobs import replace_custom_metadata, stop_project from ...services_http.solver_job_models_converters import ( - create_jobstatus_from_task, + get_solver_job_rest_interface_links, ) from ..dependencies.application import get_reverse_url_mapper from ..dependencies.authentication import get_current_user_id -from ..dependencies.services import get_api_client, get_job_service, get_solver_service +from ..dependencies.services import get_api_client, get_job_service from ..dependencies.webserver_http import AuthSession, get_webserver_session from ._constants import ( FMSG_CHANGELOG_ADDED_IN_VERSION, @@ -48,14 +47,6 @@ router = APIRouter() -def compose_job_resource_name(solver_key, solver_version, job_id) -> str: - """Creates a unique resource name for solver's jobs""" - return Job.compose_resource_name( - parent_name=Solver.compose_resource_name(solver_key, solver_version), - job_id=job_id, - ) - - # JOBS --------------- # # - Similar to docker container's API design (container = job and image = solver) @@ -97,7 +88,6 @@ async def create_solver_job( # noqa: PLR0913 solver_key: SolverKeyId, version: VersionStr, inputs: JobInputs, - solver_service: Annotated[SolverService, Depends(get_solver_service)], job_service: Annotated[JobService, Depends(get_job_service)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], hidden: Annotated[bool, Query()] = True, @@ -109,24 +99,18 @@ async def create_solver_job( # noqa: PLR0913 NOTE: This operation does **not** start the job """ - # ensures user has access to solver - solver = await solver_service.get_solver( + return await job_service.create_solver_job( solver_key=solver_key, - solver_version=version, - ) - job, _ = await job_service.create_job( - project_name=None, - description=None, - solver_or_program=solver, + version=version, inputs=inputs, - url_for=url_for, hidden=hidden, - parent_project_uuid=x_simcore_parent_project_uuid, - parent_node_id=x_simcore_parent_node_id, + x_simcore_parent_project_uuid=x_simcore_parent_project_uuid, + x_simcore_parent_node_id=x_simcore_parent_node_id, + job_links=get_solver_job_rest_interface_links( + url_for=url_for, solver_key=solver_key, version=version + ), ) - return job - @router.delete( "/{solver_key:path}/releases/{version}/jobs/{job_id:uuid}", @@ -145,7 +129,7 @@ async def delete_job( job_id: JobID, webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], ): - job_name = compose_job_resource_name(solver_key, version, job_id) + job_name = compose_solver_job_resource_name(solver_key, version, job_id) _logger.debug("Deleting Job '%s'", job_name) await webserver_api.delete_project(project_id=job_id) @@ -224,52 +208,27 @@ async def start_job( solver_key: SolverKeyId, version: VersionStr, job_id: JobID, - user_id: Annotated[PositiveInt, Depends(get_current_user_id)], - director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], - webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], job_service: Annotated[JobService, Depends(get_job_service)], cluster_id: Annotated[ # pylint: disable=unused-argument # noqa: ARG001 ClusterID | None, Query(deprecated=True) ] = None, ): - job_name = compose_job_resource_name(solver_key, version, job_id) - _logger.debug("Start Job '%s'", job_name) - - job_parent_resource_name = Solver.compose_resource_name(solver_key, version) - job = await job_service.get_job( - job_id=job_id, job_parent_resource_name=job_parent_resource_name - ) - if job.storage_assets_deleted: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"Assets for job job_id={job_id} are missing", - ) + pricing_spec = JobPricingSpecification.create_from_headers(headers=request.headers) try: - await start_project( - request=request, - job_id=job_id, - expected_job_name=job_name, - webserver_api=webserver_api, - ) - except ProjectAlreadyStartedError: - job_status = await inspect_job( + return await job_service.start_solver_job( solver_key=solver_key, version=version, job_id=job_id, - user_id=user_id, - director2_api=director2_api, + pricing_spec=pricing_spec, + ) + except ProjectAlreadyStartedError: + job_status = await job_service.inspect_solver_job( + solver_key=solver_key, version=version, job_id=job_id ) return JSONResponse( - status_code=status.HTTP_200_OK, content=jsonable_encoder(job_status) + status_code=status.HTTP_200_OK, content=job_status.model_dump(mode="json") ) - return await inspect_job( - solver_key=solver_key, - version=version, - job_id=job_id, - user_id=user_id, - director2_api=director2_api, - ) @router.post( @@ -290,7 +249,7 @@ async def stop_job( user_id: Annotated[PositiveInt, Depends(get_current_user_id)], director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], ): - job_name = compose_job_resource_name(solver_key, version, job_id) + job_name = compose_solver_job_resource_name(solver_key, version, job_id) _logger.debug("Stopping Job '%s'", job_name) return await stop_project( @@ -313,15 +272,14 @@ async def inspect_job( solver_key: SolverKeyId, version: VersionStr, job_id: JobID, - user_id: Annotated[PositiveInt, Depends(get_current_user_id)], - director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], + job_service: Annotated[JobService, Depends(get_job_service)], ) -> JobStatus: - job_name = compose_job_resource_name(solver_key, version, job_id) + job_name = compose_solver_job_resource_name(solver_key, version, job_id) _logger.debug("Inspecting Job '%s'", job_name) - task = await director2_api.get_computation(project_id=job_id, user_id=user_id) - job_status: JobStatus = create_jobstatus_from_task(task) - return job_status + return await job_service.inspect_solver_job( + solver_key=solver_key, version=version, job_id=job_id + ) @router.patch( @@ -343,7 +301,7 @@ async def replace_job_custom_metadata( webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], ): - job_name = compose_job_resource_name(solver_key, version, job_id) + job_name = compose_solver_job_resource_name(solver_key, version, job_id) _logger.debug("Custom metadata for '%s'", job_name) return await replace_custom_metadata( diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_read.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_read.py index 30136b408e75..8aafc0aeb798 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_read.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_read.py @@ -21,7 +21,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine from starlette.background import BackgroundTask -from ..._service_jobs import JobService +from ..._service_jobs import JobService, compose_solver_job_resource_name from ..._service_solvers import SolverService from ...exceptions.custom_errors import InsufficientCreditsError, MissingWalletError from ...exceptions.service_errors_utils import DEFAULT_BACKEND_SERVICE_STATUS_CODES @@ -51,7 +51,10 @@ raise_if_job_not_associated_with_solver, ) from ...services_http.log_streaming import LogDistributor, LogStreamer -from ...services_http.solver_job_models_converters import create_job_from_project +from ...services_http.solver_job_models_converters import ( + create_job_from_project, + get_solver_job_rest_interface_links, +) from ...services_http.solver_job_outputs import ResultsTypes, get_solver_output_results from ...services_http.storage import StorageApi, to_file_api_model from ..dependencies.application import get_reverse_url_mapper @@ -69,7 +72,6 @@ from .solvers_jobs import ( JOBS_STATUS_CODES, METADATA_STATUS_CODES, - compose_job_resource_name, ) from .wallets import WALLET_STATUS_CODES @@ -141,11 +143,11 @@ async def list_all_solvers_jobs( filter_job_metadata_params: Annotated[ JobMetadataFilter | None, Depends(get_job_metadata_filter) ], - solver_service: Annotated[SolverService, Depends(get_solver_service)], + job_service: Annotated[JobService, Depends(get_job_service)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], ): - jobs, meta = await solver_service.list_jobs( + jobs, meta = await job_service.list_solver_jobs( filter_any_custom_metadata=( [ NameValueTuple(filter_metadata.name, filter_metadata.pattern) @@ -204,9 +206,14 @@ async def list_jobs( ) jobs: deque[Job] = deque() + job_rest_interface_links = get_solver_job_rest_interface_links( + url_for=url_for, solver_key=solver_key, version=solver.version + ) for prj in projects_page.data: job = create_job_from_project( - solver_or_program=solver, project=prj, url_for=url_for + solver_or_program=solver, + project=prj, + job_links=job_rest_interface_links, ) assert job.id == prj.uuid # nosec assert job.name == prj.name # nosec @@ -248,9 +255,16 @@ async def list_jobs_paginated( projects_page = await webserver_api.get_projects_w_solver_page( solver_name=solver.name, limit=page_params.limit, offset=page_params.offset ) + job_rest_interface_links = get_solver_job_rest_interface_links( + url_for=url_for, solver_key=solver_key, version=version + ) jobs: list[Job] = [ - create_job_from_project(solver_or_program=solver, project=prj, url_for=url_for) + create_job_from_project( + solver_or_program=solver, + project=prj, + job_links=job_rest_interface_links, + ) for prj in projects_page.data ] @@ -276,7 +290,8 @@ async def get_job( ): """Gets job of a given solver""" _logger.debug( - "Getting Job '%s'", compose_job_resource_name(solver_key, version, job_id) + "Getting Job '%s'", + compose_solver_job_resource_name(solver_key, version, job_id), ) solver = await solver_service.get_solver( @@ -285,8 +300,14 @@ async def get_job( ) project: ProjectGet = await webserver_api.get_project(project_id=job_id) + job_rest_interface_links = get_solver_job_rest_interface_links( + url_for=url_for, solver_key=solver_key, version=version + ) + job = create_job_from_project( - solver_or_program=solver, project=project, url_for=url_for + solver_or_program=solver, + project=project, + job_links=job_rest_interface_links, ) assert job.id == job_id # nosec return job # nosec @@ -313,7 +334,7 @@ async def get_job_outputs( job_service: Annotated[JobService, Depends(get_job_service)], storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))], ): - job_name = compose_job_resource_name(solver_key, version, job_id) + job_name = compose_solver_job_resource_name(solver_key, version, job_id) _logger.debug("Get Job '%s' outputs", job_name) project_marked_as_job = await job_service.get_job( @@ -396,7 +417,7 @@ async def get_job_output_logfile( user_id: Annotated[PositiveInt, Depends(get_current_user_id)], director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], ): - job_name = compose_job_resource_name(solver_key, version, job_id) + job_name = compose_solver_job_resource_name(solver_key, version, job_id) _logger.debug("Get Job '%s' outputs logfile", job_name) project_id = job_id @@ -452,7 +473,7 @@ async def get_job_custom_metadata( webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], ): - job_name = compose_job_resource_name(solver_key, version, job_id) + job_name = compose_solver_job_resource_name(solver_key, version, job_id) _logger.debug("Custom metadata for '%s'", job_name) return await get_custom_metadata( @@ -482,7 +503,7 @@ async def get_job_wallet( job_id: JobID, webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], ) -> WalletGetWithAvailableCreditsLegacy: - job_name = compose_job_resource_name(solver_key, version, job_id) + job_name = compose_solver_job_resource_name(solver_key, version, job_id) _logger.debug("Getting wallet for job '%s'", job_name) if project_wallet := await webserver_api.get_project_wallet(project_id=job_id): @@ -505,7 +526,7 @@ async def get_job_pricing_unit( job_id: JobID, webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], ): - job_name = compose_job_resource_name(solver_key, version, job_id) + job_name = compose_solver_job_resource_name(solver_key, version, job_id) with log_context(_logger, logging.DEBUG, "Get pricing unit"): _logger.debug("job: %s", job_name) project: ProjectGet = await webserver_api.get_project(project_id=job_id) @@ -536,7 +557,7 @@ async def get_log_stream( ): assert request # nosec - job_name = compose_job_resource_name(solver_key, version, job_id) + job_name = compose_solver_job_resource_name(solver_key, version, job_id) with log_context( _logger, logging.DEBUG, f"Streaming logs for {job_name=} and {user_id=}" ): diff --git a/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py b/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py index 8fd7e224eb8f..4eb72ed7788f 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py @@ -1,23 +1,18 @@ import logging from collections.abc import Callable from typing import Annotated -from uuid import UUID from fastapi import APIRouter, Depends, Header, Query, Request, status from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse from fastapi_pagination.api import create_page -from models_library.api_schemas_webserver.projects import ProjectPatch -from models_library.api_schemas_webserver.projects_nodes import NodeOutputs from models_library.clusters import ClusterID -from models_library.function_services_catalog.services import file_picker from models_library.projects import ProjectID -from models_library.projects_nodes import InputID, InputTypes from models_library.projects_nodes_io import NodeID from pydantic import HttpUrl, PositiveInt from servicelib.logging_utils import log_context -from ..._service_studies import StudyService +from ..._service_jobs import JobService, compose_study_job_resource_name from ...exceptions.backend_errors import ProjectAlreadyStartedError from ...models.api_resources import parse_resources_ids from ...models.pagination import Page, PaginationParams @@ -29,30 +24,26 @@ JobMetadata, JobMetadataUpdate, JobOutputs, + JobPricingSpecification, JobStatus, ) -from ...models.schemas.studies import JobLogsMap, Study, StudyID +from ...models.schemas.studies import JobLogsMap, StudyID from ...services_http.director_v2 import DirectorV2Api from ...services_http.jobs import ( get_custom_metadata, replace_custom_metadata, - start_project, stop_project, ) -from ...services_http.solver_job_models_converters import create_jobstatus_from_task from ...services_http.storage import StorageApi from ...services_http.study_job_models_converters import ( - create_job_from_study, create_job_outputs_from_project_outputs, - get_project_and_file_inputs_from_job_inputs, + get_study_job_rest_interface_links, ) from ...services_http.webserver import AuthSession -from ...services_rpc.wb_api_server import WbApiRpcClient from ..dependencies.application import get_reverse_url_mapper -from ..dependencies.authentication import get_current_user_id, get_product_name -from ..dependencies.services import get_api_client, get_study_service +from ..dependencies.authentication import get_current_user_id +from ..dependencies.services import get_api_client, get_job_service from ..dependencies.webserver_http import get_webserver_session -from ..dependencies.webserver_rpc import get_wb_api_rpc_client from ._constants import ( FMSG_CHANGELOG_CHANGED_IN_VERSION, FMSG_CHANGELOG_NEW_IN_VERSION, @@ -66,14 +57,6 @@ router = APIRouter() -def _compose_job_resource_name(study_key, job_id) -> str: - """Creates a unique resource name for solver's jobs""" - return Job.compose_resource_name( - parent_name=Study.compose_resource_name(study_key), - job_id=job_id, - ) - - @router.get( "/{study_id:uuid}/jobs", response_model=Page[Job], @@ -88,13 +71,13 @@ def _compose_job_resource_name(study_key, job_id) -> str: async def list_study_jobs( study_id: StudyID, page_params: Annotated[PaginationParams, Depends()], - study_service: Annotated[StudyService, Depends(get_study_service)], + job_service: Annotated[JobService, Depends(get_job_service)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], ): msg = f"list study jobs study_id={study_id!r} with pagination={page_params!r}. SEE https://github.com/ITISFoundation/osparc-simcore/issues/4177" _logger.debug(msg) - jobs, meta = await study_service.list_jobs( + jobs, meta = await job_service.list_study_jobs( filter_by_study_id=study_id, pagination_offset=page_params.offset, pagination_limit=page_params.limit, @@ -121,11 +104,8 @@ async def list_study_jobs( async def create_study_job( study_id: StudyID, job_inputs: JobInputs, - webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], - wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], - user_id: Annotated[PositiveInt, Depends(get_current_user_id)], - product_name: Annotated[str, Depends(get_product_name)], + job_service: Annotated[JobService, Depends(get_job_service)], hidden: Annotated[bool, Query()] = True, # noqa: FBT002 x_simcore_parent_project_uuid: Annotated[ProjectID | None, Header()] = None, x_simcore_parent_node_id: Annotated[NodeID | None, Header()] = None, @@ -133,15 +113,17 @@ async def create_study_job( """ hidden -- if True (default) hides project from UI """ - project = await webserver_api.clone_project( - project_id=study_id, + + job_links = get_study_job_rest_interface_links(url_for=url_for, study_id=study_id) + job = await job_service.create_studies_job( + study_id=study_id, + job_inputs=job_inputs, + x_simcore_parent_project_uuid=x_simcore_parent_project_uuid, + x_simcore_parent_node_id=x_simcore_parent_node_id, + job_links=job_links, hidden=hidden, - parent_project_uuid=x_simcore_parent_project_uuid, - parent_node_id=x_simcore_parent_node_id, - ) - job = create_job_from_study( - study_key=study_id, project=project, job_inputs=job_inputs ) + assert job.name == compose_study_job_resource_name(study_id, job.id) job.url = url_for( "get_study_job", study_id=study_id, @@ -153,54 +135,6 @@ async def create_study_job( study_id=study_id, job_id=job.id, ) - - await webserver_api.patch_project( - project_id=job.id, - patch_params=ProjectPatch(name=job.name), - ) - - await wb_api_rpc.mark_project_as_job( - product_name=product_name, - user_id=user_id, - project_uuid=job.id, - job_parent_resource_name=job.runner_name, - storage_assets_deleted=False, - ) - - project_inputs = await webserver_api.get_project_inputs(project_id=project.uuid) - - file_param_nodes = {} - for node_id, node in project.workbench.items(): - if ( - node.key == file_picker.META.key - and node.outputs is not None - and len(node.outputs) == 0 - ): - file_param_nodes[node.label] = node_id - - file_inputs: dict[InputID, InputTypes] = {} - - ( - new_project_inputs, - new_project_file_inputs, - ) = get_project_and_file_inputs_from_job_inputs( - project_inputs, file_inputs, job_inputs - ) - - for node_label, file_link in new_project_file_inputs.items(): - await webserver_api.update_node_outputs( - project_id=project.uuid, - node_id=UUID(file_param_nodes[node_label]), - new_node_outputs=NodeOutputs(outputs={"outFile": file_link}), - ) - - if len(new_project_inputs) > 0: - await webserver_api.update_project_inputs( - project_id=project.uuid, new_inputs=new_project_inputs - ) - - assert job.name == _compose_job_resource_name(study_id, job.id) - return job @@ -219,9 +153,7 @@ async def create_study_job( async def get_study_job( study_id: StudyID, job_id: JobID, - study_service: Annotated[StudyService, Depends(get_study_service)], ): - assert study_service # nosec msg = f"get study job study_id={study_id!r} job_id={job_id!r}. SEE https://github.com/ITISFoundation/osparc-simcore/issues/4177" raise NotImplementedError(msg) @@ -237,7 +169,7 @@ async def delete_study_job( webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], ): """Deletes an existing study job""" - job_name = _compose_job_resource_name(study_id, job_id) + job_name = compose_study_job_resource_name(study_id, job_id) with log_context(_logger, logging.DEBUG, f"Deleting Job '{job_name}'"): await webserver_api.delete_project(project_id=job_id) @@ -274,9 +206,7 @@ async def start_study_job( request: Request, study_id: StudyID, job_id: JobID, - user_id: Annotated[PositiveInt, Depends(get_current_user_id)], - webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], - director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], + job_service: Annotated[JobService, Depends(get_job_service)], cluster_id: Annotated[ # pylint: disable=unused-argument # noqa: ARG001 ClusterID | None, Query( @@ -291,31 +221,23 @@ async def start_study_job( ), ] = None, ): - job_name = _compose_job_resource_name(study_id, job_id) + pricing_spec = JobPricingSpecification.create_from_headers(headers=request.headers) + + job_name = compose_study_job_resource_name(study_id, job_id) with log_context(_logger, logging.DEBUG, f"Starting Job '{job_name}'"): try: - await start_project( - request=request, + return await job_service.start_study_job( + study_id=study_id, job_id=job_id, - expected_job_name=job_name, - webserver_api=webserver_api, + pricing_spec=pricing_spec, ) except ProjectAlreadyStartedError: - job_status: JobStatus = await inspect_study_job( - study_id=study_id, + job_status: JobStatus = await job_service.inspect_study_job( job_id=job_id, - user_id=user_id, - director2_api=director2_api, ) return JSONResponse( content=jsonable_encoder(job_status), status_code=status.HTTP_200_OK ) - return await inspect_study_job( - study_id=study_id, - job_id=job_id, - user_id=user_id, - director2_api=director2_api, - ) @router.post( @@ -328,7 +250,7 @@ async def stop_study_job( user_id: Annotated[PositiveInt, Depends(get_current_user_id)], director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], ): - job_name = _compose_job_resource_name(study_id, job_id) + job_name = compose_study_job_resource_name(study_id, job_id) with log_context(_logger, logging.DEBUG, f"Stopping Job '{job_name}'"): return await stop_project( job_id=job_id, user_id=user_id, director2_api=director2_api @@ -342,15 +264,12 @@ async def stop_study_job( async def inspect_study_job( study_id: StudyID, job_id: JobID, - user_id: Annotated[PositiveInt, Depends(get_current_user_id)], - director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], + job_service: Annotated[JobService, Depends(get_job_service)], ) -> JobStatus: - job_name = _compose_job_resource_name(study_id, job_id) + job_name = compose_study_job_resource_name(study_id, job_id) _logger.debug("Inspecting Job '%s'", job_name) - task = await director2_api.get_computation(project_id=job_id, user_id=user_id) - job_status: JobStatus = create_jobstatus_from_task(task) - return job_status + return await job_service.inspect_study_job(job_id=job_id) @router.post( @@ -364,7 +283,7 @@ async def get_study_job_outputs( webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))], ): - job_name = _compose_job_resource_name(study_id, job_id) + job_name = compose_study_job_resource_name(study_id, job_id) _logger.debug("Getting Job Outputs for '%s'", job_name) project_outputs = await webserver_api.get_project_outputs(project_id=job_id) @@ -411,7 +330,7 @@ async def get_study_job_custom_metadata( webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], ): - job_name = _compose_job_resource_name(study_id, job_id) + job_name = compose_study_job_resource_name(study_id, job_id) msg = f"Gets metadata attached to study_id={study_id!r} job_id={job_id!r}.\njob_name={job_name!r}.\nSEE https://github.com/ITISFoundation/osparc-simcore/issues/4313" _logger.debug(msg) @@ -442,7 +361,7 @@ async def replace_study_job_custom_metadata( webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], url_for: Annotated[Callable, Depends(get_reverse_url_mapper)], ): - job_name = _compose_job_resource_name(study_id, job_id) + job_name = compose_study_job_resource_name(study_id, job_id) msg = f"Attaches metadata={replace.metadata!r} to study_id={study_id!r} job_id={job_id!r}.\njob_name={job_name!r}.\nSEE https://github.com/ITISFoundation/osparc-simcore/issues/4313" _logger.debug(msg) diff --git a/services/api-server/src/simcore_service_api_server/exceptions/backend_errors.py b/services/api-server/src/simcore_service_api_server/exceptions/backend_errors.py index db7c88f6ca04..33960e49f6bb 100644 --- a/services/api-server/src/simcore_service_api_server/exceptions/backend_errors.py +++ b/services/api-server/src/simcore_service_api_server/exceptions/backend_errors.py @@ -140,3 +140,8 @@ class CanNotCheckoutServiceIsNotRunningError(BaseBackEndError): class LicensedItemCheckoutNotFoundError(BaseBackEndError): msg_template = "Licensed item checkout {licensed_item_checkout_id} not found." status_code = status.HTTP_404_NOT_FOUND + + +class JobAssetsMissingError(BaseBackEndError): + msg_template = "Job assets missing for job {job_id}" + status_code = status.HTTP_409_CONFLICT diff --git a/services/api-server/src/simcore_service_api_server/models/api_resources.py b/services/api-server/src/simcore_service_api_server/models/api_resources.py index 939012bbf571..b16a0414b834 100644 --- a/services/api-server/src/simcore_service_api_server/models/api_resources.py +++ b/services/api-server/src/simcore_service_api_server/models/api_resources.py @@ -1,8 +1,10 @@ import re import urllib.parse from typing import Annotated, TypeAlias +from uuid import UUID -from pydantic import Field, TypeAdapter +import parse # type: ignore[import-untyped] +from pydantic import AfterValidator, BaseModel, Field, HttpUrl, TypeAdapter from pydantic.types import StringConstraints # RESOURCE NAMES https://google.aip.dev/122 @@ -26,7 +28,6 @@ # SEE https://tools.ietf.org/html/rfc3986#appendix-B # - _RELATIVE_RESOURCE_NAME_RE = r"^([^\s/]+/?){1,10}$" @@ -91,3 +92,39 @@ def split_resource_name_as_dict( """ parts = split_resource_name(resource_name) return dict(zip(parts[::2], parts[1::2], strict=False)) + + +def _url_missing_only_job_id(url: str | None) -> str | None: + if url is None: + return None + if set(parse.compile(url).named_fields) != {"job_id"}: + raise ValueError(f"Missing job_id in {url=}") + return url + + +class JobLinks(BaseModel): + url_template: Annotated[str | None, AfterValidator(_url_missing_only_job_id)] + runner_url_template: str | None + outputs_url_template: Annotated[ + str | None, AfterValidator(_url_missing_only_job_id) + ] + + def url(self, job_id: UUID) -> HttpUrl | None: + if self.url_template is None: + return None + return TypeAdapter(HttpUrl).validate_python( + self.url_template.format(job_id=job_id) + ) + + def runner_url(self, job_id: UUID) -> HttpUrl | None: + assert job_id # nosec + if self.runner_url_template is None: + return None + return TypeAdapter(HttpUrl).validate_python(self.runner_url_template) + + def outputs_url(self, job_id: UUID) -> HttpUrl | None: + if self.outputs_url_template is None: + return None + return TypeAdapter(HttpUrl).validate_python( + self.outputs_url_template.format(job_id=job_id) + ) diff --git a/services/api-server/src/simcore_service_api_server/models/schemas/jobs.py b/services/api-server/src/simcore_service_api_server/models/schemas/jobs.py index 08bfe901bbc4..1e3c6d6d0e68 100644 --- a/services/api-server/src/simcore_service_api_server/models/schemas/jobs.py +++ b/services/api-server/src/simcore_service_api_server/models/schemas/jobs.py @@ -1,7 +1,6 @@ import datetime import hashlib import logging -from collections.abc import Callable from pathlib import Path from typing import Annotated, TypeAlias from uuid import UUID, uuid4 @@ -40,6 +39,7 @@ from ..domain.files import FileInProgramJobData from ..schemas.files import UserFile from .base import ApiServerInputSchema +from .programs import ProgramKeyId # JOB SUB-RESOURCES ---------- # @@ -47,8 +47,6 @@ # - Input/outputs are defined in service metadata # - custom metadata # -from .programs import Program, ProgramKeyId -from .solvers import Solver JobID: TypeAlias = UUID @@ -324,44 +322,6 @@ def resource_name(self) -> str: return self.name -def get_url( - solver_or_program: Solver | Program, url_for: Callable[..., HttpUrl], job_id: JobID -) -> HttpUrl | None: - if isinstance(solver_or_program, Solver): - return url_for( - "get_job", - solver_key=solver_or_program.id, - version=solver_or_program.version, - job_id=job_id, - ) - return None - - -def get_runner_url( - solver_or_program: Solver | Program, url_for: Callable[..., HttpUrl] -) -> HttpUrl | None: - if isinstance(solver_or_program, Solver): - return url_for( - "get_solver_release", - solver_key=solver_or_program.id, - version=solver_or_program.version, - ) - return None - - -def get_outputs_url( - solver_or_program: Solver | Program, url_for: Callable[..., HttpUrl], job_id: JobID -) -> HttpUrl | None: - if isinstance(solver_or_program, Solver): - return url_for( - "get_job_outputs", - solver_key=solver_or_program.id, - version=solver_or_program.version, - job_id=job_id, - ) - return None - - PercentageInt: TypeAlias = Annotated[int, Field(ge=0, le=100)] diff --git a/services/api-server/src/simcore_service_api_server/services_http/jobs.py b/services/api-server/src/simcore_service_api_server/services_http/jobs.py index ed2ef50d5881..32cbe4b4cd6c 100644 --- a/services/api-server/src/simcore_service_api_server/services_http/jobs.py +++ b/services/api-server/src/simcore_service_api_server/services_http/jobs.py @@ -1,15 +1,12 @@ import logging -from typing import Annotated from uuid import UUID -from fastapi import Depends, HTTPException, Request, status from models_library.api_schemas_webserver.projects import ProjectGet -from pydantic import HttpUrl, PositiveInt +from models_library.users import UserID +from pydantic import HttpUrl from servicelib.logging_utils import log_context -from ..api.dependencies.authentication import get_current_user_id -from ..api.dependencies.services import get_api_client -from ..api.dependencies.webserver_http import get_webserver_session +from ..exceptions.backend_errors import InvalidInputError from ..models.schemas.jobs import ( JobID, JobMetadata, @@ -28,20 +25,17 @@ def raise_if_job_not_associated_with_solver( expected_project_name: str, project: ProjectGet ) -> None: if expected_project_name != project.name: - raise HTTPException( - status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Invalid input data for job {project.uuid}", - ) + raise InvalidInputError() async def start_project( *, - request: Request, job_id: JobID, expected_job_name: str, - webserver_api: Annotated[AuthSession, Depends(get_webserver_session)], + pricing_spec: JobPricingSpecification | None, + webserver_api: AuthSession, ) -> None: - if pricing_spec := JobPricingSpecification.create_from_headers(request.headers): + if pricing_spec is not None: with log_context(_logger, logging.DEBUG, "Set pricing plan and unit"): project: ProjectGet = await webserver_api.get_project(project_id=job_id) raise_if_job_not_associated_with_solver(expected_job_name, project) @@ -60,8 +54,8 @@ async def start_project( async def stop_project( *, job_id: JobID, - user_id: Annotated[PositiveInt, Depends(get_current_user_id)], - director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], + user_id: UserID, + director2_api: DirectorV2Api, ) -> JobStatus: await director2_api.stop_computation(project_id=job_id, user_id=user_id) diff --git a/services/api-server/src/simcore_service_api_server/services_http/solver_job_models_converters.py b/services/api-server/src/simcore_service_api_server/services_http/solver_job_models_converters.py index 3a6728f478a8..d57d9614a83f 100644 --- a/services/api-server/src/simcore_service_api_server/services_http/solver_job_models_converters.py +++ b/services/api-server/src/simcore_service_api_server/services_http/solver_job_models_converters.py @@ -11,10 +11,11 @@ import arrow from models_library.api_schemas_webserver.projects import ProjectCreateNew, ProjectGet from models_library.api_schemas_webserver.projects_ui import StudyUI -from models_library.basic_types import KeyIDStr +from models_library.basic_types import KeyIDStr, VersionStr from models_library.projects import Project from models_library.projects_nodes import InputID -from pydantic import HttpUrl, TypeAdapter +from pydantic import TypeAdapter +from simcore_service_api_server.models.api_resources import JobLinks from ..models.domain.projects import InputTypes, Node, SimCoreFileLink from ..models.schemas.files import File @@ -24,12 +25,9 @@ JobInputs, JobStatus, PercentageInt, - get_outputs_url, - get_runner_url, - get_url, ) from ..models.schemas.programs import Program -from ..models.schemas.solvers import Solver +from ..models.schemas.solvers import Solver, SolverKeyId from .director_v2 import ComputationTaskGet # UTILS ------ @@ -184,11 +182,35 @@ def create_new_project_for_job( ) +def get_solver_job_rest_interface_links( + *, url_for: Callable, solver_key: SolverKeyId, version: VersionStr +) -> JobLinks: + return JobLinks( + url_template=url_for( + "get_job", + solver_key=solver_key, + version=version, + job_id="{job_id}", + ), + runner_url_template=url_for( + "get_solver_release", + solver_key=solver_key, + version=version, + ), + outputs_url_template=url_for( + "get_job_outputs", + solver_key=solver_key, + version=version, + job_id="{job_id}", + ), + ) + + def create_job_from_project( *, solver_or_program: Solver | Program, project: ProjectGet | Project, - url_for: Callable[..., HttpUrl], + job_links: JobLinks, ) -> Job: """ Given a project, creates a job @@ -218,13 +240,9 @@ def create_job_from_project( inputs_checksum=job_inputs.compute_checksum(), created_at=project.creation_date, # type: ignore[arg-type] runner_name=solver_or_program_name, - url=get_url( - solver_or_program=solver_or_program, url_for=url_for, job_id=job_id - ), - runner_url=get_runner_url(solver_or_program=solver_or_program, url_for=url_for), - outputs_url=get_outputs_url( - solver_or_program=solver_or_program, url_for=url_for, job_id=job_id - ), + url=job_links.url(job_id=job_id), + runner_url=job_links.runner_url(job_id=job_id), + outputs_url=job_links.outputs_url(job_id=job_id), ) diff --git a/services/api-server/src/simcore_service_api_server/services_http/study_job_models_converters.py b/services/api-server/src/simcore_service_api_server/services_http/study_job_models_converters.py index 99bb5a59ae97..d73fd5fac4ec 100644 --- a/services/api-server/src/simcore_service_api_server/services_http/study_job_models_converters.py +++ b/services/api-server/src/simcore_service_api_server/services_http/study_job_models_converters.py @@ -3,6 +3,7 @@ services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py """ +from collections.abc import Callable from typing import Any, NamedTuple from uuid import UUID @@ -16,6 +17,7 @@ from models_library.projects_nodes_io import LinkToFileTypes, NodeID, SimcoreS3FileID from pydantic import TypeAdapter +from ..models.api_resources import JobLinks from ..models.domain.files import File from ..models.domain.projects import InputTypes, SimCoreFileLink from ..models.schemas.jobs import Job, JobInputs, JobOutputs @@ -57,10 +59,26 @@ def get_project_and_file_inputs_from_job_inputs( return ProjectInputs(new_inputs, file_inputs) +def get_study_job_rest_interface_links( + *, url_for: Callable, study_id: StudyID +) -> JobLinks: + return JobLinks( + url_template=url_for( + "get_study_job", + study_id=study_id, + job_id="{job_id}", + ), + runner_url_template=url_for("get_study", study_id=study_id), + outputs_url_template=url_for( + "get_study_job_outputs", + study_id=study_id, + job_id="{job_id}", + ), + ) + + def create_job_from_study( - study_key: StudyID, - project: ProjectGet, - job_inputs: JobInputs, + study_key: StudyID, project: ProjectGet, job_inputs: JobInputs, job_links: JobLinks ) -> Job: """ Given a study, creates a job @@ -78,9 +96,9 @@ def create_job_from_study( inputs_checksum=job_inputs.compute_checksum(), created_at=DateTimeStr.to_datetime(project.creation_date), runner_name=study_name, - url=None, - runner_url=None, - outputs_url=None, + url=job_links.url(job_id=project.uuid), + runner_url=job_links.runner_url(job_id=project.uuid), + outputs_url=job_links.outputs_url(job_id=project.uuid), ) diff --git a/services/api-server/tests/unit/api_functions/conftest.py b/services/api-server/tests/unit/api_functions/conftest.py index 913b9803f1f9..3d78c4515986 100644 --- a/services/api-server/tests/unit/api_functions/conftest.py +++ b/services/api-server/tests/unit/api_functions/conftest.py @@ -38,6 +38,7 @@ from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient +from simcore_service_api_server.api.dependencies.services import get_rabbitmq_rpc_client from simcore_service_api_server.api.routes.functions_routes import get_wb_api_rpc_client from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient @@ -71,6 +72,17 @@ async def request(self, namespace: str, method_name: str, **kwargs): return {"mocked_response": True} +@pytest.fixture +async def mock_rabbitmq_rpc_client( + app: FastAPI, mocker: MockerFixture +) -> MockerFixture: + def _(): + return DummyRpcClient() + + app.dependency_overrides[get_rabbitmq_rpc_client] = _ + return mocker + + @pytest.fixture async def mock_wb_api_server_rpc(app: FastAPI, mocker: MockerFixture) -> MockerFixture: @@ -267,19 +279,19 @@ def _mock( @pytest.fixture() -def mock_handler_in_study_jobs_rest_interface( +def mock_method_in_jobs_service( mock_wb_api_server_rpc: MockerFixture, ) -> Callable[[str, Any, Exception | None], None]: def _mock( - handler_name: str = "", + method_name: str = "", return_value: Any = None, exception: Exception | None = None, ) -> None: - from simcore_service_api_server.api.routes.functions_routes import studies_jobs + from simcore_service_api_server._service_jobs import JobService mock_wb_api_server_rpc.patch.object( - studies_jobs, - handler_name, + JobService, + method_name, return_value=return_value, side_effect=exception, ) diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py b/services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py index 17cb6d77f540..1ae7a5820408 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_function_job_collections.py @@ -1,3 +1,4 @@ +# pylint: disable=unused-argument import datetime from collections.abc import Callable from typing import Any @@ -12,6 +13,7 @@ RegisteredProjectFunctionJob, ) from models_library.rest_pagination import PageMetaInfoLimitOffset +from pytest_mock import MockerFixture from servicelib.aiohttp import status from simcore_service_api_server._meta import API_VTAG @@ -105,6 +107,7 @@ async def test_delete_function_job_collection( @pytest.mark.parametrize("response_type", ["page", "list"]) async def test_get_function_job_collection_jobs( client: AsyncClient, + mock_rabbitmq_rpc_client: MockerFixture, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], mock_registered_function_job_collection: RegisteredFunctionJobCollection, mock_registered_project_function_job: RegisteredProjectFunctionJob, diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py index a217ee1b6d59..6bdda8e4ecca 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py @@ -18,7 +18,7 @@ from models_library.projects_state import RunningState from models_library.rest_pagination import PageMetaInfoLimitOffset from models_library.users import UserID -from pytest_mock import MockType +from pytest_mock import MockerFixture, MockType from servicelib.aiohttp import status from simcore_service_api_server._meta import API_VTAG from simcore_service_api_server.models.schemas.jobs import JobStatus @@ -91,6 +91,7 @@ async def test_get_function_job( async def test_list_function_jobs( client: AsyncClient, + mock_rabbitmq_rpc_client: MockerFixture, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], mock_registered_project_function_job: RegisteredProjectFunctionJob, auth: httpx.BasicAuth, @@ -115,6 +116,7 @@ async def test_list_function_jobs( async def test_list_function_jobs_with_job_id_filter( client: AsyncClient, + mock_rabbitmq_rpc_client: MockerFixture, mock_handler_in_functions_rpc_interface: Callable[[str], MockType], mock_registered_project_function_job: RegisteredProjectFunctionJob, user_id: UserID, @@ -178,11 +180,12 @@ def mocked_list_function_jobs(offset: int, limit: int): @pytest.mark.parametrize("job_status", ["SUCCESS", "FAILED", "STARTED"]) async def test_get_function_job_status( + mocked_app_dependencies: None, client: AsyncClient, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], mock_registered_project_function_job: RegisteredProjectFunctionJob, mock_registered_project_function: RegisteredProjectFunction, - mock_handler_in_study_jobs_rest_interface: Callable[[str, Any], None], + mock_method_in_jobs_service: Callable[[str, Any], None], auth: httpx.BasicAuth, job_status: str, ) -> None: @@ -197,7 +200,7 @@ async def test_get_function_job_status( "get_function_job_status", FunctionJobStatus(status=job_status), ) - mock_handler_in_study_jobs_rest_interface( + mock_method_in_jobs_service( "inspect_study_job", JobStatus( job_id=uuid.uuid4(), diff --git a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py index a3e61a3ec155..f4092d3afd27 100644 --- a/services/api-server/tests/unit/api_functions/test_api_routers_functions.py +++ b/services/api-server/tests/unit/api_functions/test_api_routers_functions.py @@ -32,7 +32,7 @@ ) from models_library.rest_pagination import PageMetaInfoLimitOffset from models_library.users import UserID -from pytest_mock import MockType +from pytest_mock import MockerFixture, MockType from pytest_simcore.helpers.httpx_calls_capture_models import HttpApiCallCaptureModel from servicelib.aiohttp import status from servicelib.common_headers import ( @@ -274,6 +274,7 @@ async def test_get_function_output_schema( async def test_validate_function_inputs( client: AsyncClient, + mock_rabbitmq_rpc_client: MockerFixture, mock_handler_in_functions_rpc_interface: Callable[[str, Any], None], mock_registered_project_function: RegisteredProjectFunction, auth: httpx.BasicAuth, diff --git a/services/api-server/tests/unit/service/conftest.py b/services/api-server/tests/unit/service/conftest.py index 0dc632def76f..f6b51d091c1c 100644 --- a/services/api-server/tests/unit/service/conftest.py +++ b/services/api-server/tests/unit/service/conftest.py @@ -19,7 +19,7 @@ from simcore_service_api_server._service_jobs import JobService from simcore_service_api_server._service_programs import ProgramService from simcore_service_api_server._service_solvers import SolverService -from simcore_service_api_server._service_studies import StudyService +from simcore_service_api_server.services_http.director_v2 import DirectorV2Api from simcore_service_api_server.services_http.storage import StorageApi from simcore_service_api_server.services_http.webserver import AuthSession from simcore_service_api_server.services_rpc.catalog import CatalogService @@ -109,6 +109,11 @@ async def _create_project(project: ProjectCreateNew, **kwargs): return mock +@pytest.fixture +def director2_api(mocker: MockerFixture) -> DirectorV2Api: + return mocker.AsyncMock(spec=DirectorV2Api) + + @pytest.fixture def storage_rest_client( mocker: MockerFixture, @@ -117,27 +122,6 @@ def storage_rest_client( return mock -@pytest.fixture -def job_service( - auth_session: AuthSession, - director_v2_rpc_client: DirectorV2Service, - storage_rpc_client: StorageService, - storage_rest_client: StorageApi, - wb_api_rpc_client: WbApiRpcClient, - product_name: ProductName, - user_id: UserID, -) -> JobService: - return JobService( - _web_rest_client=auth_session, - _web_rpc_client=wb_api_rpc_client, - _storage_rpc_client=storage_rpc_client, - _storage_rest_client=storage_rest_client, - _directorv2_rpc_client=director_v2_rpc_client, - user_id=user_id, - product_name=product_name, - ) - - @pytest.fixture def catalog_service( mocked_rpc_client: MockType, @@ -152,34 +136,43 @@ def catalog_service( @pytest.fixture def solver_service( catalog_service: CatalogService, - job_service: JobService, product_name: ProductName, user_id: UserID, ) -> SolverService: return SolverService( catalog_service=catalog_service, - job_service=job_service, user_id=user_id, product_name=product_name, ) @pytest.fixture -def study_service( - job_service: JobService, +def program_service( + catalog_service: CatalogService, +) -> ProgramService: + return ProgramService(catalog_service=catalog_service) + + +@pytest.fixture +def job_service( + auth_session: AuthSession, + director_v2_rpc_client: DirectorV2Service, + storage_rpc_client: StorageService, + wb_api_rpc_client: WbApiRpcClient, + director2_api: DirectorV2Api, + storage_rest_client: StorageApi, product_name: ProductName, user_id: UserID, -) -> StudyService: - - return StudyService( - job_service=job_service, + solver_service: SolverService, +) -> JobService: + return JobService( + _web_rest_client=auth_session, + _web_rpc_client=wb_api_rpc_client, + _storage_rpc_client=storage_rpc_client, + _storage_rest_client=storage_rest_client, + _directorv2_rpc_client=director_v2_rpc_client, + _director2_api=director2_api, + _solver_service=solver_service, user_id=user_id, product_name=product_name, ) - - -@pytest.fixture -def program_service( - catalog_service: CatalogService, -) -> ProgramService: - return ProgramService(catalog_service=catalog_service) diff --git a/services/api-server/tests/unit/service/test_service_jobs.py b/services/api-server/tests/unit/service/test_service_jobs.py index d6829339507f..b8686819ed06 100644 --- a/services/api-server/tests/unit/service/test_service_jobs.py +++ b/services/api-server/tests/unit/service/test_service_jobs.py @@ -4,18 +4,22 @@ # pylint: disable=unused-variable +from faker import Faker from pytest_mock import MockType from simcore_service_api_server._service_jobs import JobService +from simcore_service_api_server.models.api_resources import JobLinks from simcore_service_api_server.models.schemas.jobs import Job, JobInputs from simcore_service_api_server.models.schemas.solvers import Solver +_faker = Faker() + async def test_list_jobs_by_resource_prefix( mocked_rpc_client: MockType, job_service: JobService, ): # Test with default pagination parameters - jobs, page_meta = await job_service.list_jobs( + jobs, page_meta = await job_service._list_jobs( job_parent_resource_name="solvers/some-solver" ) @@ -53,13 +57,19 @@ async def test_create_job( def mock_url_for(*args, **kwargs): return "https://example.com/api/v1/jobs/test-job" + job_rest_interface_links = JobLinks( + url_template=_faker.url() + "/{job_id}", + runner_url_template=_faker.url(), + outputs_url_template=_faker.url() + "/{job_id}", + ) + # Test job creation - job, project = await job_service.create_job( + job, project = await job_service.create_project_marked_as_job( solver_or_program=solver, inputs=inputs, parent_project_uuid=None, parent_node_id=None, - url_for=mock_url_for, + job_links=job_rest_interface_links, hidden=False, project_name="Test Job Project", description="Test description", diff --git a/services/api-server/tests/unit/service/test_service_solvers.py b/services/api-server/tests/unit/service/test_service_solvers.py index 7c912e1657d4..191eb50969ea 100644 --- a/services/api-server/tests/unit/service/test_service_solvers.py +++ b/services/api-server/tests/unit/service/test_service_solvers.py @@ -46,10 +46,10 @@ async def test_get_solver( async def test_list_jobs( mocked_rpc_client: MockType, - solver_service: SolverService, + job_service: JobService, ): # Test default parameters - jobs, page_meta = await solver_service.list_jobs() + jobs, page_meta = await job_service.list_solver_jobs() assert jobs assert len(jobs) == page_meta.count @@ -77,7 +77,6 @@ async def test_solver_service_init_raises_configuration_error( with pytest.raises(ServiceConfigurationError, match="SolverService"): SolverService( catalog_service=catalog_service, - job_service=job_service, user_id=user_id, product_name=invalid_product_name, ) diff --git a/services/api-server/tests/unit/service/test_service_studies.py b/services/api-server/tests/unit/service/test_service_studies.py index fa9b9921866e..5cf292340869 100644 --- a/services/api-server/tests/unit/service/test_service_studies.py +++ b/services/api-server/tests/unit/service/test_service_studies.py @@ -4,16 +4,15 @@ # pylint: disable=unused-variable from pytest_mock import MockType -from simcore_service_api_server._service_studies import StudyService +from simcore_service_api_server._service_jobs import JobService from simcore_service_api_server.models.schemas.studies import StudyID async def test_list_jobs_no_study_id( - mocked_rpc_client: MockType, - study_service: StudyService, + mocked_rpc_client: MockType, job_service: JobService ): # Test with default parameters - jobs, page_meta = await study_service.list_jobs() + jobs, page_meta = await job_service.list_study_jobs() assert isinstance(jobs, list) assert mocked_rpc_client.request.call_args.args == ( @@ -41,11 +40,11 @@ async def test_list_jobs_no_study_id( async def test_list_jobs_with_study_id( mocked_rpc_client: MockType, - study_service: StudyService, + job_service: JobService, ): # Test with a specific study ID study_id = StudyID("914c7c33-8fb6-4164-9787-7b88b5c148bf") - jobs, page_meta = await study_service.list_jobs(filter_by_study_id=study_id) + jobs, page_meta = await job_service.list_study_jobs(filter_by_study_id=study_id) assert isinstance(jobs, list) diff --git a/services/api-server/tests/unit/test_api_solver_jobs.py b/services/api-server/tests/unit/test_api_solver_jobs.py index 33363cb4e339..8d2129a7b459 100644 --- a/services/api-server/tests/unit/test_api_solver_jobs.py +++ b/services/api-server/tests/unit/test_api_solver_jobs.py @@ -206,6 +206,7 @@ def _get_pricing_unit_side_effect( ], ) async def test_start_solver_job_pricing_unit_with_payment( + mocked_app_dependencies: None, client: AsyncClient, mocked_webserver_rest_api_base: MockRouter, mocked_directorv2_rest_api_base: MockRouter, @@ -282,6 +283,7 @@ def _put_pricing_plan_and_unit_side_effect( async def test_get_solver_job_pricing_unit_no_payment( + mocked_app_dependencies: None, client: AsyncClient, mocked_webserver_rest_api_base: MockRouter, mocked_directorv2_rest_api_base: MockRouter, @@ -316,6 +318,7 @@ async def test_get_solver_job_pricing_unit_no_payment( async def test_start_solver_job_conflict( + mocked_app_dependencies: None, client: AsyncClient, mocked_webserver_rest_api_base: MockRouter, mocked_directorv2_rest_api_base: MockRouter, diff --git a/services/api-server/tests/unit/test_services_solver_job_models_converters.py b/services/api-server/tests/unit/test_services_solver_job_models_converters.py index deae3ad65875..97a777387a7c 100644 --- a/services/api-server/tests/unit/test_services_solver_job_models_converters.py +++ b/services/api-server/tests/unit/test_services_solver_job_models_converters.py @@ -6,7 +6,8 @@ from faker import Faker from models_library.projects import Project from models_library.projects_nodes import InputsDict, InputTypes, SimCoreFileLink -from pydantic import HttpUrl, RootModel, TypeAdapter, create_model +from pydantic import RootModel, TypeAdapter, create_model +from simcore_service_api_server.models.api_resources import JobLinks from simcore_service_api_server.models.schemas.files import File from simcore_service_api_server.models.schemas.jobs import ArgumentTypes, Job, JobInputs from simcore_service_api_server.models.schemas.solvers import Solver @@ -208,8 +209,11 @@ def test_create_job_from_project(faker: Faker): solver_key = "simcore/services/comp/itis/sleeper" solver_version = "2.0.2" - def fake_url_for(*args, **kwargs) -> HttpUrl: - return HttpUrl(faker.url()) + fake_job_links = JobLinks( + url_template=faker.url() + "/{job_id}", + runner_url_template=faker.url(), + outputs_url_template=faker.url() + "/{job_id}", + ) solver = Solver( id=solver_key, @@ -221,7 +225,9 @@ def fake_url_for(*args, **kwargs) -> HttpUrl: ) job = create_job_from_project( - solver_or_program=solver, project=project, url_for=fake_url_for + solver_or_program=solver, + project=project, + job_links=fake_job_links, ) assert job.id == project.uuid