Skip to content

Commit 32e3d80

Browse files
committed
fix tests
1 parent 1437b91 commit 32e3d80

File tree

6 files changed

+41
-51
lines changed

6 files changed

+41
-51
lines changed

services/api-server/src/simcore_service_api_server/_service_function_jobs.py

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass
2-
from typing import Final, overload
2+
from typing import overload
33

44
import jsonschema
55
from common_library.exclude import as_dict_exclude_none
@@ -31,24 +31,16 @@
3131
from models_library.rpc_pagination import PageLimitInt
3232
from models_library.users import UserID
3333
from pydantic import ValidationError
34-
from servicelib.celery.models import TaskUUID
35-
from servicelib.celery.task_manager import TaskManager
3634
from simcore_service_api_server._service_functions import FunctionService
3735
from simcore_service_api_server.services_rpc.storage import StorageService
3836

3937
from ._service_jobs import JobService
40-
from .api.routes.tasks import _get_task_filter
4138
from .models.api_resources import JobLinks
4239
from .models.domain.functions import PreRegisteredFunctionJobData
4340
from .models.schemas.jobs import JobInputs, JobPricingSpecification
4441
from .services_http.webserver import AuthSession
4542
from .services_rpc.wb_api_server import WbApiRpcClient
4643

47-
_JOB_CREATION_TASK_STATUS_PREFIX: Final[str] = "JOB_CREATION_TASK_STATUS_"
48-
_JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS: Final[str] = (
49-
f"{_JOB_CREATION_TASK_STATUS_PREFIX}NOT_YET_SCHEDULED"
50-
)
51-
5244

5345
def join_inputs(
5446
default_inputs: FunctionInputs | None,
@@ -64,21 +56,6 @@ def join_inputs(
6456
return {**default_inputs, **function_inputs}
6557

6658

67-
async def _celery_task_status(
68-
job_creation_task_id: TaskID | None,
69-
task_manager: TaskManager,
70-
user_id: UserID,
71-
product_name: ProductName,
72-
) -> str:
73-
if job_creation_task_id is None:
74-
return _JOB_CREATION_TASK_NOT_YET_SCHEDULED_STATUS
75-
task_filter = _get_task_filter(user_id, product_name)
76-
task_status = await task_manager.get_task_status(
77-
task_uuid=TaskUUID(job_creation_task_id), task_filter=task_filter
78-
)
79-
return f"{_JOB_CREATION_TASK_STATUS_PREFIX}{task_status.task_state}"
80-
81-
8259
@dataclass(frozen=True, kw_only=True)
8360
class FunctionJobService:
8461
user_id: UserID

services/api-server/src/simcore_service_api_server/_service_function_jobs_task_client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ class FunctionJobTaskClientService:
9393
async def list_function_jobs_with_status(
9494
self,
9595
*,
96-
function: RegisteredFunction,
9796
filter_by_function_id: FunctionID | None = None,
9897
filter_by_function_job_ids: list[FunctionJobID] | None = None,
9998
filter_by_function_job_collection_id: FunctionJobCollectionID | None = None,
@@ -138,7 +137,9 @@ async def list_function_jobs_with_status(
138137
if function_job_wso.status.status == RunningState.SUCCESS:
139138
function_job_wso.outputs = await self.function_job_outputs(
140139
function_job=function_job_wso,
141-
function=function,
140+
function=await self._function_service.get_function(
141+
function_id=function_job_wso.function_uid,
142+
),
142143
stored_job_outputs=None,
143144
)
144145
return function_jobs_list_ws, meta

services/api-server/src/simcore_service_api_server/api/routes/function_jobs_routes.py

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
from simcore_service_api_server._service_function_jobs_task_client import (
2323
FunctionJobTaskClientService,
2424
)
25-
from sqlalchemy.ext.asyncio import AsyncEngine
2625

2726
from ..._service_function_jobs import FunctionJobService
2827
from ..._service_functions import FunctionService
@@ -33,8 +32,6 @@
3332
from ...models.schemas.functions_filters import FunctionJobsListFilters
3433
from ...services_rpc.wb_api_server import WbApiRpcClient
3534
from ..dependencies.authentication import get_current_user_id, get_product_name
36-
from ..dependencies.celery import get_task_manager
37-
from ..dependencies.database import get_db_asyncpg_engine
3835
from ..dependencies.functions import (
3936
get_function_from_functionjob,
4037
get_function_job_dependency,
@@ -127,15 +124,13 @@ async def list_function_jobs(
127124
FunctionJobService, Depends(get_function_job_service)
128125
],
129126
filters: Annotated[FunctionJobsListFilters, Depends(get_function_jobs_filters)],
130-
function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)],
131127
include_status: Annotated[ # noqa: FBT002
132128
bool, Query(description="Include job status in response")
133129
] = False,
134130
):
135131
if include_status:
136132
function_jobs_list_ws, meta = (
137133
await function_job_task_client_service.list_function_jobs_with_status(
138-
function=function,
139134
pagination_offset=page_params.offset,
140135
pagination_limit=page_params.limit,
141136
filter_by_function_job_ids=filters.function_job_ids,
@@ -240,13 +235,12 @@ async def function_job_status(
240235
RegisteredFunctionJob, Depends(get_function_job_dependency)
241236
],
242237
function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)],
243-
function_job_service: Annotated[
244-
FunctionJobService, Depends(get_function_job_service)
238+
function_job_task_client_service: Annotated[
239+
FunctionJobTaskClientService, Depends(get_function_job_task_client_service)
245240
],
246241
) -> FunctionJobStatus:
247-
task_manager = get_task_manager(app)
248-
return await function_job_service.inspect_function_job(
249-
function=function, function_job=function_job, task_manager=task_manager
242+
return await function_job_task_client_service.inspect_function_job(
243+
function=function, function_job=function_job
250244
)
251245

252246

@@ -285,22 +279,16 @@ async def function_job_outputs(
285279
function_job: Annotated[
286280
RegisteredFunctionJob, Depends(get_function_job_dependency)
287281
],
288-
function_job_service: Annotated[
289-
FunctionJobService, Depends(get_function_job_service)
282+
function_job_task_client_service: Annotated[
283+
FunctionJobTaskClientService, Depends(get_function_job_task_client_service)
290284
],
291285
function: Annotated[RegisteredFunction, Depends(get_function_from_functionjob)],
292-
user_id: Annotated[UserID, Depends(get_current_user_id)],
293-
product_name: Annotated[ProductName, Depends(get_product_name)],
294286
stored_job_outputs: Annotated[FunctionOutputs, Depends(get_stored_job_outputs)],
295-
async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
296287
) -> FunctionOutputs:
297-
return await function_job_service.function_job_outputs(
288+
return await function_job_task_client_service.function_job_outputs(
298289
function_job=function_job,
299-
user_id=user_id,
300-
product_name=product_name,
301290
function=function,
302291
stored_job_outputs=stored_job_outputs,
303-
async_pg_engine=async_pg_engine,
304292
)
305293

306294

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from servicelib.utils import limited_gather
3030

3131
from ..._service_function_jobs import FunctionJobService
32+
from ..._service_function_jobs_task_client import FunctionJobTaskClientService
3233
from ..._service_functions import FunctionService
3334
from ...celery_worker.worker_tasks.functions_tasks import (
3435
run_function as run_function_task,
@@ -45,7 +46,11 @@
4546
get_product_name,
4647
)
4748
from ..dependencies.celery import ASYNC_JOB_CLIENT_NAME, get_task_manager
48-
from ..dependencies.services import get_function_job_service, get_function_service
49+
from ..dependencies.services import (
50+
get_function_job_service,
51+
get_function_job_task_client_service,
52+
get_function_service,
53+
)
4954
from ..dependencies.webserver_rpc import get_wb_api_rpc_client
5055
from ._constants import (
5156
FMSG_CHANGELOG_ADDED_IN_VERSION,
@@ -332,6 +337,9 @@ async def run_function(
332337
function_job_service: Annotated[
333338
FunctionJobService, Depends(get_function_job_service)
334339
],
340+
function_job_task_client_service: Annotated[
341+
FunctionJobTaskClientService, Depends(get_function_job_task_client_service)
342+
],
335343
x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()],
336344
x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()],
337345
) -> RegisteredFunctionJob:
@@ -355,7 +363,7 @@ async def run_function(
355363

356364
# check if results are cached
357365
with contextlib.suppress(FunctionJobCacheNotFoundError):
358-
return await function_job_service.get_cached_function_job(
366+
return await function_job_task_client_service.get_cached_function_job(
359367
function=to_run_function,
360368
job_inputs=job_inputs,
361369
)
@@ -447,6 +455,9 @@ async def map_function(
447455
function_jobs_service: Annotated[
448456
FunctionJobService, Depends(get_function_job_service)
449457
],
458+
function_job_task_client_service: Annotated[
459+
FunctionJobTaskClientService, Depends(get_function_job_task_client_service)
460+
],
450461
function_service: Annotated[FunctionService, Depends(get_function_service)],
451462
web_api_rpc_client: Annotated[WbApiRpcClient, Depends(get_wb_api_rpc_client)],
452463
x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()],
@@ -462,6 +473,7 @@ async def _run_single_function(function_inputs: FunctionInputs) -> FunctionJobID
462473
function_inputs=function_inputs,
463474
function_service=function_service,
464475
function_job_service=function_jobs_service,
476+
function_job_task_client_service=function_job_task_client_service,
465477
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
466478
x_simcore_parent_node_id=x_simcore_parent_node_id,
467479
)

services/api-server/tests/unit/api_functions/conftest.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
3939
from pytest_simcore.helpers.typing_env import EnvVarsDict
4040
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
41+
from simcore_service_api_server.api.dependencies import services
4142
from simcore_service_api_server.api.dependencies.services import get_rabbitmq_rpc_client
4243
from simcore_service_api_server.api.routes.functions_routes import get_wb_api_rpc_client
4344
from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient
@@ -83,6 +84,14 @@ def _():
8384
return mocker
8485

8586

87+
@pytest.fixture
88+
async def mock_celery_task_manager(app: FastAPI, mocker: MockerFixture) -> MockType:
89+
def _(app: FastAPI):
90+
return None
91+
92+
return mocker.patch.object(services, services.get_task_manager.__name__, _)
93+
94+
8695
@pytest.fixture
8796
async def mock_wb_api_server_rpc(app: FastAPI, mocker: MockerFixture) -> MockerFixture:
8897

services/api-server/tests/unit/api_functions/test_api_routers_function_jobs.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@
3434
from pytest_mock import MockerFixture, MockType
3535
from servicelib.celery.models import TaskFilter, TaskState, TaskStatus, TaskUUID
3636
from simcore_service_api_server._meta import API_VTAG
37-
from simcore_service_api_server._service_function_jobs import FunctionJobService
38-
from simcore_service_api_server.api.routes import function_jobs_routes
39-
from simcore_service_api_server.api.routes.function_jobs_routes import (
37+
from simcore_service_api_server._service_function_jobs_task_client import (
4038
_JOB_CREATION_TASK_STATUS_PREFIX,
39+
FunctionJobTaskClientService,
4140
)
41+
from simcore_service_api_server.api.routes import function_jobs_routes
4242
from simcore_service_api_server.models.schemas.jobs import JobStatus
4343

4444
_faker = Faker()
@@ -112,6 +112,7 @@ async def test_get_function_job(
112112
async def test_list_function_jobs(
113113
client: AsyncClient,
114114
mock_rabbitmq_rpc_client: MockerFixture,
115+
mock_celery_task_manager: MockType,
115116
mock_handler_in_functions_rpc_interface: Callable[[str, Any], None],
116117
mock_registered_project_function_job: RegisteredProjectFunctionJob,
117118
auth: httpx.BasicAuth,
@@ -138,6 +139,7 @@ async def test_list_function_jobs(
138139
async def test_list_function_jobs_with_status(
139140
client: AsyncClient,
140141
mock_rabbitmq_rpc_client: MockerFixture,
142+
mock_celery_task_manager: MockType,
141143
mock_handler_in_functions_rpc_interface: Callable[[str, Any], None],
142144
mock_registered_project_function: RegisteredProjectFunction,
143145
mock_registered_project_function_job: RegisteredProjectFunctionJob,
@@ -167,7 +169,7 @@ async def test_list_function_jobs_with_status(
167169
)
168170

169171
mock_function_job_outputs = mocker.patch.object(
170-
FunctionJobService, "function_job_outputs", return_value=mock_outputs
172+
FunctionJobTaskClientService, "function_job_outputs", return_value=mock_outputs
171173
)
172174
mock_handler_in_functions_rpc_interface("get_function_job_status", mock_status)
173175
response = await client.get(
@@ -191,6 +193,7 @@ async def test_list_function_jobs_with_status(
191193

192194
async def test_list_function_jobs_with_job_id_filter(
193195
client: AsyncClient,
196+
mock_celery_task_manager: MockType,
194197
mock_rabbitmq_rpc_client: MockerFixture,
195198
mock_handler_in_functions_rpc_interface: Callable[[str], MockType],
196199
mock_registered_project_function_job: RegisteredProjectFunctionJob,

0 commit comments

Comments
 (0)