Skip to content

Commit 1437b91

Browse files
committed
further refactor
1 parent 575b130 commit 1437b91

File tree

3 files changed

+10
-320
lines changed

3 files changed

+10
-320
lines changed

services/api-server/src/simcore_service_api_server/_service_function_jobs.py

Lines changed: 0 additions & 221 deletions
Original file line numberDiff line numberDiff line change
@@ -9,31 +9,24 @@
99
FunctionInputs,
1010
FunctionJobCollectionID,
1111
FunctionJobID,
12-
FunctionJobStatus,
13-
FunctionOutputs,
1412
FunctionSchemaClass,
1513
ProjectFunctionJob,
1614
RegisteredFunction,
1715
RegisteredFunctionJob,
1816
RegisteredFunctionJobPatch,
19-
RegisteredFunctionJobWithStatus,
2017
RegisteredProjectFunctionJobPatch,
2118
RegisteredSolverFunctionJobPatch,
2219
SolverFunctionJob,
2320
SolverJobID,
2421
TaskID,
2522
)
2623
from models_library.functions_errors import (
27-
FunctionExecuteAccessDeniedError,
2824
FunctionInputsValidationError,
29-
FunctionsExecuteApiAccessDeniedError,
3025
UnsupportedFunctionClassError,
31-
UnsupportedFunctionFunctionJobClassCombinationError,
3226
)
3327
from models_library.products import ProductName
3428
from models_library.projects import ProjectID
3529
from models_library.projects_nodes_io import NodeID
36-
from models_library.projects_state import RunningState
3730
from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt
3831
from models_library.rpc_pagination import PageLimitInt
3932
from models_library.users import UserID
@@ -42,13 +35,9 @@
4235
from servicelib.celery.task_manager import TaskManager
4336
from simcore_service_api_server._service_functions import FunctionService
4437
from simcore_service_api_server.services_rpc.storage import StorageService
45-
from sqlalchemy.ext.asyncio import AsyncEngine
4638

4739
from ._service_jobs import JobService
4840
from .api.routes.tasks import _get_task_filter
49-
from .exceptions.function_errors import (
50-
FunctionJobCacheNotFoundError,
51-
)
5241
from .models.api_resources import JobLinks
5342
from .models.domain.functions import PreRegisteredFunctionJobData
5443
from .models.schemas.jobs import JobInputs, JobPricingSpecification
@@ -124,33 +113,6 @@ async def list_function_jobs(
124113
**pagination_kwargs,
125114
)
126115

127-
async def list_function_jobs_with_status(
128-
self,
129-
*,
130-
filter_by_function_id: FunctionID | None = None,
131-
filter_by_function_job_ids: list[FunctionJobID] | None = None,
132-
filter_by_function_job_collection_id: FunctionJobCollectionID | None = None,
133-
pagination_offset: PageOffsetInt | None = None,
134-
pagination_limit: PageLimitInt | None = None,
135-
) -> tuple[
136-
list[RegisteredFunctionJobWithStatus],
137-
PageMetaInfoLimitOffset,
138-
]:
139-
"""Lists all function jobs for a user with pagination"""
140-
141-
pagination_kwargs = as_dict_exclude_none(
142-
pagination_offset=pagination_offset, pagination_limit=pagination_limit
143-
)
144-
145-
return await self._web_rpc_client.list_function_jobs_with_status(
146-
user_id=self.user_id,
147-
product_name=self.product_name,
148-
filter_by_function_id=filter_by_function_id,
149-
filter_by_function_job_ids=filter_by_function_job_ids,
150-
filter_by_function_job_collection_id=filter_by_function_job_collection_id,
151-
**pagination_kwargs,
152-
)
153-
154116
async def validate_function_inputs(
155117
self, *, function_id: FunctionID, inputs: FunctionInputs
156118
) -> tuple[bool, str]:
@@ -180,72 +142,6 @@ async def validate_function_inputs(
180142
f"Unsupported function schema class {function.input_schema.schema_class}",
181143
)
182144

183-
async def inspect_function_job(
184-
self,
185-
function: RegisteredFunction,
186-
function_job: RegisteredFunctionJob,
187-
task_manager: TaskManager,
188-
) -> FunctionJobStatus:
189-
"""Raises FunctionJobProjectNotRegisteredError if no project is associated with job"""
190-
stored_job_status = await self._web_rpc_client.get_function_job_status(
191-
function_job_id=function_job.uid,
192-
user_id=self.user_id,
193-
product_name=self.product_name,
194-
)
195-
196-
if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED):
197-
return stored_job_status
198-
199-
status: str
200-
if (
201-
function.function_class == FunctionClass.PROJECT
202-
and function_job.function_class == FunctionClass.PROJECT
203-
):
204-
if function_job.project_job_id is None:
205-
status = await _celery_task_status(
206-
job_creation_task_id=function_job.job_creation_task_id,
207-
task_manager=task_manager,
208-
user_id=self.user_id,
209-
product_name=self.product_name,
210-
)
211-
else:
212-
job_status = await self._job_service.inspect_study_job(
213-
job_id=function_job.project_job_id,
214-
)
215-
status = job_status.state
216-
elif (function.function_class == FunctionClass.SOLVER) and (
217-
function_job.function_class == FunctionClass.SOLVER
218-
):
219-
if function_job.solver_job_id is None:
220-
status = await _celery_task_status(
221-
job_creation_task_id=function_job.job_creation_task_id,
222-
task_manager=task_manager,
223-
user_id=self.user_id,
224-
product_name=self.product_name,
225-
)
226-
else:
227-
job_status = await self._job_service.inspect_solver_job(
228-
solver_key=function.solver_key,
229-
version=function.solver_version,
230-
job_id=function_job.solver_job_id,
231-
)
232-
status = job_status.state
233-
else:
234-
raise UnsupportedFunctionFunctionJobClassCombinationError(
235-
function_class=function.function_class,
236-
function_job_class=function_job.function_class,
237-
)
238-
239-
new_job_status = FunctionJobStatus(status=status)
240-
241-
return await self._web_rpc_client.update_function_job_status(
242-
function_job_id=function_job.uid,
243-
user_id=self.user_id,
244-
product_name=self.product_name,
245-
job_status=new_job_status,
246-
check_write_permissions=False,
247-
)
248-
249145
async def create_function_job_inputs( # pylint: disable=no-self-use
250146
self,
251147
*,
@@ -260,60 +156,6 @@ async def create_function_job_inputs( # pylint: disable=no-self-use
260156
values=joined_inputs or {},
261157
)
262158

263-
async def get_cached_function_job(
264-
self,
265-
*,
266-
function: RegisteredFunction,
267-
job_inputs: JobInputs,
268-
task_manager: TaskManager,
269-
) -> RegisteredFunctionJob:
270-
"""
271-
N.B. this function checks access rights
272-
273-
raises FunctionsExecuteApiAccessDeniedError if user cannot execute functions
274-
raises FunctionJobCacheNotFoundError if no cached job is found
275-
276-
"""
277-
278-
user_api_access_rights = (
279-
await self._web_rpc_client.get_functions_user_api_access_rights(
280-
user_id=self.user_id, product_name=self.product_name
281-
)
282-
)
283-
if not user_api_access_rights.execute_functions:
284-
raise FunctionsExecuteApiAccessDeniedError(
285-
user_id=self.user_id,
286-
function_id=function.uid,
287-
)
288-
289-
user_permissions = await self._web_rpc_client.get_function_user_permissions(
290-
function_id=function.uid,
291-
user_id=self.user_id,
292-
product_name=self.product_name,
293-
)
294-
if not user_permissions.execute:
295-
raise FunctionExecuteAccessDeniedError(
296-
user_id=self.user_id,
297-
function_id=function.uid,
298-
)
299-
300-
if cached_function_jobs := await self._web_rpc_client.find_cached_function_jobs(
301-
function_id=function.uid,
302-
inputs=job_inputs.values,
303-
user_id=self.user_id,
304-
product_name=self.product_name,
305-
):
306-
for cached_function_job in cached_function_jobs:
307-
job_status = await self.inspect_function_job(
308-
function=function,
309-
function_job=cached_function_job,
310-
task_manager=task_manager,
311-
)
312-
if job_status.status == RunningState.SUCCESS:
313-
return cached_function_job
314-
315-
raise FunctionJobCacheNotFoundError
316-
317159
async def pre_register_function_job(
318160
self,
319161
*,
@@ -508,66 +350,3 @@ async def run_function(
508350
raise UnsupportedFunctionClassError(
509351
function_class=function.function_class,
510352
)
511-
512-
async def function_job_outputs(
513-
self,
514-
*,
515-
function: RegisteredFunction,
516-
function_job: RegisteredFunctionJob,
517-
user_id: UserID,
518-
product_name: ProductName,
519-
stored_job_outputs: FunctionOutputs | None,
520-
async_pg_engine: AsyncEngine,
521-
task_manager: TaskManager,
522-
) -> FunctionOutputs:
523-
524-
if stored_job_outputs is not None:
525-
return stored_job_outputs
526-
527-
job_status = await self.inspect_function_job(
528-
function=function, function_job=function_job, task_manager=task_manager
529-
)
530-
531-
if job_status.status != RunningState.SUCCESS:
532-
return None
533-
534-
if (
535-
function.function_class == FunctionClass.PROJECT
536-
and function_job.function_class == FunctionClass.PROJECT
537-
):
538-
if function_job.project_job_id is None:
539-
return None
540-
new_outputs = dict(
541-
(
542-
await self._job_service.get_study_job_outputs(
543-
study_id=function.project_id,
544-
job_id=function_job.project_job_id,
545-
)
546-
).results
547-
)
548-
elif (
549-
function.function_class == FunctionClass.SOLVER
550-
and function_job.function_class == FunctionClass.SOLVER
551-
):
552-
if function_job.solver_job_id is None:
553-
return None
554-
new_outputs = dict(
555-
(
556-
await self._job_service.get_solver_job_outputs(
557-
solver_key=function.solver_key,
558-
version=function.solver_version,
559-
job_id=function_job.solver_job_id,
560-
async_pg_engine=async_pg_engine,
561-
)
562-
).results
563-
)
564-
else:
565-
raise UnsupportedFunctionClassError(function_class=function.function_class)
566-
567-
return await self._web_rpc_client.update_function_job_outputs(
568-
function_job_id=function_job.uid,
569-
user_id=user_id,
570-
product_name=product_name,
571-
outputs=new_outputs,
572-
check_write_permissions=False,
573-
)

services/api-server/src/simcore_service_api_server/_service_function_jobs_task_client.py

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,13 @@
1010
FunctionJobID,
1111
FunctionJobStatus,
1212
FunctionOutputs,
13-
ProjectFunctionJob,
1413
RegisteredFunction,
1514
RegisteredFunctionJob,
1615
RegisteredFunctionJobWithStatus,
17-
SolverFunctionJob,
1816
TaskID,
1917
)
2018
from models_library.functions_errors import (
2119
FunctionExecuteAccessDeniedError,
22-
FunctionInputsValidationError,
2320
FunctionsExecuteApiAccessDeniedError,
2421
UnsupportedFunctionClassError,
2522
UnsupportedFunctionFunctionJobClassCombinationError,
@@ -40,7 +37,6 @@
4037
from .exceptions.function_errors import (
4138
FunctionJobCacheNotFoundError,
4239
)
43-
from .models.domain.functions import PreRegisteredFunctionJobData
4440
from .models.schemas.jobs import JobInputs
4541
from .services_http.webserver import AuthSession
4642
from .services_rpc.storage import StorageService
@@ -264,62 +260,6 @@ async def get_cached_function_job(
264260

265261
raise FunctionJobCacheNotFoundError
266262

267-
async def pre_register_function_job(
268-
self,
269-
*,
270-
function: RegisteredFunction,
271-
job_inputs: JobInputs,
272-
) -> PreRegisteredFunctionJobData:
273-
274-
if function.input_schema is not None:
275-
is_valid, validation_str = (
276-
await self._function_job_service.validate_function_inputs(
277-
function_id=function.uid,
278-
inputs=job_inputs.values,
279-
)
280-
)
281-
if not is_valid:
282-
raise FunctionInputsValidationError(error=validation_str)
283-
284-
if function.function_class == FunctionClass.PROJECT:
285-
job = await self._web_rpc_client.register_function_job(
286-
function_job=ProjectFunctionJob(
287-
function_uid=function.uid,
288-
title=f"Function job of function {function.uid}",
289-
description=function.description,
290-
inputs=job_inputs.values,
291-
outputs=None,
292-
project_job_id=None,
293-
job_creation_task_id=None,
294-
),
295-
user_id=self.user_id,
296-
product_name=self.product_name,
297-
)
298-
299-
elif function.function_class == FunctionClass.SOLVER:
300-
job = await self._web_rpc_client.register_function_job(
301-
function_job=SolverFunctionJob(
302-
function_uid=function.uid,
303-
title=f"Function job of function {function.uid}",
304-
description=function.description,
305-
inputs=job_inputs.values,
306-
outputs=None,
307-
solver_job_id=None,
308-
job_creation_task_id=None,
309-
),
310-
user_id=self.user_id,
311-
product_name=self.product_name,
312-
)
313-
else:
314-
raise UnsupportedFunctionClassError(
315-
function_class=function.function_class,
316-
)
317-
318-
return PreRegisteredFunctionJobData(
319-
function_job_id=job.uid,
320-
job_inputs=job_inputs,
321-
)
322-
323263
async def function_job_outputs(
324264
self,
325265
*,

0 commit comments

Comments
 (0)