Skip to content

Commit 50aade2

Browse files
committed
port start_study_job code to job_service
1 parent 4b78421 commit 50aade2

File tree

5 files changed

+56
-48
lines changed

5 files changed

+56
-48
lines changed

services/api-server/src/simcore_service_api_server/_service_jobs.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
)
4444
from .models.schemas.programs import Program
4545
from .models.schemas.solvers import Solver, SolverKeyId
46-
from .models.schemas.studies import StudyID
46+
from .models.schemas.studies import Study, StudyID
4747
from .services_http.director_v2 import DirectorV2Api
4848
from .services_http.jobs import start_project
4949
from .services_http.solver_job_models_converters import (
@@ -65,14 +65,22 @@
6565
_logger = logging.getLogger(__name__)
6666

6767

68-
def compose_job_resource_name(solver_key, solver_version, job_id) -> str:
68+
def compose_solver_job_resource_name(solver_key, solver_version, job_id) -> str:
6969
"""Creates a unique resource name for solver's jobs"""
7070
return Job.compose_resource_name(
7171
parent_name=Solver.compose_resource_name(solver_key, solver_version),
7272
job_id=job_id,
7373
)
7474

7575

76+
def compose_study_job_resource_name(study_key, job_id) -> str:
77+
"""Creates a unique resource name for study's jobs"""
78+
return Job.compose_resource_name(
79+
parent_name=Study.compose_resource_name(study_key),
80+
job_id=job_id,
81+
)
82+
83+
7684
@dataclass(frozen=True, kw_only=True)
7785
class JobService:
7886
_web_rest_client: AuthSession
@@ -352,7 +360,7 @@ async def start_solver_job(
352360
"""
353361
Raises ProjectAlreadyStartedError if the project is already started
354362
"""
355-
job_name = compose_job_resource_name(solver_key, version, job_id)
363+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
356364
_logger.debug("Start Job '%s'", job_name)
357365
job_parent_resource_name = Solver.compose_resource_name(solver_key, version)
358366
job = await self.get_job(
@@ -446,3 +454,21 @@ async def inspect_study_job(self, *, job_id: JobID) -> JobStatus:
446454
)
447455
job_status: JobStatus = create_jobstatus_from_task(task)
448456
return job_status
457+
458+
async def start_study_job(
459+
self,
460+
*,
461+
job_id: JobID,
462+
study_id: StudyID,
463+
pricing_spec: JobPricingSpecification | None,
464+
):
465+
job_name = compose_study_job_resource_name(study_id, job_id)
466+
await start_project(
467+
job_id=job_id,
468+
expected_job_name=job_name,
469+
webserver_api=self._web_rest_client,
470+
pricing_spec=pricing_spec,
471+
)
472+
return await self.inspect_study_job(
473+
job_id=job_id,
474+
)

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,6 @@ async def run_function( # noqa: PLR0913
470470
request=request,
471471
study_id=to_run_function.project_id,
472472
job_id=study_job.id,
473-
webserver_api=webserver_api,
474473
job_service=job_service,
475474
)
476475
return await register_function_job(

services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from models_library.projects_nodes_io import NodeID
1212
from pydantic.types import PositiveInt
1313

14-
from ..._service_jobs import JobService, compose_job_resource_name
14+
from ..._service_jobs import JobService, compose_solver_job_resource_name
1515
from ...exceptions.backend_errors import ProjectAlreadyStartedError
1616
from ...exceptions.service_errors_utils import DEFAULT_BACKEND_SERVICE_STATUS_CODES
1717
from ...models.basic_types import VersionStr
@@ -124,7 +124,7 @@ async def delete_job(
124124
job_id: JobID,
125125
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
126126
):
127-
job_name = compose_job_resource_name(solver_key, version, job_id)
127+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
128128
_logger.debug("Deleting Job '%s'", job_name)
129129

130130
await webserver_api.delete_project(project_id=job_id)
@@ -244,7 +244,7 @@ async def stop_job(
244244
user_id: Annotated[PositiveInt, Depends(get_current_user_id)],
245245
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
246246
):
247-
job_name = compose_job_resource_name(solver_key, version, job_id)
247+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
248248
_logger.debug("Stopping Job '%s'", job_name)
249249

250250
return await stop_project(
@@ -269,7 +269,7 @@ async def inspect_job(
269269
job_id: JobID,
270270
job_service: Annotated[JobService, Depends(get_job_service)],
271271
) -> JobStatus:
272-
job_name = compose_job_resource_name(solver_key, version, job_id)
272+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
273273
_logger.debug("Inspecting Job '%s'", job_name)
274274

275275
return await job_service.inspect_solver_job(
@@ -296,7 +296,7 @@ async def replace_job_custom_metadata(
296296
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
297297
url_for: Annotated[Callable, Depends(get_reverse_url_mapper)],
298298
):
299-
job_name = compose_job_resource_name(solver_key, version, job_id)
299+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
300300
_logger.debug("Custom metadata for '%s'", job_name)
301301

302302
return await replace_custom_metadata(

services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_read.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from sqlalchemy.ext.asyncio import AsyncEngine
2222
from starlette.background import BackgroundTask
2323

24-
from ..._service_jobs import JobService, compose_job_resource_name
24+
from ..._service_jobs import JobService, compose_solver_job_resource_name
2525
from ..._service_solvers import SolverService
2626
from ...exceptions.custom_errors import InsufficientCreditsError, MissingWalletError
2727
from ...exceptions.service_errors_utils import DEFAULT_BACKEND_SERVICE_STATUS_CODES
@@ -275,7 +275,8 @@ async def get_job(
275275
):
276276
"""Gets job of a given solver"""
277277
_logger.debug(
278-
"Getting Job '%s'", compose_job_resource_name(solver_key, version, job_id)
278+
"Getting Job '%s'",
279+
compose_solver_job_resource_name(solver_key, version, job_id),
279280
)
280281

281282
solver = await solver_service.get_solver(
@@ -312,7 +313,7 @@ async def get_job_outputs(
312313
job_service: Annotated[JobService, Depends(get_job_service)],
313314
storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))],
314315
):
315-
job_name = compose_job_resource_name(solver_key, version, job_id)
316+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
316317
_logger.debug("Get Job '%s' outputs", job_name)
317318

318319
project_marked_as_job = await job_service.get_job(
@@ -395,7 +396,7 @@ async def get_job_output_logfile(
395396
user_id: Annotated[PositiveInt, Depends(get_current_user_id)],
396397
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
397398
):
398-
job_name = compose_job_resource_name(solver_key, version, job_id)
399+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
399400
_logger.debug("Get Job '%s' outputs logfile", job_name)
400401

401402
project_id = job_id
@@ -451,7 +452,7 @@ async def get_job_custom_metadata(
451452
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
452453
url_for: Annotated[Callable, Depends(get_reverse_url_mapper)],
453454
):
454-
job_name = compose_job_resource_name(solver_key, version, job_id)
455+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
455456
_logger.debug("Custom metadata for '%s'", job_name)
456457

457458
return await get_custom_metadata(
@@ -481,7 +482,7 @@ async def get_job_wallet(
481482
job_id: JobID,
482483
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
483484
) -> WalletGetWithAvailableCreditsLegacy:
484-
job_name = compose_job_resource_name(solver_key, version, job_id)
485+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
485486
_logger.debug("Getting wallet for job '%s'", job_name)
486487

487488
if project_wallet := await webserver_api.get_project_wallet(project_id=job_id):
@@ -504,7 +505,7 @@ async def get_job_pricing_unit(
504505
job_id: JobID,
505506
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
506507
):
507-
job_name = compose_job_resource_name(solver_key, version, job_id)
508+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
508509
with log_context(_logger, logging.DEBUG, "Get pricing unit"):
509510
_logger.debug("job: %s", job_name)
510511
project: ProjectGet = await webserver_api.get_project(project_id=job_id)
@@ -535,7 +536,7 @@ async def get_log_stream(
535536
):
536537
assert request # nosec
537538

538-
job_name = compose_job_resource_name(solver_key, version, job_id)
539+
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
539540
with log_context(
540541
_logger, logging.DEBUG, f"Streaming logs for {job_name=} and {user_id=}"
541542
):

services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from pydantic import HttpUrl, PositiveInt
1313
from servicelib.logging_utils import log_context
1414

15-
from ..._service_jobs import JobService
15+
from ..._service_jobs import JobService, compose_study_job_resource_name
1616
from ...exceptions.backend_errors import ProjectAlreadyStartedError
1717
from ...models.api_resources import parse_resources_ids
1818
from ...models.pagination import Page, PaginationParams
@@ -27,12 +27,11 @@
2727
JobPricingSpecification,
2828
JobStatus,
2929
)
30-
from ...models.schemas.studies import JobLogsMap, Study, StudyID
30+
from ...models.schemas.studies import JobLogsMap, StudyID
3131
from ...services_http.director_v2 import DirectorV2Api
3232
from ...services_http.jobs import (
3333
get_custom_metadata,
3434
replace_custom_metadata,
35-
start_project,
3635
stop_project,
3736
)
3837
from ...services_http.storage import StorageApi
@@ -57,14 +56,6 @@
5756
router = APIRouter()
5857

5958

60-
def _compose_job_resource_name(study_key, job_id) -> str:
61-
"""Creates a unique resource name for solver's jobs"""
62-
return Job.compose_resource_name(
63-
parent_name=Study.compose_resource_name(study_key),
64-
job_id=job_id,
65-
)
66-
67-
6859
@router.get(
6960
"/{study_id:uuid}/jobs",
7061
response_model=Page[Job],
@@ -129,7 +120,7 @@ async def create_study_job(
129120
x_simcore_parent_node_id=x_simcore_parent_node_id,
130121
hidden=hidden,
131122
)
132-
assert job.name == _compose_job_resource_name(study_id, job.id)
123+
assert job.name == compose_study_job_resource_name(study_id, job.id)
133124
job.url = url_for(
134125
"get_study_job",
135126
study_id=study_id,
@@ -175,7 +166,7 @@ async def delete_study_job(
175166
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
176167
):
177168
"""Deletes an existing study job"""
178-
job_name = _compose_job_resource_name(study_id, job_id)
169+
job_name = compose_study_job_resource_name(study_id, job_id)
179170
with log_context(_logger, logging.DEBUG, f"Deleting Job '{job_name}'"):
180171
await webserver_api.delete_project(project_id=job_id)
181172

@@ -212,7 +203,6 @@ async def start_study_job(
212203
request: Request,
213204
study_id: StudyID,
214205
job_id: JobID,
215-
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
216206
job_service: Annotated[JobService, Depends(get_job_service)],
217207
cluster_id: Annotated[ # pylint: disable=unused-argument # noqa: ARG001
218208
ClusterID | None,
@@ -230,29 +220,21 @@ async def start_study_job(
230220
):
231221
pricing_spec = JobPricingSpecification.create_from_headers(headers=request.headers)
232222

233-
job_name = _compose_job_resource_name(study_id, job_id)
223+
job_name = compose_study_job_resource_name(study_id, job_id)
234224
with log_context(_logger, logging.DEBUG, f"Starting Job '{job_name}'"):
235225
try:
236-
await start_project(
226+
return await job_service.start_study_job(
227+
study_id=study_id,
237228
job_id=job_id,
238-
expected_job_name=job_name,
239-
webserver_api=webserver_api,
240229
pricing_spec=pricing_spec,
241230
)
242231
except ProjectAlreadyStartedError:
243-
job_status: JobStatus = await inspect_study_job(
244-
study_id=study_id,
232+
job_status: JobStatus = await job_service.inspect_study_job(
245233
job_id=job_id,
246-
job_service=job_service,
247234
)
248235
return JSONResponse(
249236
content=jsonable_encoder(job_status), status_code=status.HTTP_200_OK
250237
)
251-
return await inspect_study_job(
252-
study_id=study_id,
253-
job_id=job_id,
254-
job_service=job_service,
255-
)
256238

257239

258240
@router.post(
@@ -265,7 +247,7 @@ async def stop_study_job(
265247
user_id: Annotated[PositiveInt, Depends(get_current_user_id)],
266248
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
267249
):
268-
job_name = _compose_job_resource_name(study_id, job_id)
250+
job_name = compose_study_job_resource_name(study_id, job_id)
269251
with log_context(_logger, logging.DEBUG, f"Stopping Job '{job_name}'"):
270252
return await stop_project(
271253
job_id=job_id, user_id=user_id, director2_api=director2_api
@@ -281,7 +263,7 @@ async def inspect_study_job(
281263
job_id: JobID,
282264
job_service: Annotated[JobService, Depends(get_job_service)],
283265
) -> JobStatus:
284-
job_name = _compose_job_resource_name(study_id, job_id)
266+
job_name = compose_study_job_resource_name(study_id, job_id)
285267
_logger.debug("Inspecting Job '%s'", job_name)
286268

287269
return await job_service.inspect_study_job(job_id=job_id)
@@ -298,7 +280,7 @@ async def get_study_job_outputs(
298280
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
299281
storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))],
300282
):
301-
job_name = _compose_job_resource_name(study_id, job_id)
283+
job_name = compose_study_job_resource_name(study_id, job_id)
302284
_logger.debug("Getting Job Outputs for '%s'", job_name)
303285

304286
project_outputs = await webserver_api.get_project_outputs(project_id=job_id)
@@ -345,7 +327,7 @@ async def get_study_job_custom_metadata(
345327
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
346328
url_for: Annotated[Callable, Depends(get_reverse_url_mapper)],
347329
):
348-
job_name = _compose_job_resource_name(study_id, job_id)
330+
job_name = compose_study_job_resource_name(study_id, job_id)
349331
msg = f"Gets metadata attached to study_id={study_id!r} job_id={job_id!r}.\njob_name={job_name!r}.\nSEE https://github.com/ITISFoundation/osparc-simcore/issues/4313"
350332
_logger.debug(msg)
351333

@@ -376,7 +358,7 @@ async def replace_study_job_custom_metadata(
376358
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
377359
url_for: Annotated[Callable, Depends(get_reverse_url_mapper)],
378360
):
379-
job_name = _compose_job_resource_name(study_id, job_id)
361+
job_name = compose_study_job_resource_name(study_id, job_id)
380362

381363
msg = f"Attaches metadata={replace.metadata!r} to study_id={study_id!r} job_id={job_id!r}.\njob_name={job_name!r}.\nSEE https://github.com/ITISFoundation/osparc-simcore/issues/4313"
382364
_logger.debug(msg)

0 commit comments

Comments
 (0)