Skip to content

Commit f2cd24a

Browse files
committed
Finish merge with celery worker PR
1 parent 4b1a4d7 commit f2cd24a

File tree

13 files changed

+140
-151
lines changed

13 files changed

+140
-151
lines changed

.github/copilot-instructions.md

Lines changed: 0 additions & 61 deletions
This file was deleted.

packages/simcore-sdk/src/simcore_sdk/node_ports_v2/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ async def ports(
4141

4242
__all__ = (
4343
"DBManager",
44-
"exceptions",
4544
"FileLinkType",
4645
"Nodeports",
4746
"Port",
47+
"exceptions",
4848
"ports",
4949
)

scripts/common-service.Makefile

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,14 @@ info: ## displays service info
136136
#
137137

138138
.PHONY: _run-test-dev _run-test-ci
139+
# Use OVERRIDE_TEST_TARGET to override the default test target
140+
# Use TEST_DISABLE_PDB to disable pdb in these tests
141+
TEST_TARGET := $(if $(OVERRIDE_TEST_TARGET), $(OVERRIDE_TEST_TARGET), $(if $(target),$(target),$(CURDIR)/tests/unit))
142+
# Use TEST_DISABLE_PDB to disable pdb in these tests
143+
PDB_OPTION := $(if $(filter-out 0,$(TEST_DISABLE_PDB)),,--pdb)
144+
# Use TEST_ENABLE_TESTIT to enable testit in these tests
145+
TESTIT_OPTION := $(if $(filter-out 0,$(TEST_ENABLE_TESTIT)),-m testit,)
139146

140-
TEST_TARGET := $(if $(target),$(target),$(CURDIR)/tests/unit)
141147
PYTEST_ADDITIONAL_PARAMETERS := $(if $(pytest-parameters),$(pytest-parameters),)
142148
_run-test-dev: _check_venv_active
143149
# runs tests for development (e.g w/ pdb)
@@ -153,7 +159,8 @@ _run-test-dev: _check_venv_active
153159
--failed-first \
154160
--junitxml=junit.xml -o junit_family=legacy \
155161
--keep-docker-up \
156-
--pdb \
162+
$(PDB_OPTION) \
163+
$(TESTIT_OPTION) \
157164
-vv \
158165
$(PYTEST_ADDITIONAL_PARAMETERS) \
159166
$(TEST_TARGET)

services/api-server/src/simcore_service_api_server/_service_function_jobs.py

Lines changed: 18 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from models_library.users import UserID
4040
from pydantic import ValidationError
4141
from simcore_service_api_server._service_functions import FunctionService
42-
from simcore_service_api_server.services_http.storage import StorageApi
42+
from simcore_service_api_server.services_rpc.storage import StorageService
4343
from sqlalchemy.ext.asyncio import AsyncEngine
4444

4545
from ._service_jobs import JobService
@@ -73,11 +73,10 @@ class FunctionJobService:
7373
user_id: UserID
7474
product_name: ProductName
7575
_web_rpc_client: WbApiRpcClient
76-
_storage_client: StorageApi
76+
_storage_client: StorageService
7777
_job_service: JobService
7878
_function_service: FunctionService
7979
_webserver_api: AuthSession
80-
_async_pg_engine: AsyncEngine
8180

8281
async def list_function_jobs(
8382
self,
@@ -121,41 +120,15 @@ async def list_function_jobs_with_status(
121120
pagination_offset=pagination_offset, pagination_limit=pagination_limit
122121
)
123122

124-
function_jobs_wso, page_meta = (
125-
await self._web_rpc_client.list_function_jobs_with_status(
126-
user_id=self.user_id,
127-
product_name=self.product_name,
128-
filter_by_function_id=filter_by_function_id,
129-
filter_by_function_job_ids=filter_by_function_job_ids,
130-
filter_by_function_job_collection_id=filter_by_function_job_collection_id,
131-
**pagination_kwargs,
132-
)
123+
return await self._web_rpc_client.list_function_jobs_with_status(
124+
user_id=self.user_id,
125+
product_name=self.product_name,
126+
filter_by_function_id=filter_by_function_id,
127+
filter_by_function_job_ids=filter_by_function_job_ids,
128+
filter_by_function_job_collection_id=filter_by_function_job_collection_id,
129+
**pagination_kwargs,
133130
)
134131

135-
for function_job_wso in function_jobs_wso:
136-
if (
137-
function_job_wso.status.status
138-
not in (
139-
RunningState.SUCCESS,
140-
RunningState.FAILED,
141-
)
142-
) or function_job_wso.outputs is None:
143-
function = await self._function_service.get_function(
144-
function_id=function_job_wso.function_uid
145-
)
146-
function_job_wso.status = await self.inspect_function_job(
147-
function=function, function_job=function_job_wso
148-
)
149-
if function_job_wso.status.status == RunningState.SUCCESS:
150-
function_job_wso.outputs = await self.function_job_outputs(
151-
function_job=function_job_wso,
152-
function=function,
153-
user_id=self.user_id,
154-
product_name=self.product_name,
155-
stored_job_outputs=None,
156-
)
157-
return function_jobs_wso, page_meta
158-
159132
async def validate_function_inputs(
160133
self, *, function_id: FunctionID, inputs: FunctionInputs
161134
) -> tuple[bool, str]:
@@ -525,15 +498,19 @@ async def function_job_outputs(
525498
user_id: UserID,
526499
product_name: ProductName,
527500
stored_job_outputs: FunctionOutputs | None,
501+
async_pg_engine: AsyncEngine,
528502
) -> FunctionOutputs:
529503

530504
if stored_job_outputs is not None:
531505
return stored_job_outputs
532506

533-
job_status = await self.inspect_function_job(
534-
function=function,
535-
function_job=function_job,
536-
)
507+
try:
508+
job_status = await self.inspect_function_job(
509+
function=function,
510+
function_job=function_job,
511+
)
512+
except FunctionJobProjectMissingError:
513+
return None
537514

538515
if job_status.status != RunningState.SUCCESS:
539516
return None
@@ -564,6 +541,7 @@ async def function_job_outputs(
564541
solver_key=function.solver_key,
565542
version=function.solver_version,
566543
job_id=function_job.solver_job_id,
544+
async_pg_engine=async_pg_engine,
567545
)
568546
).results
569547
)

services/api-server/src/simcore_service_api_server/_service_jobs.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ class JobService:
9999
_storage_rest_client: StorageApi
100100
_directorv2_rpc_client: DirectorV2Service
101101
_solver_service: SolverService
102-
_async_pg_engine: AsyncEngine
103102

104103
user_id: UserID
105104
product_name: ProductName
@@ -300,7 +299,11 @@ async def get_job(
300299
)
301300

302301
async def get_solver_job_outputs(
303-
self, solver_key: SolverKeyId, version: VersionStr, job_id: JobID
302+
self,
303+
solver_key: SolverKeyId,
304+
version: VersionStr,
305+
job_id: JobID,
306+
async_pg_engine: AsyncEngine,
304307
) -> JobOutputs:
305308
job_name = compose_solver_job_resource_name(solver_key, version, job_id)
306309
_logger.debug("Get Job '%s' outputs", job_name)
@@ -340,7 +343,7 @@ async def get_solver_job_outputs(
340343
user_id=self.user_id,
341344
project_uuid=job_id,
342345
node_uuid=UUID(node_ids[0]),
343-
db_engine=self._async_pg_engine,
346+
db_engine=async_pg_engine,
344347
)
345348

346349
results: dict[str, ArgumentTypes] = {}

services/api-server/src/simcore_service_api_server/api/dependencies/services.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from models_library.products import ProductName
88
from models_library.users import UserID
99
from servicelib.rabbitmq import RabbitMQRPCClient
10-
from sqlalchemy.ext.asyncio import AsyncEngine
1110

1211
from ..._service_function_jobs import FunctionJobService
1312
from ..._service_functions import FunctionService
@@ -22,7 +21,6 @@
2221
from ...services_rpc.storage import StorageService
2322
from ...services_rpc.wb_api_server import WbApiRpcClient
2423
from ...utils.client_base import BaseServiceClientApi
25-
from ..dependencies.database import get_db_asyncpg_engine
2624
from .authentication import get_current_user_id, get_product_name
2725
from .rabbitmq import get_rabbitmq_rpc_client
2826
from .webserver_http import get_webserver_session
@@ -120,7 +118,6 @@ def get_job_service(
120118
user_id: Annotated[UserID, Depends(get_current_user_id)],
121119
product_name: Annotated[ProductName, Depends(get_product_name)],
122120
solver_service: Annotated[SolverService, Depends(get_solver_service)],
123-
async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
124121
) -> JobService:
125122
"""
126123
"Assembles" the JobsService layer to the underlying service and client interfaces
@@ -134,7 +131,6 @@ def get_job_service(
134131
_director2_api=director2_api,
135132
_storage_rest_client=storage_api,
136133
_solver_service=solver_service,
137-
_async_pg_engine=async_pg_engine,
138134
user_id=user_id,
139135
product_name=product_name,
140136
)
@@ -159,16 +155,15 @@ def get_function_job_service(
159155
user_id: Annotated[UserID, Depends(get_current_user_id)],
160156
product_name: Annotated[ProductName, Depends(get_product_name)],
161157
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
162-
storage_service: Annotated[StorageApi, Depends(get_storage_service)],
163-
async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
158+
storage_service: Annotated[StorageService, Depends(get_storage_service)],
159+
# async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
164160
) -> FunctionJobService:
165161
return FunctionJobService(
166162
_web_rpc_client=web_rpc_api,
167163
_job_service=job_service,
168164
_function_service=function_service,
169165
_storage_client=storage_service,
170166
_webserver_api=webserver_api,
171-
_async_pg_engine=async_pg_engine,
172167
user_id=user_id,
173168
product_name=product_name,
174169
)

services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,24 @@
2020
UnsupportedFunctionFunctionJobClassCombinationError,
2121
)
2222
from models_library.products import ProductName
23+
from models_library.projects_state import RunningState
2324
from models_library.users import UserID
2425
from servicelib.celery.models import TaskUUID
2526
from servicelib.fastapi.dependencies import get_app
2627
from servicelib.logging_errors import create_troubleshootting_log_kwargs
27-
from simcore_service_api_server._service_functions import FunctionService
28-
from simcore_service_api_server.models.schemas.functions_filters import (
29-
FunctionJobsListFilters,
30-
)
28+
from sqlalchemy.ext.asyncio import AsyncEngine
3129

3230
from ..._service_function_jobs import FunctionJobService
31+
from ..._service_functions import FunctionService
3332
from ..._service_jobs import JobService
3433
from ...exceptions.function_errors import FunctionJobProjectMissingError
3534
from ...models.pagination import Page, PaginationParams
3635
from ...models.schemas.errors import ErrorGet
36+
from ...models.schemas.functions_filters import FunctionJobsListFilters
3737
from ...services_rpc.wb_api_server import WbApiRpcClient
3838
from ..dependencies.authentication import get_current_user_id, get_product_name
3939
from ..dependencies.celery import get_task_manager
40+
from ..dependencies.database import get_db_asyncpg_engine
4041
from ..dependencies.functions import (
4142
get_function_from_functionjob,
4243
get_function_job_dependency,
@@ -104,11 +105,11 @@
104105
)
105106
)
106107

107-
if endpoint in ["list_function_jobs_with_status"]:
108+
if endpoint in ["list_function_jobs"]:
108109
CHANGE_LOGS[endpoint].append(
109110
FMSG_CHANGELOG_ADDED_IN_VERSION.format(
110111
WITH_STATUS_RELEASE_VERSION,
111-
"add /with-status endpoint to list function jobs with their status",
112+
"add include_status bool query parameter to list function jobs with their status",
112113
)
113114
)
114115

@@ -123,11 +124,16 @@
123124
),
124125
)
125126
async def list_function_jobs(
127+
app: Annotated[FastAPI, Depends(get_app)],
126128
page_params: Annotated[PaginationParams, Depends()],
127129
function_job_service: Annotated[
128130
FunctionJobService, Depends(get_function_job_service)
129131
],
132+
function_service: Annotated[FunctionService, Depends(get_function_service)],
130133
filters: Annotated[FunctionJobsListFilters, Depends(get_function_jobs_filters)],
134+
async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
135+
user_id: Annotated[UserID, Depends(get_current_user_id)],
136+
product_name: Annotated[ProductName, Depends(get_product_name)],
131137
include_status: Annotated[ # noqa: FBT002
132138
bool, Query(description="Include job status in response")
133139
] = False,
@@ -145,6 +151,38 @@ async def list_function_jobs(
145151
filter_by_function_id=filters.function_id,
146152
)
147153
)
154+
# the code below should ideally be in the service layer, but this can only be done if the
155+
# celery status resolution is done in the service layer too
156+
for function_job_wso in function_jobs_list:
157+
if (
158+
function_job_wso.status.status
159+
not in (
160+
RunningState.SUCCESS,
161+
RunningState.FAILED,
162+
)
163+
) or function_job_wso.outputs is None:
164+
function = await function_service.get_function(
165+
function_id=function_job_wso.function_uid
166+
)
167+
function_job_wso.status = await function_job_status(
168+
app=app,
169+
function=function,
170+
function_job=function_job_wso,
171+
function_job_service=function_job_service,
172+
user_id=user_id,
173+
product_name=product_name,
174+
)
175+
if function_job_wso.status.status == RunningState.SUCCESS:
176+
function_job_wso.outputs = (
177+
await function_job_service.function_job_outputs(
178+
function_job=function_job_wso,
179+
function=function,
180+
user_id=user_id,
181+
product_name=product_name,
182+
stored_job_outputs=None,
183+
async_pg_engine=async_pg_engine,
184+
)
185+
)
148186
else:
149187
function_jobs_list, meta = await function_job_service.list_function_jobs(
150188
pagination_offset=page_params.offset,
@@ -322,13 +360,15 @@ async def function_job_outputs(
322360
user_id: Annotated[UserID, Depends(get_current_user_id)],
323361
product_name: Annotated[ProductName, Depends(get_product_name)],
324362
stored_job_outputs: Annotated[FunctionOutputs, Depends(get_stored_job_outputs)],
363+
async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
325364
) -> FunctionOutputs:
326365
return await function_job_service.function_job_outputs(
327366
function_job=function_job,
328367
user_id=user_id,
329368
product_name=product_name,
330369
function=function,
331370
stored_job_outputs=stored_job_outputs,
371+
async_pg_engine=async_pg_engine,
332372
)
333373

334374

0 commit comments

Comments
 (0)