Skip to content

Commit fbb9a0f

Browse files
committed
limit gathering
1 parent de580ec commit fbb9a0f

File tree

1 file changed

+12
-2
lines changed
  • services/director-v2/src/simcore_service_director_v2/modules

1 file changed

+12
-2
lines changed

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
from pydantic.networks import AnyUrl
5959
from servicelib.logging_errors import create_troubleshootting_log_kwargs
6060
from servicelib.logging_utils import log_context
61+
from servicelib.utils import limited_gather
6162
from settings_library.s3 import S3Settings
6263
from simcore_sdk.node_ports_common.exceptions import NodeportsException
6364
from simcore_sdk.node_ports_v2 import FileLinkType
@@ -92,6 +93,7 @@
9293

9394

9495
_UserCallbackInSepThread = Callable[[], None]
96+
_MAX_CONCURRENT_CLIENT_CONNECTIONS: Final[int] = 10
9597

9698

9799
@dataclass(frozen=True, kw_only=True, slots=True)
@@ -423,7 +425,11 @@ async def _get_task_progress(job_id: str) -> TaskProgressEvent | None:
423425
# we are interested in the last event
424426
return TaskProgressEvent.model_validate_json(dask_events[-1][1])
425427

426-
return await asyncio.gather(*(_get_task_progress(job_id) for job_id in job_ids))
428+
return await limited_gather(
429+
*(_get_task_progress(job_id) for job_id in job_ids),
430+
log=_logger,
431+
limit=_MAX_CONCURRENT_CLIENT_CONNECTIONS,
432+
)
427433

428434
async def get_tasks_status(self, job_ids: Iterable[str]) -> list[RunningState]:
429435
dask_utils.check_scheduler_is_still_the_same(
@@ -501,7 +507,11 @@ async def _get_task_state(job_id: str) -> RunningState:
501507

502508
return parsed_event.state
503509

504-
return await asyncio.gather(*(_get_task_state(job_id) for job_id in job_ids))
510+
return await limited_gather(
511+
*(_get_task_state(job_id) for job_id in job_ids),
512+
log=_logger,
513+
limit=_MAX_CONCURRENT_CLIENT_CONNECTIONS,
514+
)
505515

506516
async def abort_computation_task(self, job_id: str) -> None:
507517
# Dask future may be cancelled, but only a future that was not already taken by

0 commit comments

Comments
 (0)