Skip to content

Commit 6bca85f

Browse files
committed
use limited_gather to run function in map
1 parent 7893372 commit 6bca85f

File tree

1 file changed

+22
-5
lines changed

1 file changed

+22
-5
lines changed

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from models_library.users import UserID
2727
from servicelib.celery.models import TaskFilter, TaskID, TaskMetadata, TasksQueue
2828
from servicelib.fastapi.dependencies import get_reverse_url_mapper
29+
from servicelib.utils import limited_gather
2930

3031
from ..._service_function_jobs import FunctionJobService
3132
from ..._service_functions import FunctionService
@@ -452,10 +453,8 @@ async def map_function(
452453
x_simcore_parent_node_id: Annotated[NodeID | Literal["null"], Header()],
453454
) -> RegisteredFunctionJobCollection:
454455

455-
job_ids: list[FunctionJobID] = []
456-
457-
for function_inputs in function_inputs_list:
458-
job = await run_function(
456+
async def _run_single_function(function_inputs: FunctionInputs) -> FunctionJobID:
457+
result = await run_function(
459458
request=request,
460459
user_identity=user_identity,
461460
to_run_function=to_run_function,
@@ -466,7 +465,25 @@ async def map_function(
466465
x_simcore_parent_project_uuid=x_simcore_parent_project_uuid,
467466
x_simcore_parent_node_id=x_simcore_parent_node_id,
468467
)
469-
job_ids.append(job.uid)
468+
return result.uid
469+
470+
# Run all tasks concurrently, allowing them to complete even if some fail
471+
results = await limited_gather(
472+
*[
473+
_run_single_function(function_inputs)
474+
for function_inputs in function_inputs_list
475+
],
476+
reraise=False,
477+
limit=10,
478+
)
479+
480+
# Check if any tasks raised exceptions and raise the first one found
481+
for result in results:
482+
if isinstance(result, BaseException):
483+
raise result
484+
485+
# At this point, all results are FunctionJobID since we've checked for exceptions
486+
job_ids: list[FunctionJobID] = results # type: ignore[assignment]
470487

471488
function_job_collection_description = f"Function job collection of map of function {to_run_function.uid} with {len(function_inputs_list)} inputs"
472489
return await web_api_rpc_client.register_function_job_collection(

0 commit comments

Comments
 (0)