Skip to content

Commit 4f989b6

Browse files
committed
implement run function workflow
1 parent 981736f commit 4f989b6

File tree

3 files changed

+28
-26
lines changed

3 files changed

+28
-26
lines changed

services/api-server/src/simcore_service_api_server/_service_function_jobs.py

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,17 @@ async def create_registered_function_job(
286286

287287
return job.uid
288288

289+
@overload
290+
async def patch_registered_function_job(
291+
self,
292+
*,
293+
user_id: UserID,
294+
product_name: ProductName,
295+
function_job_id: FunctionJobID,
296+
function_class: FunctionClass,
297+
job_creation_task_id: TaskID | None,
298+
) -> RegisteredFunctionJob: ...
299+
289300
@overload
290301
async def patch_registered_function_job(
291302
self,
@@ -354,6 +365,7 @@ async def patch_registered_function_job(
354365
async def run_function(
355366
self,
356367
*,
368+
job_creation_task_id: TaskID | None,
357369
function: RegisteredFunction,
358370
job_inputs: JobInputs,
359371
pricing_spec: JobPricingSpecification | None,
@@ -377,18 +389,13 @@ async def run_function(
377389
job_id=study_job.id,
378390
pricing_spec=pricing_spec,
379391
)
380-
return await self._web_rpc_client.register_function_job(
381-
function_job=ProjectFunctionJob(
382-
function_uid=function.uid,
383-
title=f"Function job of function {function.uid}",
384-
description=function.description,
385-
inputs=job_inputs.values,
386-
outputs=None,
387-
project_job_id=study_job.id,
388-
job_creation_task_id=None,
389-
),
392+
return await self.patch_registered_function_job(
390393
user_id=self.user_id,
391394
product_name=self.product_name,
395+
function_job_id=study_job.id,
396+
function_class=FunctionClass.PROJECT,
397+
job_creation_task_id=job_creation_task_id,
398+
project_job_id=study_job.id,
392399
)
393400

394401
if function.function_class == FunctionClass.SOLVER:
@@ -407,18 +414,13 @@ async def run_function(
407414
job_id=solver_job.id,
408415
pricing_spec=pricing_spec,
409416
)
410-
return await self._web_rpc_client.register_function_job(
411-
function_job=SolverFunctionJob(
412-
function_uid=function.uid,
413-
title=f"Function job of function {function.uid}",
414-
description=function.description,
415-
inputs=job_inputs.values,
416-
outputs=None,
417-
solver_job_id=solver_job.id,
418-
job_creation_task_id=None,
419-
),
417+
return await self.patch_registered_function_job(
420418
user_id=self.user_id,
421419
product_name=self.product_name,
420+
function_job_id=solver_job.id,
421+
function_class=FunctionClass.SOLVER,
422+
job_creation_task_id=job_creation_task_id,
423+
solver_job_id=solver_job.id,
422424
)
423425

424426
raise UnsupportedFunctionClassError(

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
RegisteredFunctionJobCollection,
1818
)
1919
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobFilter
20+
from models_library.functions import FunctionClass
2021
from models_library.functions_errors import FunctionJobCacheNotFoundError
2122
from models_library.products import ProductName
2223
from models_library.projects import ProjectID
2324
from models_library.projects_nodes_io import NodeID
2425
from models_library.users import UserID
25-
from servicelib.celery.models import TaskFilter, TaskMetadata, TasksQueue
26+
from servicelib.celery.models import TaskFilter, TaskID, TaskMetadata, TasksQueue
2627
from servicelib.fastapi.dependencies import get_reverse_url_mapper
2728

2829
from ..._service_function_jobs import FunctionJobService
@@ -347,10 +348,10 @@ async def run_function( # noqa: PLR0913
347348
)
348349
pricing_spec = JobPricingSpecification.create_from_headers(request.headers)
349350
job_links = await function_service.get_function_job_links(to_run_function, url_for)
350-
351351
job_inputs = await function_job_service.create_function_job_inputs(
352352
function=to_run_function, function_inputs=function_inputs
353353
)
354+
354355
try:
355356
# checks access rights
356357
return await function_job_service.get_cached_function_job(
@@ -398,10 +399,8 @@ async def run_function( # noqa: PLR0913
398399
user_id=user_identity.user_id,
399400
product_name=user_identity.product_name,
400401
function_job_id=pre_registered_function_job_id,
401-
registered_function_job_patch=RegisteredFunctionJobPatch(
402-
status=RunningState.RUNNING,
403-
task_id=task_uuid,
404-
),
402+
function_class=FunctionClass.PROJECT,
403+
job_creation_task_id=TaskID(task_uuid),
405404
)
406405

407406

services/api-server/src/simcore_service_api_server/celery/worker_tasks/functions_tasks.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ async def run_function(
107107
)
108108

109109
return await function_job_service.run_function(
110+
job_creation_task_id=task_id,
110111
function=function,
111112
job_inputs=job_inputs,
112113
pricing_spec=pricing_spec,

0 commit comments

Comments
 (0)