Skip to content

Commit 5d74010

Browse files
committed
✨ JobService: Implement job listing with pagination and refactor SolverService and StudiesService to utilize it
1 parent 864aed4 commit 5d74010

File tree

4 files changed

+99
-115
lines changed

4 files changed

+99
-115
lines changed

services/api-server/src/simcore_service_api_server/_service_job.py

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,33 @@
11
import logging
22
from collections.abc import Callable
3-
from typing import Annotated
43

5-
from fastapi import Depends
64
from models_library.api_schemas_webserver.projects import ProjectCreateNew, ProjectGet
75
from models_library.products import ProductName
86
from models_library.projects import ProjectID
97
from models_library.projects_nodes_io import NodeID
8+
from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt
9+
from models_library.rpc_pagination import PageLimitInt
1010
from models_library.users import UserID
1111
from pydantic import HttpUrl
12-
from servicelib.fastapi.app_state import SingletonInAppStateMixin
1312
from servicelib.logging_utils import log_context
1413

15-
from .api.dependencies.authentication import (
16-
get_current_user_id,
17-
get_product_name,
18-
)
19-
from .api.dependencies.webserver_http import get_webserver_session
20-
from .api.dependencies.webserver_rpc import (
21-
get_wb_api_rpc_client,
22-
)
2314
from .models.schemas.jobs import Job, JobInputs
2415
from .models.schemas.programs import Program
2516
from .models.schemas.solvers import Solver
2617
from .services_http.solver_job_models_converters import (
2718
create_job_from_project,
19+
create_job_inputs_from_node_inputs,
2820
create_new_project_for_job,
2921
)
3022
from .services_http.webserver import AuthSession
3123
from .services_rpc.wb_api_server import WbApiRpcClient
3224

3325
_logger = logging.getLogger(__name__)
3426

27+
DEFAULT_PAGINATION_LIMIT = 999 # MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE - 1
3528

36-
class JobService(SingletonInAppStateMixin):
37-
app_state_name = "JobService"
29+
30+
class JobService:
3831
_web_rest_api: AuthSession
3932
_web_rpc_api: WbApiRpcClient
4033
_user_id: UserID
@@ -43,16 +36,64 @@ class JobService(SingletonInAppStateMixin):
4336
def __init__(
4437
self,
4538
*,
46-
web_rest_api: Annotated[AuthSession, Depends(get_webserver_session)],
47-
web_rpc_api: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
48-
user_id: Annotated[UserID, Depends(get_current_user_id)],
49-
product_name: Annotated[ProductName, Depends(get_product_name)],
39+
web_rest_api: AuthSession,
40+
web_rpc_api: WbApiRpcClient,
41+
user_id: UserID,
42+
product_name: ProductName,
5043
):
5144
self._web_rest_api = web_rest_api
5245
self._web_rpc_api = web_rpc_api
5346
self._user_id = user_id
5447
self._product_name = product_name
5548

49+
async def list_jobs_by_resource_prefix(
50+
self,
51+
*,
52+
job_parent_resource_name_prefix: str,
53+
offset: PageOffsetInt = 0,
54+
limit: PageLimitInt = DEFAULT_PAGINATION_LIMIT,
55+
) -> tuple[list[Job], PageMetaInfoLimitOffset]:
56+
"""Lists all jobs for a user with pagination based on resource name prefix"""
57+
58+
# List projects marked as jobs
59+
projects_page = await self._web_rpc_api.list_projects_marked_as_jobs(
60+
product_name=self._product_name,
61+
user_id=self._user_id,
62+
offset=offset,
63+
limit=limit,
64+
job_parent_resource_name_prefix=job_parent_resource_name_prefix,
65+
)
66+
67+
# Convert projects to jobs
68+
jobs: list[Job] = []
69+
for project_job in projects_page.data:
70+
assert ( # nosec
71+
len(project_job.workbench) == 1
72+
), "Expected only one solver node in workbench"
73+
74+
solver_node = next(iter(project_job.workbench.values()))
75+
job_inputs: JobInputs = create_job_inputs_from_node_inputs(
76+
inputs=solver_node.inputs or {}
77+
)
78+
assert project_job.job_parent_resource_name # nosec
79+
80+
jobs.append(
81+
Job(
82+
id=project_job.uuid,
83+
name=Job.compose_resource_name(
84+
project_job.job_parent_resource_name, project_job.uuid
85+
),
86+
inputs_checksum=job_inputs.compute_checksum(),
87+
created_at=project_job.created_at,
88+
runner_name=project_job.job_parent_resource_name,
89+
url=None,
90+
runner_url=None,
91+
outputs_url=None,
92+
)
93+
)
94+
95+
return jobs, projects_page.meta
96+
5697
async def create_job(
5798
self,
5899
*,

services/api-server/src/simcore_service_api_server/_service_solvers.py

Lines changed: 7 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from models_library.api_schemas_catalog.services import ServiceListFilters
33
from models_library.basic_types import VersionStr
44
from models_library.products import ProductName
5-
from models_library.projects_nodes import Node
65
from models_library.rest_pagination import (
76
MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE,
87
PageMetaInfoLimitOffset,
@@ -18,29 +17,26 @@
1817
SolverServiceListJobsFiltersError,
1918
)
2019

20+
from ._service_job import JobService
2121
from .models.api_resources import compose_resource_name
22-
from .models.schemas.jobs import Job, JobInputs
22+
from .models.schemas.jobs import Job
2323
from .models.schemas.solvers import Solver, SolverKeyId
24-
from .services_http.solver_job_models_converters import (
25-
create_job_inputs_from_node_inputs,
26-
)
2724
from .services_rpc.catalog import CatalogService
28-
from .services_rpc.wb_api_server import WbApiRpcClient
2925

3026
DEFAULT_PAGINATION_LIMIT = MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE - 1
3127

3228

3329
class SolverService:
3430
_catalog_service: CatalogService
35-
_webserver_client: WbApiRpcClient
31+
_job_service: JobService
3632

3733
def __init__(
3834
self,
3935
catalog_service: CatalogService,
40-
webserver_client: WbApiRpcClient,
36+
job_service: JobService,
4137
):
4238
self._catalog_service = catalog_service
43-
self._webserver_client = webserver_client
39+
self._job_service = job_service
4440

4541
async def get_solver(
4642
self,
@@ -94,8 +90,6 @@ async def get_latest_release(
9490
async def list_jobs(
9591
self,
9692
*,
97-
product_name: ProductName,
98-
user_id: UserID,
9993
# filters
10094
solver_key: SolverKeyId | None = None,
10195
solver_version: VersionStr | None = None,
@@ -121,46 +115,13 @@ async def list_jobs(
121115
*collection_or_resource_ids
122116
)
123117

124-
# 2. List projects marked as jobs
125-
projects_page = await self._webserver_client.list_projects_marked_as_jobs(
126-
product_name=product_name,
127-
user_id=user_id,
118+
# Use the common implementation from JobService
119+
return await self._job_service.list_jobs_by_resource_prefix(
128120
offset=offset,
129121
limit=limit,
130122
job_parent_resource_name_prefix=job_parent_resource_name_prefix,
131123
)
132124

133-
# 3. Convert projects to jobs
134-
jobs: list[Job] = []
135-
for project_job in projects_page.data:
136-
137-
assert ( # nosec
138-
len(project_job.workbench) == 1
139-
), "Expected only one solver node in workbench"
140-
141-
solver_node: Node = next(iter(project_job.workbench.values()))
142-
job_inputs: JobInputs = create_job_inputs_from_node_inputs(
143-
inputs=solver_node.inputs or {}
144-
)
145-
assert project_job.job_parent_resource_name # nosec
146-
147-
jobs.append(
148-
Job(
149-
id=project_job.uuid,
150-
name=Job.compose_resource_name(
151-
project_job.job_parent_resource_name, project_job.uuid
152-
),
153-
inputs_checksum=job_inputs.compute_checksum(),
154-
created_at=project_job.created_at,
155-
runner_name=project_job.job_parent_resource_name,
156-
url=None,
157-
runner_url=None,
158-
outputs_url=None,
159-
)
160-
)
161-
162-
return jobs, projects_page.meta
163-
164125
async def solver_release_history(
165126
self,
166127
*,
Lines changed: 7 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,30 @@
1-
from models_library.products import ProductName
2-
from models_library.projects_nodes import Node
31
from models_library.rest_pagination import (
42
MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE,
53
PageMetaInfoLimitOffset,
64
PageOffsetInt,
75
)
86
from models_library.rpc_pagination import PageLimitInt
9-
from models_library.users import UserID
107
from simcore_service_api_server.models.schemas.studies import StudyID
118

9+
from ._service_job import JobService
1210
from .models.api_resources import compose_resource_name
13-
from .models.schemas.jobs import Job, JobInputs
14-
from .services_http.solver_job_models_converters import (
15-
create_job_inputs_from_node_inputs,
16-
)
17-
from .services_rpc.wb_api_server import WbApiRpcClient
11+
from .models.schemas.jobs import Job
1812

1913
DEFAULT_PAGINATION_LIMIT = MAXIMUM_NUMBER_OF_ITEMS_PER_PAGE - 1
2014

2115

2216
class StudiesService:
23-
_webserver_client: WbApiRpcClient
17+
_job_service: JobService
2418

2519
def __init__(
2620
self,
27-
webserver_client: WbApiRpcClient,
21+
job_service: JobService,
2822
):
29-
self._webserver_client = webserver_client
23+
self._job_service = job_service
3024

3125
async def list_jobs(
3226
self,
3327
*,
34-
product_name: ProductName,
35-
user_id: UserID,
3628
# filters
3729
study_id: StudyID | None = None,
3830
# pagination
@@ -52,42 +44,9 @@ async def list_jobs(
5244
*collection_or_resource_ids
5345
)
5446

55-
# 2. List projects marked as jobs
56-
projects_page = await self._webserver_client.list_projects_marked_as_jobs(
57-
product_name=product_name,
58-
user_id=user_id,
47+
# Use the common implementation from JobService
48+
return await self._job_service.list_jobs_by_resource_prefix(
5949
offset=offset,
6050
limit=limit,
6151
job_parent_resource_name_prefix=job_parent_resource_name_prefix,
6252
)
63-
64-
# 3. Convert projects to jobs
65-
jobs: list[Job] = []
66-
for project_job in projects_page.data:
67-
68-
assert ( # nosec
69-
len(project_job.workbench) == 1
70-
), "Expected only one solver node in workbench"
71-
72-
solver_node: Node = next(iter(project_job.workbench.values()))
73-
job_inputs: JobInputs = create_job_inputs_from_node_inputs(
74-
inputs=solver_node.inputs or {}
75-
)
76-
assert project_job.job_parent_resource_name # nosec
77-
78-
jobs.append(
79-
Job(
80-
id=project_job.uuid,
81-
name=Job.compose_resource_name(
82-
project_job.job_parent_resource_name, project_job.uuid
83-
),
84-
inputs_checksum=job_inputs.compute_checksum(),
85-
created_at=project_job.created_at,
86-
runner_name=project_job.job_parent_resource_name,
87-
url=None,
88-
runner_url=None,
89-
outputs_url=None,
90-
)
91-
)
92-
93-
return jobs, projects_page.meta

services/api-server/src/simcore_service_api_server/api/dependencies/services.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@
44
from typing import Annotated
55

66
from fastapi import Depends, HTTPException, Request, status
7+
from models_library.products import ProductName
8+
from models_library.users import UserID
79
from servicelib.rabbitmq import RabbitMQRPCClient
810
from simcore_service_api_server._service_studies import StudiesService
911

12+
from ..._service_job import JobService
1013
from ..._service_solvers import SolverService
1114
from ...services_rpc.catalog import CatalogService
1215
from ...services_rpc.wb_api_server import WbApiRpcClient
1316
from ...utils.client_base import BaseServiceClientApi
17+
from .authentication import get_current_user_id, get_product_name
1418
from .rabbitmq import get_rabbitmq_rpc_client
19+
from .webserver_http import AuthSession, get_webserver_session
1520
from .webserver_rpc import get_wb_api_rpc_client
1621

1722

@@ -50,9 +55,27 @@ def get_catalog_service(
5055
return CatalogService(client=rpc_client)
5156

5257

58+
def get_job_service(
59+
web_rest_api: Annotated[AuthSession, Depends(get_webserver_session)],
60+
web_rpc_api: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
61+
user_id: Annotated[UserID, Depends(get_current_user_id)],
62+
product_name: Annotated[ProductName, Depends(get_product_name)],
63+
) -> JobService:
64+
"""
65+
"Assembles" the JobsService layer to the underlying service and client interfaces
66+
in the context of the rest controller (i.e. api/dependencies)
67+
"""
68+
return JobService(
69+
web_rest_api=web_rest_api,
70+
web_rpc_api=web_rpc_api,
71+
user_id=user_id,
72+
product_name=product_name,
73+
)
74+
75+
5376
def get_solver_service(
5477
catalog_service: Annotated[CatalogService, Depends(get_catalog_service)],
55-
webserver_client: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
78+
job_service: Annotated[JobService, Depends(get_job_service)],
5679
) -> SolverService:
5780
"""
5881
"Assembles" the SolverService layer to the underlying service and client interfaces
@@ -61,13 +84,13 @@ def get_solver_service(
6184

6285
return SolverService(
6386
catalog_service=catalog_service,
64-
webserver_client=webserver_client,
87+
job_service=job_service,
6588
)
6689

6790

6891
def get_studies_service(
69-
webserver_client: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
92+
job_service: Annotated[JobService, Depends(get_job_service)],
7093
) -> StudiesService:
7194
return StudiesService(
72-
webserver_client=webserver_client,
95+
job_service=job_service,
7396
)

0 commit comments

Comments
 (0)