Skip to content
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4b6ade0
first cleanup
bisgaard-itis Aug 15, 2025
9caa628
minor fixes
bisgaard-itis Aug 15, 2025
ce16bc4
fix tests after refactoring
bisgaard-itis Aug 15, 2025
3520ce4
Merge branch 'master' into 1973-refactor-create-start-inspect-methods
bisgaard-itis Aug 15, 2025
3378148
move start_solver_job to job_service
bisgaard-itis Aug 15, 2025
1d86dc3
cleanup
bisgaard-itis Aug 15, 2025
aa193c7
move create study job function to job_service
bisgaard-itis Aug 15, 2025
8e488c6
make typecheck happy
bisgaard-itis Aug 15, 2025
4aa2c9c
Merge branch 'master' into 1973-refactor-create-start-inspect-methods
bisgaard-itis Aug 15, 2025
69af729
refactor inspect study job method
bisgaard-itis Aug 15, 2025
d07fb51
bugfixes
bisgaard-itis Aug 15, 2025
983ead6
minor fix
bisgaard-itis Aug 15, 2025
4b78421
Merge branch 'master' into 1973-refactor-create-start-inspect-methods
bisgaard-itis Aug 18, 2025
50aade2
port start_study_job code to job_service
bisgaard-itis Aug 18, 2025
8e7c9f0
move inspect function job to function_job_service
bisgaard-itis Aug 18, 2025
5033949
move run function and map endpoint to function_job_service
bisgaard-itis Aug 18, 2025
c5a63e6
move validate_function_inputs
bisgaard-itis Aug 18, 2025
9204637
Merge branch 'master' into 1973-refactor-create-start-inspect-methods
bisgaard-itis Aug 18, 2025
3d3b341
minor change
bisgaard-itis Aug 18, 2025
5f96aef
test fix
bisgaard-itis Aug 18, 2025
ce4ef7a
remove unused fixture
bisgaard-itis Aug 18, 2025
7e9634a
pylint
bisgaard-itis Aug 18, 2025
884cac2
start moving out url_for
bisgaard-itis Aug 18, 2025
3b84dd9
JobRestInterfaceLinks -> JobInterfaceLinks
bisgaard-itis Aug 18, 2025
b4bc5b2
fix typecheck
bisgaard-itis Aug 19, 2025
c1856da
Merge branch 'master' into 1973-refactor-create-start-inspect-methods
bisgaard-itis Aug 19, 2025
a0acf32
minor cleanup
bisgaard-itis Aug 19, 2025
d9b48ca
add job links when creating job from study
bisgaard-itis Aug 19, 2025
43f38b2
remove unneeded code
bisgaard-itis Aug 19, 2025
166e8cf
make pylint happy
bisgaard-itis Aug 19, 2025
a137e8d
pylint
bisgaard-itis Aug 19, 2025
30d0977
ensure only pydantic models are passed from function run and map endp…
bisgaard-itis Aug 19, 2025
582e156
Merge branch 'master' into 1973-refactor-create-start-inspect-methods
bisgaard-itis Aug 19, 2025
f2ddd31
pylint
bisgaard-itis Aug 19, 2025
1531960
@sanderegg move import above comments
bisgaard-itis Aug 19, 2025
eeaff1c
@sanderegg PositiveInt -> UserID
bisgaard-itis Aug 19, 2025
90e9b2a
Merge branch 'master' into 1973-refactor-create-start-inspect-methods
bisgaard-itis Aug 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,24 +1,68 @@
from dataclasses import dataclass

import jsonschema
from common_library.exclude import as_dict_exclude_none
from models_library.functions import (
FunctionClass,
FunctionID,
FunctionInputs,
FunctionInputsList,
FunctionJobCollection,
FunctionJobCollectionID,
FunctionJobID,
FunctionJobStatus,
FunctionSchemaClass,
ProjectFunctionJob,
RegisteredFunction,
RegisteredFunctionJob,
RegisteredFunctionJobCollection,
SolverFunctionJob,
)
from models_library.functions_errors import (
FunctionExecuteAccessDeniedError,
FunctionInputsValidationError,
FunctionsExecuteApiAccessDeniedError,
UnsupportedFunctionClassError,
UnsupportedFunctionFunctionJobClassCombinationError,
)
from models_library.products import ProductName
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.projects_state import RunningState
from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt
from models_library.rpc_pagination import PageLimitInt
from models_library.users import UserID
from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient
from pydantic import ValidationError

from ._service_jobs import JobService
from .models.api_resources import JobLinks
from .models.schemas.jobs import (
JobInputs,
JobPricingSpecification,
)
from .services_rpc.wb_api_server import WbApiRpcClient


def join_inputs(
default_inputs: FunctionInputs | None,
function_inputs: FunctionInputs | None,
) -> FunctionInputs:
if default_inputs is None:
return function_inputs

if function_inputs is None:
return default_inputs

# last dict will override defaults
return {**default_inputs, **function_inputs}


@dataclass(frozen=True, kw_only=True)
class FunctionJobService:
user_id: UserID
product_name: ProductName
_web_rpc_client: WbApiRpcClient
_job_service: JobService

async def list_function_jobs(
self,
Expand All @@ -43,3 +87,229 @@ async def list_function_jobs(
filter_by_function_job_collection_id=filter_by_function_job_collection_id,
**pagination_kwargs,
)

async def validate_function_inputs(
self, *, function_id: FunctionID, inputs: FunctionInputs
) -> tuple[bool, str]:
function = await self._web_rpc_client.get_function(
function_id=function_id,
user_id=self.user_id,
product_name=self.product_name,
)

if (
function.input_schema is None
or function.input_schema.schema_content is None
):
return True, "No input schema defined for this function"

if function.input_schema.schema_class == FunctionSchemaClass.json_schema:
try:
jsonschema.validate(
instance=inputs, schema=function.input_schema.schema_content
)
except ValidationError as err:
return False, str(err)
return True, "Inputs are valid"

return (
False,
f"Unsupported function schema class {function.input_schema.schema_class}",
)

async def inspect_function_job(
self, function: RegisteredFunction, function_job: RegisteredFunctionJob
) -> FunctionJobStatus:

stored_job_status = await self._web_rpc_client.get_function_job_status(
function_job_id=function_job.uid,
user_id=self.user_id,
product_name=self.product_name,
)

if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED):
return stored_job_status

if (
function.function_class == FunctionClass.PROJECT
and function_job.function_class == FunctionClass.PROJECT
):
job_status = await self._job_service.inspect_study_job(
job_id=function_job.project_job_id,
)
elif (function.function_class == FunctionClass.SOLVER) and (
function_job.function_class == FunctionClass.SOLVER
):
job_status = await self._job_service.inspect_solver_job(
solver_key=function.solver_key,
version=function.solver_version,
job_id=function_job.solver_job_id,
)
else:
raise UnsupportedFunctionFunctionJobClassCombinationError(
function_class=function.function_class,
function_job_class=function_job.function_class,
)

new_job_status = FunctionJobStatus(status=job_status.state)

return await self._web_rpc_client.update_function_job_status(
function_job_id=function_job.uid,
user_id=self.user_id,
product_name=self.product_name,
job_status=new_job_status,
)

async def run_function(
self,
*,
function: RegisteredFunction,
function_inputs: FunctionInputs,
pricing_spec: JobPricingSpecification | None,
job_links: JobLinks,
x_simcore_parent_project_uuid: NodeID | None,
x_simcore_parent_node_id: NodeID | None,
) -> RegisteredFunctionJob:

user_api_access_rights = (
await self._web_rpc_client.get_functions_user_api_access_rights(
user_id=self.user_id, product_name=self.product_name
)
)
if not user_api_access_rights.execute_functions:
raise FunctionsExecuteApiAccessDeniedError(
user_id=self.user_id,
function_id=function.uid,
)

user_permissions = await self._web_rpc_client.get_function_user_permissions(
function_id=function.uid,
user_id=self.user_id,
product_name=self.product_name,
)
if not user_permissions.execute:
raise FunctionExecuteAccessDeniedError(
user_id=self.user_id,
function_id=function.uid,
)

joined_inputs = join_inputs(
function.default_inputs,
function_inputs,
)

if function.input_schema is not None:
is_valid, validation_str = await self.validate_function_inputs(
function_id=function.uid,
inputs=joined_inputs,
)
if not is_valid:
raise FunctionInputsValidationError(error=validation_str)

if cached_function_jobs := await self._web_rpc_client.find_cached_function_jobs(
function_id=function.uid,
inputs=joined_inputs,
user_id=self.user_id,
product_name=self.product_name,
):
for cached_function_job in cached_function_jobs:
job_status = await self.inspect_function_job(
function=function,
function_job=cached_function_job,
)
if job_status.status == RunningState.SUCCESS:
return cached_function_job

if function.function_class == FunctionClass.PROJECT:
study_job = await self._job_service.create_studies_job(
study_id=function.project_id,
job_inputs=JobInputs(values=joined_inputs or {}),
hidden=True,
job_links=job_links,
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
x_simcore_parent_node_id=x_simcore_parent_node_id,
)
await self._job_service.start_study_job(
study_id=function.project_id,
job_id=study_job.id,
pricing_spec=pricing_spec,
)
return await self._web_rpc_client.register_function_job(
function_job=ProjectFunctionJob(
function_uid=function.uid,
title=f"Function job of function {function.uid}",
description=function.description,
inputs=joined_inputs,
outputs=None,
project_job_id=study_job.id,
),
user_id=self.user_id,
product_name=self.product_name,
)

if function.function_class == FunctionClass.SOLVER:
solver_job = await self._job_service.create_solver_job(
solver_key=function.solver_key,
version=function.solver_version,
inputs=JobInputs(values=joined_inputs or {}),
job_links=job_links,
hidden=True,
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
x_simcore_parent_node_id=x_simcore_parent_node_id,
)
await self._job_service.start_solver_job(
solver_key=function.solver_key,
version=function.solver_version,
job_id=solver_job.id,
pricing_spec=pricing_spec,
)
return await self._web_rpc_client.register_function_job(
function_job=SolverFunctionJob(
function_uid=function.uid,
title=f"Function job of function {function.uid}",
description=function.description,
inputs=joined_inputs,
outputs=None,
solver_job_id=solver_job.id,
),
user_id=self.user_id,
product_name=self.product_name,
)

raise UnsupportedFunctionClassError(
function_class=function.function_class,
)

async def map_function(
self,
*,
function: RegisteredFunction,
function_inputs_list: FunctionInputsList,
job_links: JobLinks,
pricing_spec: JobPricingSpecification | None,
x_simcore_parent_project_uuid: ProjectID | None,
x_simcore_parent_node_id: NodeID | None,
) -> RegisteredFunctionJobCollection:

function_jobs = [
await self.run_function(
function=function,
function_inputs=function_inputs,
pricing_spec=pricing_spec,
job_links=job_links,
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
x_simcore_parent_node_id=x_simcore_parent_node_id,
)
for function_inputs in function_inputs_list
]

function_job_collection_description = f"Function job collection of map of function {function.uid} with {len(function_inputs_list)} inputs"
return await self._web_rpc_client.register_function_job_collection(
function_job_collection=FunctionJobCollection(
title="Function job collection of function map",
description=function_job_collection_description,
job_ids=[function_job.uid for function_job in function_jobs],
),
user_id=self.user_id,
product_name=self.product_name,
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# pylint: disable=no-self-use

from collections.abc import Callable
from dataclasses import dataclass

from common_library.exclude import as_dict_exclude_none
from models_library.functions import RegisteredFunction
from models_library.functions import FunctionClass, RegisteredFunction
from models_library.functions_errors import UnsupportedFunctionClassError
from models_library.products import ProductName
from models_library.rest_pagination import (
MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE,
Expand All @@ -10,7 +14,15 @@
)
from models_library.rpc_pagination import PageLimitInt
from models_library.users import UserID
from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient

from .models.api_resources import JobLinks
from .services_http.solver_job_models_converters import (
get_solver_job_rest_interface_links,
)
from .services_http.study_job_models_converters import (
get_study_job_rest_interface_links,
)
from .services_rpc.wb_api_server import WbApiRpcClient

DEFAULT_PAGINATION_LIMIT = MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE - 1

Expand Down Expand Up @@ -38,3 +50,21 @@ async def list_functions(
product_name=self.product_name,
**pagination_kwargs,
)

async def get_function_job_links(
self, function: RegisteredFunction, url_for: Callable
) -> JobLinks:
if function.function_class == FunctionClass.SOLVER:
return get_solver_job_rest_interface_links(
url_for=url_for,
solver_key=function.solver_key,
version=function.solver_version,
)
if function.function_class == FunctionClass.PROJECT:
return get_study_job_rest_interface_links(
url_for=url_for,
study_id=function.project_id,
)
raise UnsupportedFunctionClassError(
function_class=function.function_class,
)
Loading
Loading