Skip to content

Commit 0a79893

Browse files
committed
use run function endpoint directly in map endpoint
1 parent e85bd50 commit 0a79893

File tree

2 files changed

+16
-74
lines changed

2 files changed

+16
-74
lines changed

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 13 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,10 @@
2929

3030
from ..._service_function_jobs import FunctionJobService
3131
from ..._service_functions import FunctionService
32-
from ...celery_worker.worker_tasks.functions_tasks import function_map as map_task
3332
from ...celery_worker.worker_tasks.functions_tasks import (
3433
run_function as run_function_task,
3534
)
3635
from ...exceptions.function_errors import FunctionJobCacheNotFoundError
37-
from ...models.domain.functions import PreRegisteredFunctionJobData
3836
from ...models.pagination import Page, PaginationParams
3937
from ...models.schemas.errors import ErrorGet
4038
from ...models.schemas.jobs import JobPricingSpecification
@@ -454,81 +452,23 @@ async def map_function(
454452
x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()],
455453
) -> RegisteredFunctionJobCollection:
456454

457-
task_manager = get_task_manager(request.app)
458-
parent_project_uuid = (
459-
x_simcore_parent_project_uuid
460-
if isinstance(x_simcore_parent_project_uuid, ProjectID)
461-
else None
462-
)
463-
parent_node_id = (
464-
x_simcore_parent_node_id
465-
if isinstance(x_simcore_parent_node_id, NodeID)
466-
else None
467-
)
468-
pricing_spec = JobPricingSpecification.create_from_headers(request.headers)
469-
job_links = await function_service.get_function_job_links(to_run_function, url_for)
470-
471-
job_inputs_list = [
472-
await function_jobs_service.create_function_job_inputs(
473-
function=to_run_function, function_inputs=function_inputs
474-
)
475-
for function_inputs in function_inputs_list
476-
]
477-
478455
job_ids: list[FunctionJobID] = []
479-
pre_registered_function_job_data_list: list[PreRegisteredFunctionJobData] = []
480-
481-
for job_inputs in job_inputs_list:
482-
try:
483-
cached_job = await function_jobs_service.get_cached_function_job(
484-
function=to_run_function,
485-
job_inputs=job_inputs,
486-
)
487-
job_ids.append(cached_job.uid)
488-
except FunctionJobCacheNotFoundError:
489-
data = await function_jobs_service.pre_register_function_job(
490-
function=to_run_function,
491-
job_inputs=job_inputs,
492-
)
493-
pre_registered_function_job_data_list.append(data)
494-
job_ids.append(data.function_job_id)
495-
496-
# run map in celery task
497-
job_filter = AsyncJobFilter(
498-
user_id=user_identity.user_id,
499-
product_name=user_identity.product_name,
500-
client_name=ASYNC_JOB_CLIENT_NAME,
501-
)
502-
task_filter = TaskFilter.model_validate(job_filter.model_dump())
503-
task_name = map_task.__name__
504-
505-
task_uuid = await task_manager.submit_task(
506-
TaskMetadata(
507-
name=task_name,
508-
ephemeral=True,
509-
queue=TasksQueue.API_WORKER_QUEUE,
510-
),
511-
task_filter=task_filter,
512-
user_identity=user_identity,
513-
function=to_run_function,
514-
pre_registered_function_job_data_list=pre_registered_function_job_data_list,
515-
pricing_spec=pricing_spec,
516-
job_links=job_links,
517-
x_simcore_parent_project_uuid=parent_project_uuid,
518-
x_simcore_parent_node_id=parent_node_id,
519-
)
520456

521-
# patch pre-registered function jobs
522-
for data in pre_registered_function_job_data_list:
523-
await function_jobs_service.patch_registered_function_job(
524-
user_id=user_identity.user_id,
525-
product_name=user_identity.product_name,
526-
function_job_id=data.function_job_id,
527-
function_class=to_run_function.function_class,
528-
job_creation_task_id=TaskID(task_uuid),
457+
for function_inputs in function_inputs_list:
458+
job = await run_function(
459+
request=request,
460+
user_identity=user_identity,
461+
to_run_function=to_run_function,
462+
url_for=url_for,
463+
function_inputs=function_inputs,
464+
function_service=function_service,
465+
function_job_service=function_jobs_service,
466+
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
467+
x_simcore_parent_node_id=x_simcore_parent_node_id,
529468
)
469+
job_ids.append(job.uid)
530470

531-
function_job_collection_description = f"Function job collection of map of function {to_run_function.uid} with {len(pre_registered_function_job_data_list)} inputs"
471+
function_job_collection_description = f"Function job collection of map of function {to_run_function.uid} with {len(function_inputs_list)} inputs"
532472
return await web_api_rpc_client.register_function_job_collection(
533473
function_job_collection=FunctionJobCollection(
534474
title="Function job collection of function map",

services/api-server/tests/unit/api_functions/celery/test_functions_celery.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,9 @@ async def _register_function_job_collection_side_effect(*args, **kwargs):
670670
assert response.status_code == status.HTTP_200_OK
671671

672672
job_collection = FunctionJobCollection.model_validate(response.json())
673-
assert job_collection.job_ids == _generated_function_job_ids
673+
assert (
674+
job_collection.job_ids == _generated_function_job_ids
675+
), "Job ID did not preserve order or were incorrectly propagated"
674676
celery_task_ids = {
675677
elm.kwargs["registered_function_job_patch"].job_creation_task_id
676678
for elm in patch_mock.call_args_list

0 commit comments

Comments
 (0)