Skip to content

Commit 8c727c7

Browse files
committed
completed testing and added timeout
1 parent 0f91c3f commit 8c727c7

File tree

2 files changed

+40
-6
lines changed

2 files changed

+40
-6
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 17 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -363,11 +363,19 @@ async def _handle_computational_retrieval_error(
363363
)
364364
)
365365
task_errors: list[ErrorDict] = []
366+
check_time = arrow.utcnow()
366367
if task.errors:
367368
for error in task.errors:
368369
if error["type"] == _TASK_RETRIEVAL_ERROR_TYPE:
369370
# already had a timeout error, let's keep it
370371
task_errors.append(error)
372+
assert "ctx" in error # nosec
373+
assert (
374+
_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY in error["ctx"]
375+
) # nosec
376+
check_time = arrow.get(
377+
error["ctx"][_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY]
378+
)
371379
break
372380
if not task_errors:
373381
# first time we have this error
@@ -377,14 +385,22 @@ async def _handle_computational_retrieval_error(
377385
msg=f"{result}",
378386
type=_TASK_RETRIEVAL_ERROR_TYPE,
379387
ctx={
380-
_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: f"{arrow.utcnow()}",
388+
_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: f"{check_time}",
381389
"user_id": user_id,
382390
"project_id": f"{task.project_id}",
383391
"node_id": f"{task.node_id}",
384392
"job_id": task.job_id,
385393
},
386394
)
387395
)
396+
397+
# if retrieving the task results has kept failing for longer than the maximum waiting time, we consider the task failed
398+
elapsed_time = arrow.utcnow() - check_time
399+
if (
400+
elapsed_time
401+
> self.settings.COMPUTATIONAL_BACKEND_MAX_WAITING_FOR_RETRIEVING_RESULTS
402+
):
403+
return RunningState.FAILED, SimcorePlatformStatus.BAD, task_errors, True
388404
# state is kept as STARTED so it will be retried
389405
return RunningState.STARTED, SimcorePlatformStatus.BAD, task_errors, False
390406

@@ -457,10 +473,6 @@ async def _process_task_result(
457473
) -> tuple[bool, str]:
458474
"""Returns True and the job ID if the task was successfully processed and can be released from the Dask cluster."""
459475
_logger.debug("received %s result: %s", f"{task=}", f"{result=}")
460-
task_final_state = RunningState.FAILED
461-
simcore_platform_status = SimcorePlatformStatus.OK
462-
task_errors: list[ErrorDict] = []
463-
task_completed = True
464476

465477
assert task.job_id # nosec
466478
(

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2387,7 +2387,7 @@ async def mocked_get_task_result(job_id: str) -> TaskOutputData:
23872387
len(expected_timeouted_tasks)
23882388
)
23892389
mocked_dask_client.get_task_result.reset_mock()
2390-
await asyncio.sleep(0.5) # wait a bit to ensure the retry decorator has reset
2390+
23912391
comp_tasks, _ = await assert_comp_tasks_and_comp_run_snapshot_tasks(
23922392
sqlalchemy_async_engine,
23932393
project_uuid=running_project.project.uuid,
@@ -2417,3 +2417,25 @@ async def mocked_get_task_result(job_id: str) -> TaskOutputData:
24172417
comp_runs.c.project_uuid == f"{running_project.project.uuid}",
24182418
),
24192419
)
2420+
2421+
# now we wait for the max time and the task should be marked as FAILED
2422+
await asyncio.sleep(with_short_max_wait_for_retrieving_results.total_seconds() + 1)
2423+
await scheduler_api.apply(
2424+
user_id=running_project.project.prj_owner,
2425+
project_id=running_project.project.uuid,
2426+
iteration=1,
2427+
)
2428+
assert mocked_dask_client.get_task_result.call_count == len(
2429+
expected_timeouted_tasks
2430+
)
2431+
# NOTE: we do not check all tasks here, since some of them depend on other, randomly chosen tasks
2432+
# so some are ABORTED and others are FAILED depending on the random sample above
2433+
await assert_comp_runs(
2434+
sqlalchemy_async_engine,
2435+
expected_total=1,
2436+
expected_state=RunningState.FAILED,
2437+
where_statement=and_(
2438+
comp_runs.c.user_id == running_project.project.prj_owner,
2439+
comp_runs.c.project_uuid == f"{running_project.project.uuid}",
2440+
),
2441+
)

0 commit comments

Comments
 (0)