Skip to content

Commit 8e7c9f0

Browse files
committed
move inspect function job to function_job_service
1 parent 50aade2 commit 8e7c9f0

File tree

8 files changed

+113
-99
lines changed

8 files changed

+113
-99
lines changed

services/api-server/src/simcore_service_api_server/_service_function_jobs.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,33 @@
22

33
from common_library.exclude import as_dict_exclude_none
44
from models_library.functions import (
5+
FunctionClass,
56
FunctionID,
67
FunctionJobCollectionID,
78
FunctionJobID,
9+
FunctionJobStatus,
10+
RegisteredFunction,
811
RegisteredFunctionJob,
912
)
13+
from models_library.functions_errors import (
14+
UnsupportedFunctionFunctionJobClassCombinationError,
15+
)
1016
from models_library.products import ProductName
17+
from models_library.projects_state import RunningState
1118
from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt
1219
from models_library.rpc_pagination import PageLimitInt
1320
from models_library.users import UserID
14-
from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient
21+
22+
from ._service_jobs import JobService
23+
from .services_rpc.wb_api_server import WbApiRpcClient
1524

1625

1726
@dataclass(frozen=True, kw_only=True)
1827
class FunctionJobService:
1928
user_id: UserID
2029
product_name: ProductName
2130
_web_rpc_client: WbApiRpcClient
31+
_job_service: JobService
2232

2333
async def list_function_jobs(
2434
self,
@@ -43,3 +53,46 @@ async def list_function_jobs(
4353
filter_by_function_job_collection_id=filter_by_function_job_collection_id,
4454
**pagination_kwargs,
4555
)
56+
57+
async def inspect_function_job(
58+
self, function: RegisteredFunction, function_job: RegisteredFunctionJob
59+
) -> FunctionJobStatus:
60+
61+
stored_job_status = await self._web_rpc_client.get_function_job_status(
62+
function_job_id=function_job.uid,
63+
user_id=self.user_id,
64+
product_name=self.product_name,
65+
)
66+
67+
if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED):
68+
return stored_job_status
69+
70+
if (
71+
function.function_class == FunctionClass.PROJECT
72+
and function_job.function_class == FunctionClass.PROJECT
73+
):
74+
job_status = await self._job_service.inspect_study_job(
75+
job_id=function_job.project_job_id,
76+
)
77+
elif (function.function_class == FunctionClass.SOLVER) and (
78+
function_job.function_class == FunctionClass.SOLVER
79+
):
80+
job_status = await self._job_service.inspect_solver_job(
81+
solver_key=function.solver_key,
82+
version=function.solver_version,
83+
job_id=function_job.solver_job_id,
84+
)
85+
else:
86+
raise UnsupportedFunctionFunctionJobClassCombinationError(
87+
function_class=function.function_class,
88+
function_job_class=function_job.function_class,
89+
)
90+
91+
new_job_status = FunctionJobStatus(status=job_status.state)
92+
93+
return await self._web_rpc_client.update_function_job_status(
94+
function_job_id=function_job.uid,
95+
user_id=self.user_id,
96+
product_name=self.product_name,
97+
job_status=new_job_status,
98+
)

services/api-server/src/simcore_service_api_server/api/dependencies/services.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,13 @@ def get_function_service(
150150

151151
def get_function_job_service(
152152
web_rpc_api: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
153+
job_service: Annotated[JobService, Depends(get_job_service)],
153154
user_id: Annotated[UserID, Depends(get_current_user_id)],
154155
product_name: Annotated[ProductName, Depends(get_product_name)],
155156
) -> FunctionJobService:
156157
return FunctionJobService(
157158
_web_rpc_client=web_rpc_api,
159+
_job_service=job_service,
158160
user_id=user_id,
159161
product_name=product_name,
160162
)

services/api-server/src/simcore_service_api_server/api/routes/function_job_collections_routes.py

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,26 @@
1616
from servicelib.utils import limited_gather
1717
from simcore_service_api_server._service_function_jobs import FunctionJobService
1818

19-
from ..._service_jobs import JobService
2019
from ...models.pagination import Page, PaginationParams
2120
from ...models.schemas.errors import ErrorGet
22-
from ...services_http.director_v2 import DirectorV2Api
2321
from ...services_rpc.wb_api_server import WbApiRpcClient
2422
from ..dependencies.authentication import get_current_user_id, get_product_name
2523
from ..dependencies.functions import (
2624
get_function_from_functionjobid,
27-
get_stored_job_status,
2825
)
2926
from ..dependencies.models_schemas_function_filters import (
3027
get_function_job_collections_filters,
3128
)
3229
from ..dependencies.services import (
33-
get_api_client,
3430
get_function_job_service,
35-
get_job_service,
3631
)
3732
from ..dependencies.webserver_rpc import get_wb_api_rpc_client
3833
from ._constants import (
3934
FMSG_CHANGELOG_ADDED_IN_VERSION,
4035
FMSG_CHANGELOG_NEW_IN_VERSION,
4136
create_route_description,
4237
)
43-
from .function_jobs_routes import function_job_status, get_function_job
38+
from .function_jobs_routes import get_function_job
4439

4540
# pylint: disable=too-many-arguments
4641

@@ -261,10 +256,11 @@ async def function_job_collection_list_function_jobs_list(
261256
async def function_job_collection_status(
262257
function_job_collection_id: FunctionJobCollectionID,
263258
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
264-
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
265259
user_id: Annotated[UserID, Depends(get_current_user_id)], # Updated type
266260
product_name: Annotated[ProductName, Depends(get_product_name)],
267-
job_service: Annotated[JobService, Depends(get_job_service)],
261+
function_job_service: Annotated[
262+
FunctionJobService, Depends(get_function_job_service)
263+
],
268264
) -> FunctionJobCollectionStatus:
269265
function_job_collection = await get_function_job_collection(
270266
function_job_collection_id=function_job_collection_id,
@@ -275,7 +271,7 @@ async def function_job_collection_status(
275271

276272
job_statuses = await limited_gather(
277273
*[
278-
function_job_status(
274+
function_job_service.inspect_function_job(
279275
function_job=await get_function_job(
280276
function_job_id=function_job_id,
281277
wb_api_rpc=wb_api_rpc,
@@ -288,17 +284,6 @@ async def function_job_collection_status(
288284
user_id=user_id,
289285
product_name=product_name,
290286
),
291-
stored_job_status=await get_stored_job_status(
292-
function_job_id=function_job_id,
293-
wb_api_rpc=wb_api_rpc,
294-
user_id=user_id,
295-
product_name=product_name,
296-
),
297-
wb_api_rpc=wb_api_rpc,
298-
director2_api=director2_api,
299-
user_id=user_id,
300-
product_name=product_name,
301-
job_service=job_service,
302287
)
303288
for function_job_id in function_job_collection.job_ids
304289
]

services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py

Lines changed: 6 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
from models_library.functions import RegisteredFunction
1616
from models_library.functions_errors import (
1717
UnsupportedFunctionClassError,
18-
UnsupportedFunctionFunctionJobClassCombinationError,
1918
)
2019
from models_library.products import ProductName
21-
from models_library.projects_state import RunningState
2220
from models_library.users import UserID
2321
from servicelib.fastapi.dependencies import get_app
2422
from simcore_service_api_server.models.schemas.functions_filters import (
@@ -30,7 +28,6 @@
3028
from ..._service_jobs import JobService
3129
from ...models.pagination import Page, PaginationParams
3230
from ...models.schemas.errors import ErrorGet
33-
from ...services_http.director_v2 import DirectorV2Api
3431
from ...services_http.storage import StorageApi
3532
from ...services_http.webserver import AuthSession
3633
from ...services_rpc.wb_api_server import WbApiRpcClient
@@ -40,7 +37,6 @@
4037
get_function_from_functionjob,
4138
get_function_job_dependency,
4239
get_stored_job_outputs,
43-
get_stored_job_status,
4440
)
4541
from ..dependencies.models_schemas_function_filters import get_function_jobs_filters
4642
from ..dependencies.services import (
@@ -50,7 +46,7 @@
5046
)
5147
from ..dependencies.webserver_http import get_webserver_session
5248
from ..dependencies.webserver_rpc import get_wb_api_rpc_client
53-
from . import solvers_jobs, solvers_jobs_read, studies_jobs
49+
from . import solvers_jobs_read, studies_jobs
5450
from ._constants import (
5551
FMSG_CHANGELOG_ADDED_IN_VERSION,
5652
FMSG_CHANGELOG_NEW_IN_VERSION,
@@ -204,48 +200,13 @@ async def function_job_status(
204200
RegisteredFunctionJob, Depends(get_function_job_dependency)
205201
],
206202
function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)],
207-
stored_job_status: Annotated[FunctionJobStatus, Depends(get_stored_job_status)],
208-
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
209-
job_service: Annotated[JobService, Depends(get_job_service)],
210-
user_id: Annotated[UserID, Depends(get_current_user_id)],
211-
product_name: Annotated[ProductName, Depends(get_product_name)],
212-
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
203+
function_job_service: Annotated[
204+
FunctionJobService, Depends(get_function_job_service)
205+
],
213206
) -> FunctionJobStatus:
214207

215-
if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED):
216-
return stored_job_status
217-
218-
if (
219-
function.function_class == FunctionClass.PROJECT
220-
and function_job.function_class == FunctionClass.PROJECT
221-
):
222-
job_status = await studies_jobs.inspect_study_job(
223-
study_id=function.project_id,
224-
job_id=function_job.project_job_id,
225-
job_service=job_service,
226-
)
227-
elif (function.function_class == FunctionClass.SOLVER) and (
228-
function_job.function_class == FunctionClass.SOLVER
229-
):
230-
job_status = await solvers_jobs.inspect_job(
231-
solver_key=function.solver_key,
232-
version=function.solver_version,
233-
job_id=function_job.solver_job_id,
234-
job_service=job_service,
235-
)
236-
else:
237-
raise UnsupportedFunctionFunctionJobClassCombinationError(
238-
function_class=function.function_class,
239-
function_job_class=function_job.function_class,
240-
)
241-
242-
new_job_status = FunctionJobStatus(status=job_status.state)
243-
244-
return await wb_api_rpc.update_function_job_status(
245-
function_job_id=function_job.uid,
246-
user_id=user_id,
247-
product_name=product_name,
248-
job_status=new_job_status,
208+
return await function_job_service.inspect_function_job(
209+
function=function, function_job=function_job
249210
)
250211

251212

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,16 @@
4040
from ..._service_function_jobs import FunctionJobService
4141
from ..._service_functions import FunctionService
4242
from ..._service_jobs import JobService
43-
from ..._service_solvers import SolverService
4443
from ...models.pagination import Page, PaginationParams
4544
from ...models.schemas.errors import ErrorGet
4645
from ...models.schemas.jobs import JobInputs
47-
from ...services_http.director_v2 import DirectorV2Api
48-
from ...services_http.webserver import AuthSession
4946
from ...services_rpc.wb_api_server import WbApiRpcClient
5047
from ..dependencies.authentication import get_current_user_id, get_product_name
51-
from ..dependencies.functions import get_stored_job_status
5248
from ..dependencies.services import (
53-
get_api_client,
5449
get_function_job_service,
5550
get_function_service,
5651
get_job_service,
57-
get_solver_service,
5852
)
59-
from ..dependencies.webserver_http import get_webserver_session
6053
from ..dependencies.webserver_rpc import get_wb_api_rpc_client
6154
from . import solvers_jobs, studies_jobs
6255
from ._constants import (
@@ -372,13 +365,13 @@ async def run_function( # noqa: PLR0913
372365
function_id: FunctionID,
373366
to_run_function: Annotated[RegisteredFunction, Depends(get_function)],
374367
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
375-
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
376368
url_for: Annotated[Callable, Depends(get_reverse_url_mapper)],
377-
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
378369
function_inputs: FunctionInputs,
379370
user_id: Annotated[UserID, Depends(get_current_user_id)],
380371
product_name: Annotated[str, Depends(get_product_name)],
381-
solver_service: Annotated[SolverService, Depends(get_solver_service)],
372+
function_jobs_service: Annotated[
373+
FunctionJobService, Depends(get_function_job_service)
374+
],
382375
job_service: Annotated[JobService, Depends(get_job_service)],
383376
x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()],
384377
x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()],
@@ -414,8 +407,6 @@ async def run_function( # noqa: PLR0913
414407
function_id=function_id,
415408
)
416409

417-
from .function_jobs_routes import function_job_status
418-
419410
joined_inputs = _join_inputs(
420411
to_run_function.default_inputs,
421412
function_inputs,
@@ -439,20 +430,9 @@ async def run_function( # noqa: PLR0913
439430
product_name=product_name,
440431
):
441432
for cached_function_job in cached_function_jobs:
442-
job_status = await function_job_status(
433+
job_status = await function_jobs_service.inspect_function_job(
443434
function=to_run_function,
444435
function_job=cached_function_job,
445-
stored_job_status=await get_stored_job_status(
446-
function_job_id=cached_function_job.uid,
447-
user_id=user_id,
448-
product_name=product_name,
449-
wb_api_rpc=wb_api_rpc,
450-
),
451-
wb_api_rpc=wb_api_rpc,
452-
user_id=user_id,
453-
director2_api=director2_api,
454-
product_name=product_name,
455-
job_service=job_service,
456436
)
457437
if job_status.status == RunningState.SUCCESS:
458438
return cached_function_job
@@ -565,13 +545,13 @@ async def map_function( # noqa: PLR0913
565545
to_run_function: Annotated[RegisteredFunction, Depends(get_function)],
566546
function_inputs_list: FunctionInputsList,
567547
wb_api_rpc: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
568-
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
569548
url_for: Annotated[Callable, Depends(get_reverse_url_mapper)],
570-
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
571549
user_id: Annotated[UserID, Depends(get_current_user_id)],
572550
product_name: Annotated[str, Depends(get_product_name)],
573-
solver_service: Annotated[SolverService, Depends(get_solver_service)],
574551
job_service: Annotated[JobService, Depends(get_job_service)],
552+
function_jobs_service: Annotated[
553+
FunctionJobService, Depends(get_function_job_service)
554+
],
575555
x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()],
576556
x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()],
577557
) -> RegisteredFunctionJobCollection:
@@ -583,12 +563,10 @@ async def map_function( # noqa: PLR0913
583563
function_inputs=function_inputs,
584564
product_name=product_name,
585565
user_id=user_id,
586-
webserver_api=webserver_api,
587566
url_for=url_for,
588-
director2_api=director2_api,
589567
request=request,
590-
solver_service=solver_service,
591568
job_service=job_service,
569+
function_jobs_service=function_jobs_service,
592570
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
593571
x_simcore_parent_node_id=x_simcore_parent_node_id,
594572
)

0 commit comments

Comments
 (0)