Skip to content

Commit c252a4a

Browse files
Merge branch 'master' into is1905/enhance-functions-listing-with-ordering-and-filtering
2 parents cbe405d + 0c24036 commit c252a4a

29 files changed

+990
-785
lines changed

services/api-server/src/simcore_service_api_server/_service_function_jobs.py

Lines changed: 271 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,68 @@
11
from dataclasses import dataclass
22

3+
import jsonschema
34
from common_library.exclude import as_dict_exclude_none
45
from models_library.functions import (
6+
FunctionClass,
57
FunctionID,
8+
FunctionInputs,
9+
FunctionInputsList,
10+
FunctionJobCollection,
611
FunctionJobCollectionID,
712
FunctionJobID,
13+
FunctionJobStatus,
14+
FunctionSchemaClass,
15+
ProjectFunctionJob,
16+
RegisteredFunction,
817
RegisteredFunctionJob,
18+
RegisteredFunctionJobCollection,
19+
SolverFunctionJob,
20+
)
21+
from models_library.functions_errors import (
22+
FunctionExecuteAccessDeniedError,
23+
FunctionInputsValidationError,
24+
FunctionsExecuteApiAccessDeniedError,
25+
UnsupportedFunctionClassError,
26+
UnsupportedFunctionFunctionJobClassCombinationError,
927
)
1028
from models_library.products import ProductName
29+
from models_library.projects import ProjectID
30+
from models_library.projects_nodes_io import NodeID
31+
from models_library.projects_state import RunningState
1132
from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt
1233
from models_library.rpc_pagination import PageLimitInt
1334
from models_library.users import UserID
14-
from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient
35+
from pydantic import ValidationError
36+
37+
from ._service_jobs import JobService
38+
from .models.api_resources import JobLinks
39+
from .models.schemas.jobs import (
40+
JobInputs,
41+
JobPricingSpecification,
42+
)
43+
from .services_rpc.wb_api_server import WbApiRpcClient
44+
45+
46+
def join_inputs(
47+
default_inputs: FunctionInputs | None,
48+
function_inputs: FunctionInputs | None,
49+
) -> FunctionInputs:
50+
if default_inputs is None:
51+
return function_inputs
52+
53+
if function_inputs is None:
54+
return default_inputs
55+
56+
# last dict will override defaults
57+
return {**default_inputs, **function_inputs}
1558

1659

1760
@dataclass(frozen=True, kw_only=True)
1861
class FunctionJobService:
1962
user_id: UserID
2063
product_name: ProductName
2164
_web_rpc_client: WbApiRpcClient
65+
_job_service: JobService
2266

2367
async def list_function_jobs(
2468
self,
@@ -43,3 +87,229 @@ async def list_function_jobs(
4387
filter_by_function_job_collection_id=filter_by_function_job_collection_id,
4488
**pagination_kwargs,
4589
)
90+
91+
async def validate_function_inputs(
92+
self, *, function_id: FunctionID, inputs: FunctionInputs
93+
) -> tuple[bool, str]:
94+
function = await self._web_rpc_client.get_function(
95+
function_id=function_id,
96+
user_id=self.user_id,
97+
product_name=self.product_name,
98+
)
99+
100+
if (
101+
function.input_schema is None
102+
or function.input_schema.schema_content is None
103+
):
104+
return True, "No input schema defined for this function"
105+
106+
if function.input_schema.schema_class == FunctionSchemaClass.json_schema:
107+
try:
108+
jsonschema.validate(
109+
instance=inputs, schema=function.input_schema.schema_content
110+
)
111+
except ValidationError as err:
112+
return False, str(err)
113+
return True, "Inputs are valid"
114+
115+
return (
116+
False,
117+
f"Unsupported function schema class {function.input_schema.schema_class}",
118+
)
119+
120+
async def inspect_function_job(
121+
self, function: RegisteredFunction, function_job: RegisteredFunctionJob
122+
) -> FunctionJobStatus:
123+
124+
stored_job_status = await self._web_rpc_client.get_function_job_status(
125+
function_job_id=function_job.uid,
126+
user_id=self.user_id,
127+
product_name=self.product_name,
128+
)
129+
130+
if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED):
131+
return stored_job_status
132+
133+
if (
134+
function.function_class == FunctionClass.PROJECT
135+
and function_job.function_class == FunctionClass.PROJECT
136+
):
137+
job_status = await self._job_service.inspect_study_job(
138+
job_id=function_job.project_job_id,
139+
)
140+
elif (function.function_class == FunctionClass.SOLVER) and (
141+
function_job.function_class == FunctionClass.SOLVER
142+
):
143+
job_status = await self._job_service.inspect_solver_job(
144+
solver_key=function.solver_key,
145+
version=function.solver_version,
146+
job_id=function_job.solver_job_id,
147+
)
148+
else:
149+
raise UnsupportedFunctionFunctionJobClassCombinationError(
150+
function_class=function.function_class,
151+
function_job_class=function_job.function_class,
152+
)
153+
154+
new_job_status = FunctionJobStatus(status=job_status.state)
155+
156+
return await self._web_rpc_client.update_function_job_status(
157+
function_job_id=function_job.uid,
158+
user_id=self.user_id,
159+
product_name=self.product_name,
160+
job_status=new_job_status,
161+
)
162+
163+
async def run_function(
164+
self,
165+
*,
166+
function: RegisteredFunction,
167+
function_inputs: FunctionInputs,
168+
pricing_spec: JobPricingSpecification | None,
169+
job_links: JobLinks,
170+
x_simcore_parent_project_uuid: NodeID | None,
171+
x_simcore_parent_node_id: NodeID | None,
172+
) -> RegisteredFunctionJob:
173+
174+
user_api_access_rights = (
175+
await self._web_rpc_client.get_functions_user_api_access_rights(
176+
user_id=self.user_id, product_name=self.product_name
177+
)
178+
)
179+
if not user_api_access_rights.execute_functions:
180+
raise FunctionsExecuteApiAccessDeniedError(
181+
user_id=self.user_id,
182+
function_id=function.uid,
183+
)
184+
185+
user_permissions = await self._web_rpc_client.get_function_user_permissions(
186+
function_id=function.uid,
187+
user_id=self.user_id,
188+
product_name=self.product_name,
189+
)
190+
if not user_permissions.execute:
191+
raise FunctionExecuteAccessDeniedError(
192+
user_id=self.user_id,
193+
function_id=function.uid,
194+
)
195+
196+
joined_inputs = join_inputs(
197+
function.default_inputs,
198+
function_inputs,
199+
)
200+
201+
if function.input_schema is not None:
202+
is_valid, validation_str = await self.validate_function_inputs(
203+
function_id=function.uid,
204+
inputs=joined_inputs,
205+
)
206+
if not is_valid:
207+
raise FunctionInputsValidationError(error=validation_str)
208+
209+
if cached_function_jobs := await self._web_rpc_client.find_cached_function_jobs(
210+
function_id=function.uid,
211+
inputs=joined_inputs,
212+
user_id=self.user_id,
213+
product_name=self.product_name,
214+
):
215+
for cached_function_job in cached_function_jobs:
216+
job_status = await self.inspect_function_job(
217+
function=function,
218+
function_job=cached_function_job,
219+
)
220+
if job_status.status == RunningState.SUCCESS:
221+
return cached_function_job
222+
223+
if function.function_class == FunctionClass.PROJECT:
224+
study_job = await self._job_service.create_studies_job(
225+
study_id=function.project_id,
226+
job_inputs=JobInputs(values=joined_inputs or {}),
227+
hidden=True,
228+
job_links=job_links,
229+
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
230+
x_simcore_parent_node_id=x_simcore_parent_node_id,
231+
)
232+
await self._job_service.start_study_job(
233+
study_id=function.project_id,
234+
job_id=study_job.id,
235+
pricing_spec=pricing_spec,
236+
)
237+
return await self._web_rpc_client.register_function_job(
238+
function_job=ProjectFunctionJob(
239+
function_uid=function.uid,
240+
title=f"Function job of function {function.uid}",
241+
description=function.description,
242+
inputs=joined_inputs,
243+
outputs=None,
244+
project_job_id=study_job.id,
245+
),
246+
user_id=self.user_id,
247+
product_name=self.product_name,
248+
)
249+
250+
if function.function_class == FunctionClass.SOLVER:
251+
solver_job = await self._job_service.create_solver_job(
252+
solver_key=function.solver_key,
253+
version=function.solver_version,
254+
inputs=JobInputs(values=joined_inputs or {}),
255+
job_links=job_links,
256+
hidden=True,
257+
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
258+
x_simcore_parent_node_id=x_simcore_parent_node_id,
259+
)
260+
await self._job_service.start_solver_job(
261+
solver_key=function.solver_key,
262+
version=function.solver_version,
263+
job_id=solver_job.id,
264+
pricing_spec=pricing_spec,
265+
)
266+
return await self._web_rpc_client.register_function_job(
267+
function_job=SolverFunctionJob(
268+
function_uid=function.uid,
269+
title=f"Function job of function {function.uid}",
270+
description=function.description,
271+
inputs=joined_inputs,
272+
outputs=None,
273+
solver_job_id=solver_job.id,
274+
),
275+
user_id=self.user_id,
276+
product_name=self.product_name,
277+
)
278+
279+
raise UnsupportedFunctionClassError(
280+
function_class=function.function_class,
281+
)
282+
283+
async def map_function(
284+
self,
285+
*,
286+
function: RegisteredFunction,
287+
function_inputs_list: FunctionInputsList,
288+
job_links: JobLinks,
289+
pricing_spec: JobPricingSpecification | None,
290+
x_simcore_parent_project_uuid: ProjectID | None,
291+
x_simcore_parent_node_id: NodeID | None,
292+
) -> RegisteredFunctionJobCollection:
293+
294+
function_jobs = [
295+
await self.run_function(
296+
function=function,
297+
function_inputs=function_inputs,
298+
pricing_spec=pricing_spec,
299+
job_links=job_links,
300+
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
301+
x_simcore_parent_node_id=x_simcore_parent_node_id,
302+
)
303+
for function_inputs in function_inputs_list
304+
]
305+
306+
function_job_collection_description = f"Function job collection of map of function {function.uid} with {len(function_inputs_list)} inputs"
307+
return await self._web_rpc_client.register_function_job_collection(
308+
function_job_collection=FunctionJobCollection(
309+
title="Function job collection of function map",
310+
description=function_job_collection_description,
311+
job_ids=[function_job.uid for function_job in function_jobs],
312+
),
313+
user_id=self.user_id,
314+
product_name=self.product_name,
315+
)

services/api-server/src/simcore_service_api_server/_service_functions.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
# pylint: disable=no-self-use
2+
3+
from collections.abc import Callable
14
from dataclasses import dataclass
25

36
from common_library.exclude import as_dict_exclude_none
4-
from models_library.functions import RegisteredFunction
7+
from models_library.functions import FunctionClass, RegisteredFunction
8+
from models_library.functions_errors import UnsupportedFunctionClassError
59
from models_library.products import ProductName
610
from models_library.rest_pagination import (
711
MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE,
@@ -10,7 +14,15 @@
1014
)
1115
from models_library.rpc_pagination import PageLimitInt
1216
from models_library.users import UserID
13-
from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient
17+
18+
from .models.api_resources import JobLinks
19+
from .services_http.solver_job_models_converters import (
20+
get_solver_job_rest_interface_links,
21+
)
22+
from .services_http.study_job_models_converters import (
23+
get_study_job_rest_interface_links,
24+
)
25+
from .services_rpc.wb_api_server import WbApiRpcClient
1426

1527
DEFAULT_PAGINATION_LIMIT = MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE - 1
1628

@@ -38,3 +50,21 @@ async def list_functions(
3850
product_name=self.product_name,
3951
**pagination_kwargs,
4052
)
53+
54+
async def get_function_job_links(
55+
self, function: RegisteredFunction, url_for: Callable
56+
) -> JobLinks:
57+
if function.function_class == FunctionClass.SOLVER:
58+
return get_solver_job_rest_interface_links(
59+
url_for=url_for,
60+
solver_key=function.solver_key,
61+
version=function.solver_version,
62+
)
63+
if function.function_class == FunctionClass.PROJECT:
64+
return get_study_job_rest_interface_links(
65+
url_for=url_for,
66+
study_id=function.project_id,
67+
)
68+
raise UnsupportedFunctionClassError(
69+
function_class=function.function_class,
70+
)

0 commit comments

Comments
 (0)