Skip to content

Commit c3f16be

Browse files
committed
refactor
1 parent 858542f commit c3f16be

File tree

1 file changed

+24
-17
lines changed
  • services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler

1 file changed

+24
-17
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
2626
from servicelib.logging_errors import create_troubleshootting_log_kwargs
2727
from servicelib.logging_utils import log_catch, log_context
28-
from servicelib.utils import limited_as_completed
28+
from servicelib.utils import limited_as_completed, limited_gather
2929

3030
from ...core.errors import (
3131
ComputationalBackendNotConnectedError,
@@ -197,25 +197,32 @@ async def _process_executing_tasks(
197197
run_id=comp_run.run_id,
198198
run_metadata=comp_run.metadata,
199199
) as client:
200-
task_progresses = await client.get_tasks_progress(
201-
[f"{t.job_id}" for t in tasks],
202-
)
203-
for task_progress_event in task_progresses:
204-
if task_progress_event:
205-
await CompTasksRepository(
206-
self.db_engine
207-
).update_project_task_progress(
208-
task_progress_event.task_owner.project_id,
209-
task_progress_event.task_owner.node_id,
200+
task_progresses = [
201+
t
202+
for t in await client.get_tasks_progress(
203+
[f"{t.job_id}" for t in tasks],
204+
)
205+
if t is not None
206+
]
207+
await limited_gather(
208+
*(
209+
CompTasksRepository(self.db_engine).update_project_task_progress(
210+
t.task_owner.project_id,
211+
t.task_owner.node_id,
210212
comp_run.run_id,
211-
task_progress_event.progress,
213+
t.progress,
212214
)
215+
for t in task_progresses
216+
),
217+
log=_logger,
218+
limit=MAX_CONCURRENT_PIPELINE_SCHEDULING,
219+
)
213220

214221
except ComputationalBackendOnDemandNotReadyError:
215222
_logger.info("The on demand computational backend is not ready yet...")
216223

217224
comp_tasks_repo = CompTasksRepository(self.db_engine)
218-
await asyncio.gather(
225+
await limited_gather(
219226
*(
220227
comp_tasks_repo.update_project_task_progress(
221228
t.task_owner.project_id,
@@ -225,9 +232,7 @@ async def _process_executing_tasks(
225232
)
226233
for t in task_progresses
227234
if t
228-
)
229-
)
230-
await asyncio.gather(
235+
),
231236
*(
232237
publish_service_progress(
233238
self.rabbitmq_client,
@@ -238,7 +243,9 @@ async def _process_executing_tasks(
238243
)
239244
for t in task_progresses
240245
if t
241-
)
246+
),
247+
log=_logger,
248+
limit=MAX_CONCURRENT_PIPELINE_SCHEDULING,
242249
)
243250

244251
async def _release_resources(self, comp_run: CompRunsAtDB) -> None:

0 commit comments

Comments
 (0)