Skip to content

Commit 4eb2262

Browse files
committed
factor out function run pre check
1 parent cf57e04 commit 4eb2262

File tree

6 files changed

+80
-37
lines changed

6 files changed

+80
-37
lines changed

services/api-server/src/simcore_service_api_server/_service_function_jobs.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -160,17 +160,12 @@ async def inspect_function_job(
160160
job_status=new_job_status,
161161
)
162162

163-
async def run_function(
163+
async def run_function_pre_check(
164164
self,
165165
*,
166166
function: RegisteredFunction,
167167
function_inputs: FunctionInputs,
168-
pricing_spec: JobPricingSpecification | None,
169-
job_links: JobLinks,
170-
x_simcore_parent_project_uuid: NodeID | None,
171-
x_simcore_parent_node_id: NodeID | None,
172-
) -> RegisteredFunctionJob:
173-
168+
) -> JobInputs:
174169
user_api_access_rights = (
175170
await self._web_rpc_client.get_functions_user_api_access_rights(
176171
user_id=self.user_id, product_name=self.product_name
@@ -206,9 +201,24 @@ async def run_function(
206201
if not is_valid:
207202
raise FunctionInputsValidationError(error=validation_str)
208203

204+
return JobInputs(
205+
values=joined_inputs or {},
206+
)
207+
208+
async def run_function(
209+
self,
210+
*,
211+
function: RegisteredFunction,
212+
job_inputs: JobInputs,
213+
pricing_spec: JobPricingSpecification | None,
214+
job_links: JobLinks,
215+
x_simcore_parent_project_uuid: NodeID | None,
216+
x_simcore_parent_node_id: NodeID | None,
217+
) -> RegisteredFunctionJob:
218+
209219
if cached_function_jobs := await self._web_rpc_client.find_cached_function_jobs(
210220
function_id=function.uid,
211-
inputs=joined_inputs,
221+
inputs=job_inputs.values,
212222
user_id=self.user_id,
213223
product_name=self.product_name,
214224
):
@@ -223,7 +233,7 @@ async def run_function(
223233
if function.function_class == FunctionClass.PROJECT:
224234
study_job = await self._job_service.create_studies_job(
225235
study_id=function.project_id,
226-
job_inputs=JobInputs(values=joined_inputs or {}),
236+
job_inputs=job_inputs,
227237
hidden=True,
228238
job_links=job_links,
229239
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
@@ -239,7 +249,7 @@ async def run_function(
239249
function_uid=function.uid,
240250
title=f"Function job of function {function.uid}",
241251
description=function.description,
242-
inputs=joined_inputs,
252+
inputs=job_inputs.values,
243253
outputs=None,
244254
project_job_id=study_job.id,
245255
),
@@ -251,7 +261,7 @@ async def run_function(
251261
solver_job = await self._job_service.create_solver_job(
252262
solver_key=function.solver_key,
253263
version=function.solver_version,
254-
inputs=JobInputs(values=joined_inputs or {}),
264+
inputs=job_inputs,
255265
job_links=job_links,
256266
hidden=True,
257267
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
@@ -268,7 +278,7 @@ async def run_function(
268278
function_uid=function.uid,
269279
title=f"Function job of function {function.uid}",
270280
description=function.description,
271-
inputs=joined_inputs,
281+
inputs=job_inputs.values,
272282
outputs=None,
273283
solver_job_id=solver_job.id,
274284
),
@@ -291,16 +301,24 @@ async def map_function(
291301
x_simcore_parent_node_id: NodeID | None,
292302
) -> RegisteredFunctionJobCollection:
293303

304+
job_inputs = [
305+
await self.run_function_pre_check(
306+
function=function,
307+
function_inputs=inputs,
308+
)
309+
for inputs in function_inputs_list
310+
]
311+
294312
function_jobs = [
295313
await self.run_function(
296314
function=function,
297-
function_inputs=function_inputs,
315+
job_inputs=inputs,
298316
pricing_spec=pricing_spec,
299317
job_links=job_links,
300318
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
301319
x_simcore_parent_node_id=x_simcore_parent_node_id,
302320
)
303-
for function_inputs in function_inputs_list
321+
for inputs in job_inputs
304322
]
305323

306324
function_job_collection_description = f"Function job collection of map of function {function.uid} with {len(function_inputs_list)} inputs"

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,9 @@ async def run_function( # noqa: PLR0913
328328
url_for: Annotated[Callable, Depends(get_reverse_url_mapper)],
329329
function_inputs: FunctionInputs,
330330
function_service: Annotated[FunctionService, Depends(get_function_service)],
331+
function_job_service: Annotated[
332+
FunctionJobService, Depends(get_function_job_service)
333+
],
331334
x_simcore_parent_project_uuid: Annotated[ProjectID | Literal["null"], Header()],
332335
x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()],
333336
) -> TaskGet:
@@ -345,6 +348,10 @@ async def run_function( # noqa: PLR0913
345348
pricing_spec = JobPricingSpecification.create_from_headers(request.headers)
346349
job_links = await function_service.get_function_job_links(to_run_function, url_for)
347350

351+
job_inputs = await function_job_service.run_function_pre_check(
352+
function=to_run_function, function_inputs=function_inputs
353+
)
354+
348355
job_filter = AsyncJobFilter(
349356
user_id=user_identity.user_id,
350357
product_name=user_identity.product_name,
@@ -362,7 +369,7 @@ async def run_function( # noqa: PLR0913
362369
task_filter=task_filter,
363370
user_identity=user_identity,
364371
function=to_run_function,
365-
function_inputs=function_inputs,
372+
job_inputs=job_inputs,
366373
pricing_spec=pricing_spec,
367374
job_links=job_links,
368375
x_simcore_parent_project_uuid=parent_project_uuid,

services/api-server/src/simcore_service_api_server/celery/worker_tasks/functions_tasks.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
)
44
from celery_library.utils import get_app_server # pylint: disable=no-name-in-module
55
from fastapi import FastAPI
6-
from models_library.functions import FunctionInputs, RegisteredFunction
6+
from models_library.functions import RegisteredFunction
77
from models_library.projects_nodes_io import NodeID
88
from servicelib.celery.models import TaskID
9+
from simcore_service_api_server._service_function_jobs import FunctionJobService
910

1011
from ...api.dependencies.authentication import Identity
1112
from ...api.dependencies.rabbitmq import get_rabbitmq_rpc_client
@@ -20,12 +21,14 @@
2021
from ...api.dependencies.webserver_http import get_session_cookie, get_webserver_session
2122
from ...api.dependencies.webserver_rpc import get_wb_api_rpc_client
2223
from ...models.api_resources import JobLinks
23-
from ...models.schemas.jobs import JobPricingSpecification
24+
from ...models.schemas.jobs import JobInputs, JobPricingSpecification
2425
from ...services_http.director_v2 import DirectorV2Api
2526
from ...services_http.storage import StorageApi
2627

2728

28-
async def _assemble_function_job_service(*, app: FastAPI, user_identity: Identity):
29+
async def _assemble_function_job_service(
30+
*, app: FastAPI, user_identity: Identity
31+
) -> FunctionJobService:
2932
# to avoid this show we could introduce a dependency injection
3033
# system which is not linked to FastAPI (i.e. can be resolved manually).
3134
# One suggestion: https://github.com/ets-labs/python-dependency-injector, which is compatible
@@ -91,7 +94,7 @@ async def run_function(
9194
*,
9295
user_identity: Identity,
9396
function: RegisteredFunction,
94-
function_inputs: FunctionInputs,
97+
job_inputs: JobInputs,
9598
pricing_spec: JobPricingSpecification | None,
9699
job_links: JobLinks,
97100
x_simcore_parent_project_uuid: NodeID | None,
@@ -105,7 +108,7 @@ async def run_function(
105108

106109
return await function_job_service.run_function(
107110
function=function,
108-
function_inputs=function_inputs,
111+
job_inputs=job_inputs,
109112
pricing_spec=pricing_spec,
110113
job_links=job_links,
111114
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,

services/api-server/src/simcore_service_api_server/celery/worker_tasks/tasks.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616

1717
from ...api.dependencies.authentication import Identity
1818
from ...models.api_resources import JobLinks
19-
from ...models.schemas.jobs import JobPricingSpecification
19+
from ...models.schemas.jobs import JobInputs, JobPricingSpecification
2020
from .functions_tasks import run_function
2121

2222
_logger = logging.getLogger(__name__)
2323

2424
pydantic_types_to_register = (
2525
Identity,
26+
JobInputs,
2627
JobLinks,
2728
JobPricingSpecification,
2829
RegisteredProjectFunction,

services/api-server/tests/unit/api_functions/celery/test_functions.py

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
from models_library.functions import (
3232
FunctionClass,
3333
FunctionID,
34-
FunctionInputs,
3534
FunctionJobID,
3635
FunctionUserAccessRights,
3736
FunctionUserApiAccessRights,
@@ -42,26 +41,31 @@
4241
)
4342
from models_library.projects import ProjectID
4443
from models_library.users import UserID
45-
from pytest_mock import MockType
44+
from pytest_mock import MockerFixture, MockType
4645
from pytest_simcore.helpers.httpx_calls_capture_models import HttpApiCallCaptureModel
4746
from servicelib.celery.models import TaskFilter, TaskID, TaskMetadata, TasksQueue
4847
from servicelib.common_headers import (
4948
X_SIMCORE_PARENT_NODE_ID,
5049
X_SIMCORE_PARENT_PROJECT_UUID,
5150
)
5251
from simcore_service_api_server._meta import API_VTAG
52+
from simcore_service_api_server._service_function_jobs import FunctionJobService
5353
from simcore_service_api_server.api.dependencies.authentication import Identity
5454
from simcore_service_api_server.api.dependencies.celery import (
5555
ASYNC_JOB_CLIENT_NAME,
5656
get_task_manager,
5757
)
58+
from simcore_service_api_server.api.dependencies.services import (
59+
get_function_job_service,
60+
)
5861
from simcore_service_api_server.api.routes.functions_routes import get_function
5962
from simcore_service_api_server.celery.worker_tasks.functions_tasks import (
6063
run_function as run_function_task,
6164
)
6265
from simcore_service_api_server.exceptions.backend_errors import BaseBackEndError
6366
from simcore_service_api_server.models.api_resources import JobLinks
6467
from simcore_service_api_server.models.schemas.jobs import (
68+
JobInputs,
6569
JobPricingSpecification,
6670
NodeID,
6771
)
@@ -112,7 +116,7 @@ async def run_function(
112116
*,
113117
user_identity: Identity,
114118
function: RegisteredFunction,
115-
function_inputs: FunctionInputs,
119+
job_inputs: JobInputs,
116120
pricing_spec: JobPricingSpecification | None,
117121
job_links: JobLinks,
118122
x_simcore_parent_project_uuid: NodeID | None,
@@ -122,7 +126,7 @@ async def run_function(
122126
title=_faker.sentence(),
123127
description=_faker.paragraph(),
124128
function_uid=FunctionID(_faker.uuid4()),
125-
inputs=function_inputs,
129+
inputs=job_inputs.values,
126130
outputs=None,
127131
function_class=FunctionClass.PROJECT,
128132
uid=FunctionJobID(_faker.uuid4()),
@@ -149,19 +153,9 @@ async def test_with_fake_run_function(
149153
app: FastAPI,
150154
client: AsyncClient,
151155
auth: BasicAuth,
156+
mocker: MockerFixture,
152157
with_api_server_celery_worker: TestWorkController,
153158
):
154-
app.dependency_overrides[get_function] = (
155-
lambda: RegisteredProjectFunction.model_validate(
156-
RegisteredProjectFunction.model_config.get("json_schema_extra", {}).get(
157-
"examples", []
158-
)[0]
159-
)
160-
)
161-
162-
headers = {}
163-
headers[X_SIMCORE_PARENT_PROJECT_UUID] = "null"
164-
headers[X_SIMCORE_PARENT_NODE_ID] = "null"
165159

166160
body = {
167161
"input_1": _faker.uuid4(),
@@ -175,6 +169,25 @@ async def test_with_fake_run_function(
175169
],
176170
}
177171

172+
async def mock_get_function_job_service() -> FunctionJobService:
173+
mock = mocker.AsyncMock(spec=FunctionJobService)
174+
mock.run_function_pre_check.return_value = JobInputs(values=body)
175+
return mock
176+
177+
app.dependency_overrides[get_function_job_service] = mock_get_function_job_service
178+
179+
app.dependency_overrides[get_function] = (
180+
lambda: RegisteredProjectFunction.model_validate(
181+
RegisteredProjectFunction.model_config.get("json_schema_extra", {}).get(
182+
"examples", []
183+
)[0]
184+
)
185+
)
186+
187+
headers = {}
188+
headers[X_SIMCORE_PARENT_PROJECT_UUID] = "null"
189+
headers[X_SIMCORE_PARENT_NODE_ID] = "null"
190+
178191
response = await client.post(
179192
f"/{API_VTAG}/functions/{_faker.uuid4()}:run",
180193
auth=auth,

services/api-server/tests/unit/api_functions/test_api_routers_functions.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from simcore_service_api_server.api.dependencies.authentication import Identity
5151
from simcore_service_api_server.celery.worker_tasks import functions_tasks
5252
from simcore_service_api_server.models.api_resources import JobLinks
53+
from simcore_service_api_server.models.schemas.jobs import JobInputs
5354
from simcore_service_api_server.services_rpc.wb_api_server import WbApiRpcClient
5455

5556
_faker = Faker()
@@ -478,7 +479,7 @@ def _default_side_effect(
478479
task_id=TaskID(_faker.uuid4()),
479480
user_identity=user_identity,
480481
function=mock_registered_project_function,
481-
function_inputs={},
482+
job_inputs=JobInputs(values={}),
482483
pricing_spec=None,
483484
job_links=job_links,
484485
x_simcore_parent_project_uuid=None,

0 commit comments

Comments
 (0)