2424from pydantic import PositiveInt
2525from servicelib .common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
2626from servicelib .logging_utils import log_catch , log_context
27+ from servicelib .utils import limited_as_completed
2728
2829from ...core .errors import (
2930 ComputationalBackendNotConnectedError ,
3031 ComputationalBackendOnDemandNotReadyError ,
32+ ComputationalBackendTaskResultsNotReadyError ,
3133 TaskSchedulingError ,
3234)
3335from ...models .comp_runs import CompRunsAtDB , Iteration , RunMetadataDict
@@ -289,39 +291,31 @@ async def _process_completed_tasks(
289291 iteration : Iteration ,
290292 comp_run : CompRunsAtDB ,
291293 ) -> None :
292- try :
293- async with _cluster_dask_client (
294- user_id ,
295- self ,
296- use_on_demand_clusters = comp_run .use_on_demand_clusters ,
297- project_id = comp_run .project_uuid ,
298- run_id = comp_run .run_id ,
299- run_metadata = comp_run .metadata ,
300- ) as client :
301- tasks_results = await asyncio .gather (
302- * [client .get_task_result (t .job_id or "undefined" ) for t in tasks ],
303- return_exceptions = True ,
304- )
305- await asyncio .gather (
306- * [
294+ async with _cluster_dask_client (
295+ user_id ,
296+ self ,
297+ use_on_demand_clusters = comp_run .use_on_demand_clusters ,
298+ project_id = comp_run .project_uuid ,
299+ run_id = comp_run .run_id ,
300+ run_metadata = comp_run .metadata ,
301+ ) as client :
302+ tasks_results = await asyncio .gather (
303+ * [client .get_task_result (t .job_id or "undefined" ) for t in tasks ],
304+ return_exceptions = True ,
305+ )
306+ async for future in limited_as_completed (
307+ (
307308 self ._process_task_result (
308309 task , result , comp_run .metadata , iteration , comp_run .run_id
309310 )
310311 for task , result in zip (tasks , tasks_results , strict = True )
311- ]
312- )
313- finally :
314- async with _cluster_dask_client (
315- user_id ,
316- self ,
317- use_on_demand_clusters = comp_run .use_on_demand_clusters ,
318- project_id = comp_run .project_uuid ,
319- run_id = comp_run .run_id ,
320- run_metadata = comp_run .metadata ,
321- ) as client :
322- await asyncio .gather (
323- * [client .release_task_result (t .job_id ) for t in tasks if t .job_id ]
324- )
312+ ),
313+ limit = 10 ,
314+ ):
315+ with log_catch (_logger , reraise = False ):
316+ task_can_be_cleaned , job_id = await future
317+ if task_can_be_cleaned :
318+ await client .release_task_result (job_id )
325319
326320 async def _process_task_result (
327321 self ,
@@ -330,7 +324,8 @@ async def _process_task_result(
330324 run_metadata : RunMetadataDict ,
331325 iteration : Iteration ,
332326 run_id : PositiveInt ,
333- ) -> None :
327+ ) -> tuple [bool , str ]:
328+ """will return True and the job id if the task was successfully processed and can be released from the dask cluster"""
334329 _logger .debug ("received %s result: %s" , f"{ task = } " , f"{ result = } " )
335330 task_final_state = RunningState .FAILED
336331 simcore_platform_status = SimcorePlatformStatus .OK
@@ -357,17 +352,15 @@ async def _process_task_result(
357352 result ,
358353 )
359354 task_final_state = RunningState .SUCCESS
360-
355+ elif isinstance (result , ComputationalBackendTaskResultsNotReadyError ):
356+ # we did not manage to get the current state of the task
357+ # so we keep it as is
358+ assert task .job_id # nosec
359+ return False , task .job_id
361360 else :
362361 if isinstance (result , TaskCancelledError ):
363362 task_final_state = RunningState .ABORTED
364- # elif isinstance(
365- # result, ComputationalBackendTaskResultsNotReadyError
366- # ):
367- # # we did not manage to get the current state of the task
368- # # so we keep it as is
369- # task_final_state = task.state
370- # elif isinstance(result, ComputationalBackendNotConnectedError):
363+
371364 else :
372365 task_final_state = RunningState .FAILED
373366 errors .append (
@@ -424,6 +417,8 @@ async def _process_task_result(
424417 optional_progress = 1 ,
425418 optional_stopped = arrow .utcnow ().datetime ,
426419 )
420+ assert task .job_id # nosec
421+ return True , task .job_id
427422
428423 async def _task_progress_change_handler (
429424 self , event : tuple [UnixTimestamp , Any ]
0 commit comments