
Commit 0a394cc

make it work
1 parent f0b0364 commit 0a394cc

File tree (3 files changed: +220 −78 lines changed)

- services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py
- services/director-v2/src/simcore_service_director_v2/modules/dask_client.py
- services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 129 additions & 62 deletions
@@ -23,14 +23,15 @@
 from models_library.users import UserID
 from pydantic import PositiveInt
 from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
+from servicelib.logging_errors import create_troubleshootting_log_kwargs
 from servicelib.logging_utils import log_catch, log_context
 from servicelib.utils import limited_as_completed

 from ...core.errors import (
     ComputationalBackendNotConnectedError,
     ComputationalBackendOnDemandNotReadyError,
     ComputationalBackendTaskResultsNotReadyError,
-    TaskSchedulingError,
+    PortsValidationError,
 )
 from ...models.comp_runs import CompRunsAtDB, Iteration, RunMetadataDict
 from ...models.comp_tasks import CompTaskAtDB

@@ -59,6 +60,9 @@
 _logger = logging.getLogger(__name__)

 _DASK_CLIENT_RUN_REF: Final[str] = "{user_id}:{project_id}:{run_id}"
+_TASK_RETRIEVAL_ERROR_TYPE: Final[str] = "task-result-retrieval-timeout"
+_TASK_RETRIEVAL_ERROR_MSG: Final[str] = "Retrieval of task result timed-out"
+_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: Final[str] = "check_time"


 @asynccontextmanager

@@ -329,66 +333,121 @@ async def _process_task_result(
         _logger.debug("received %s result: %s", f"{task=}", f"{result=}")
         task_final_state = RunningState.FAILED
         simcore_platform_status = SimcorePlatformStatus.OK
-        errors: list[ErrorDict] = []
+        task_errors: list[ErrorDict] = []
+        task_completed = True

-        if task.job_id is not None:
-            (
-                _service_key,
-                _service_version,
-                user_id,
-                project_id,
-                node_id,
-            ) = parse_dask_job_id(task.job_id)
+        assert task.job_id  # nosec
+        (
+            _service_key,
+            _service_version,
+            user_id,
+            project_id,
+            node_id,
+        ) = parse_dask_job_id(task.job_id)

-            assert task.project_id == project_id  # nosec
-            assert task.node_id == node_id  # nosec
+        assert task.project_id == project_id  # nosec
+        assert task.node_id == node_id  # nosec
+        assert task.job_id  # nosec
+        log_error_context = {
+            "user_id": user_id,
+            "project_id": project_id,
+            "node_id": node_id,
+            "job_id": task.job_id,
+        }

+        if isinstance(result, TaskOutputData):
+            # That means the task successfully completed
             try:
-                if isinstance(result, TaskOutputData):
-                    # success!
-                    await parse_output_data(
-                        self.db_engine,
-                        task.job_id,
-                        result,
+                await parse_output_data(
+                    self.db_engine,
+                    task.job_id,
+                    result,
+                )
+                task_final_state = RunningState.SUCCESS
+            except PortsValidationError as err:
+                _logger.exception(
+                    **create_troubleshootting_log_kwargs(
+                        "Unexpected error while parsing output data, comp_tasks/comp_pipeline is not in sync with what was started",
+                        error=err,
+                        error_context=log_error_context,
+                    )
+                )
+                task_errors.extend(err.get_errors())
+                task_final_state = RunningState.FAILED
+                # NOTE: simcore platform state is still OK as the task ran fine, the issue is likely due to the service labels
+        elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
+            # Task result retrieval failed due to communication error, task will be retried
+            # so we keep it as is
+            _logger.warning(
+                **create_troubleshootting_log_kwargs(
+                    f"Retrieval of task {task.job_id} result timed-out",
+                    error=result,
+                    error_context=log_error_context,
+                    tip="This can happen if the computational backend is overloaded with requests. It will be automatically retried again.",
+                )
+            )
+
+            if task.errors:
+                for error in task.errors:
+                    if error["type"] == _TASK_RETRIEVAL_ERROR_TYPE:
+                        # already had a timeout error, let's keep it
+                        task_errors.append(error)
+                        break
+            if not task_errors:
+                # first time we have this error
+                task_errors.append(
+                    ErrorDict(
+                        loc=(f"{task.project_id}", f"{task.node_id}"),
+                        msg=f"{result}",
+                        type=_TASK_RETRIEVAL_ERROR_TYPE,
+                        ctx={
+                            _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: f"{arrow.utcnow()}",
+                            "user_id": user_id,
+                            "project_id": project_id,
+                            "node_id": node_id,
+                            "job_id": task.job_id,
+                        },
+                    )
+                )
+
+            task_completed = False
+        else:
+            # the task itself failed, check why
+            if isinstance(result, TaskCancelledError):
+                _logger.info(
+                    **create_troubleshootting_log_kwargs(
+                        f"Task {task.job_id} was cancelled",
+                        error=result,
+                        error_context=log_error_context,
                     )
-                    task_final_state = RunningState.SUCCESS
-                elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
-                    # Task result retrieval failed due to communication error, task will be retried
-                    # so we keep it as is
-                    assert task.job_id  # nosec
-                    return False, task.job_id
-                else:
-                    if isinstance(result, TaskCancelledError):
-                        task_final_state = RunningState.ABORTED
+                )
+                task_final_state = RunningState.ABORTED

-                    else:
-                        task_final_state = RunningState.FAILED
-                        errors.append(
-                            {
-                                "loc": (
-                                    f"{task.project_id}",
-                                    f"{task.node_id}",
-                                ),
-                                "msg": f"{result}",
-                                "type": "runtime",
-                            }
-                        )
-                    if isinstance(result, ComputationalBackendNotConnectedError):
-                        simcore_platform_status = SimcorePlatformStatus.BAD
-                    # we need to remove any invalid files in the storage
-                    await clean_task_output_and_log_files_if_invalid(
-                        self.db_engine, user_id, project_id, node_id
+            else:
+                _logger.info(
+                    **create_troubleshootting_log_kwargs(
+                        f"Task {task.job_id} completed with errors",
+                        error=result,
+                        error_context=log_error_context,
                     )
-            except TaskSchedulingError as err:
+                )
                 task_final_state = RunningState.FAILED
-                simcore_platform_status = SimcorePlatformStatus.BAD
-                errors = err.get_errors()
-                _logger.debug(
-                    "Unexpected failure while processing results of %s: %s",
-                    f"{task=}",
-                    f"{errors=}",
+                task_errors.append(
+                    ErrorDict(
+                        loc=(f"{task.project_id}", f"{task.node_id}"),
+                        msg=f"{result}",
+                        type="runtime",
+                    )
                 )

+            if isinstance(result, ComputationalBackendNotConnectedError):
+                simcore_platform_status = SimcorePlatformStatus.BAD
+            # we need to remove any invalid files in the storage
+            await clean_task_output_and_log_files_if_invalid(
+                self.db_engine, user_id, project_id, node_id
+            )
+
+        if task_completed:
             # resource tracking
             await publish_service_resource_tracking_stopped(
                 self.rabbitmq_client,

@@ -408,17 +467,25 @@ async def _process_task_result(
                 task_final_state=task_final_state,
             )

-            await CompTasksRepository(self.db_engine).update_project_tasks_state(
-                task.project_id,
-                run_id,
-                [task.node_id],
-                task_final_state,
-                errors=errors,
-                optional_progress=1,
-                optional_stopped=arrow.utcnow().datetime,
-            )
-            assert task.job_id  # nosec
-            return True, task.job_id
+            await CompTasksRepository(self.db_engine).update_project_tasks_state(
+                task.project_id,
+                run_id,
+                [task.node_id],
+                task_final_state,
+                errors=task_errors,
+                optional_progress=1,
+                optional_stopped=arrow.utcnow().datetime,
+            )
+        else:
+            await CompTasksRepository(self.db_engine).update_project_tasks_state(
+                task.project_id,
+                run_id,
+                [task.node_id],
+                RunningState.STARTED,  # keep the same state as before
+                errors=task_errors,
+            )
+
+        return task_completed, task.job_id

     async def _task_progress_change_handler(
         self, event: tuple[UnixTimestamp, Any]
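
Note on the scheduler change above: when result retrieval times out (ComputationalBackendTaskResultsNotReadyError), the task is no longer silently requeued; it stays in RunningState.STARTED and a task-result-retrieval-timeout ErrorDict is stored, reusing an already stored one so the original check_time is preserved across retries. The following standalone sketch illustrates only that bookkeeping; the helper name merge_retrieval_timeout_error, the plain-dict stand-in for ErrorDict, and the stdlib datetime timestamp (instead of arrow.utcnow()) are illustrative assumptions, not code from this commit.

import datetime
from typing import Any

_TASK_RETRIEVAL_ERROR_TYPE = "task-result-retrieval-timeout"
_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY = "check_time"


def merge_retrieval_timeout_error(
    existing_errors: list[dict[str, Any]] | None, message: str
) -> list[dict[str, Any]]:
    """Return the errors to store: keep the first timeout error so check_time is not reset."""
    if existing_errors:
        for error in existing_errors:
            if error.get("type") == _TASK_RETRIEVAL_ERROR_TYPE:
                # a timeout was already recorded earlier: keep it as is
                return [error]
    # first occurrence: record when the timeout was first observed
    return [
        {
            "msg": message,
            "type": _TASK_RETRIEVAL_ERROR_TYPE,
            "ctx": {
                _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: datetime.datetime.now(
                    datetime.timezone.utc
                ).isoformat()
            },
        }
    ]


# a repeated timeout keeps the timestamp of the first one
first = merge_retrieval_timeout_error(None, "Retrieval of task result timed-out")
second = merge_retrieval_timeout_error(first, "Retrieval of task result timed-out")
assert second == first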

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 2 additions & 0 deletions
@@ -524,6 +524,8 @@ async def abort_computation_task(self, job_id: str) -> None:

     async def get_task_result(self, job_id: str) -> TaskOutputData:
         _logger.debug("getting result of %s", f"{job_id=}")
+        dask_utils.check_communication_with_scheduler_is_open(self.backend.client)
+        dask_utils.check_scheduler_status(self.backend.client)
         try:
             task_future: distributed.Future = (
                 await dask_utils.wrap_client_async_routine(
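
The two added lines make get_task_result fail fast with a domain-specific error when the connection to the dask scheduler is unhealthy, instead of waiting on a result that can never arrive. A rough illustration of that guard pattern follows; the exception class and the status check below are simplified assumptions, the real checks live in dask_utils.

import distributed


class ComputationalBackendNotConnectedError(RuntimeError):
    """Raised when the dask scheduler cannot be reached."""


def ensure_scheduler_is_reachable(client: distributed.Client) -> None:
    # distributed.Client.status is "running" while the client holds a live
    # connection to the scheduler; anything else means fetching task results
    # should not even be attempted
    if client.status != "running":
        raise ComputationalBackendNotConnectedError(
            f"dask client is not connected (status={client.status!r})"
        )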

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py

Lines changed: 89 additions & 16 deletions
@@ -11,6 +11,7 @@

 import asyncio
 import datetime
+import random
 from collections.abc import AsyncIterator, Awaitable, Callable
 from copy import deepcopy
 from dataclasses import dataclass

@@ -33,6 +34,7 @@
 from faker import Faker
 from fastapi.applications import FastAPI
 from models_library.computations import CollectionRunID
+from models_library.errors import ErrorDict
 from models_library.projects import ProjectAtDB, ProjectID
 from models_library.projects_nodes_io import NodeID
 from models_library.projects_state import RunningState

@@ -72,6 +74,8 @@
     BaseCompScheduler,
 )
 from simcore_service_director_v2.modules.comp_scheduler._scheduler_dask import (
+    _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY,
+    _TASK_RETRIEVAL_ERROR_TYPE,
     DaskScheduler,
 )
 from simcore_service_director_v2.modules.comp_scheduler._utils import COMPLETED_STATES

@@ -2275,14 +2279,7 @@ async def test_run_new_pipeline_called_twice_prevents_duplicate_runs(
     )


-@pytest.mark.parametrize(
-    "exception_type",
-    [
-        ComputationalBackendTaskResultsNotReadyError,
-    ],
-)
 async def test_getting_task_result_raises_exception_does_not_fail_task_and_retries(
-    exception_type: Exception,
     with_disabled_auto_scheduling: mock.Mock,
     with_disabled_scheduler_publisher: mock.Mock,
     mocked_dask_client: mock.MagicMock,

@@ -2300,13 +2297,21 @@ async def mocked_get_tasks_status(job_ids: list[str]) -> list[RunningState]:
         return [RunningState.SUCCESS for j in job_ids]

     mocked_dask_client.get_tasks_status.side_effect = mocked_get_tasks_status
-    call_count = 0

-    async def mocked_get_task_result(_job_id: str) -> TaskOutputData:
-        nonlocal call_count
-        call_count += 1
-        if call_count > 1:
-            raise exception_type
+    computational_tasks = [
+        t for t in running_project.tasks if t.node_class is NodeClass.COMPUTATIONAL
+    ]
+    expected_timeouted_tasks = random.choices(  # noqa: S311
+        computational_tasks, k=len(computational_tasks) - 1
+    )
+    successful_tasks = [
+        t for t in computational_tasks if t not in expected_timeouted_tasks
+    ]
+
+    async def mocked_get_task_result(job_id: str) -> TaskOutputData:
+        nonlocal expected_timeouted_tasks
+        if job_id in [t.job_id for t in expected_timeouted_tasks]:
+            raise ComputationalBackendTaskResultsNotReadyError(job_id=job_id)
         return TaskOutputData.model_validate({"whatever_output": 123})

     mocked_dask_client.get_task_result.side_effect = mocked_get_task_result

@@ -2317,10 +2322,49 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData:
         project_id=running_project.project.uuid,
         iteration=1,
     )
-    assert mocked_dask_client.get_task_result.call_count == len(
-        [t for t in running_project.tasks if t.node_class is NodeClass.COMPUTATIONAL]
-    )
+    assert mocked_dask_client.get_task_result.call_count == len(computational_tasks)
     mocked_dask_client.get_task_result.reset_mock()
+
+    # check the tasks in the DB, the error shall be set there and the task state is set back to STARTED
+    comp_tasks, _ = await assert_comp_tasks_and_comp_run_snapshot_tasks(
+        sqlalchemy_async_engine,
+        project_uuid=running_project.project.uuid,
+        task_ids=[t.node_id for t in expected_timeouted_tasks],
+        expected_state=RunningState.STARTED,
+        expected_progress=0,
+        run_id=running_project.runs.run_id,
+    )
+    # we should have an error in all these comp_tasks
+    retrieval_times = []
+    for t in comp_tasks:
+        assert t.errors
+        assert len(t.errors) == 1
+        error_dict = TypeAdapter(ErrorDict).validate_python(t.errors[0])
+        assert error_dict["type"] == _TASK_RETRIEVAL_ERROR_TYPE
+        assert "ctx" in error_dict
+        assert _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY in error_dict["ctx"]
+        retrieval_times.append(
+            error_dict["ctx"][_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY]
+        )
+
+    await assert_comp_tasks_and_comp_run_snapshot_tasks(
+        sqlalchemy_async_engine,
+        project_uuid=running_project.project.uuid,
+        task_ids=[t.node_id for t in successful_tasks],
+        expected_state=RunningState.SUCCESS,
+        expected_progress=1.0,
+        run_id=running_project.runs.run_id,
+    )
+    await assert_comp_runs(
+        sqlalchemy_async_engine,
+        expected_total=1,
+        expected_state=RunningState.STARTED,
+        where_statement=and_(
+            comp_runs.c.user_id == running_project.project.prj_owner,
+            comp_runs.c.project_uuid == f"{running_project.project.uuid}",
+        ),
+    )
+
     # calling again should not raise neither but try again
     assert running_project.project.prj_owner
     for _ in range(3):

@@ -2341,3 +2385,32 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData:
         )
         mocked_dask_client.get_task_result.reset_mock()
         await asyncio.sleep(0.5)  # wait a bit to ensure the retry decorator has reset
+    comp_tasks, _ = await assert_comp_tasks_and_comp_run_snapshot_tasks(
+        sqlalchemy_async_engine,
+        project_uuid=running_project.project.uuid,
+        task_ids=[t.node_id for t in expected_timeouted_tasks],
+        expected_state=RunningState.STARTED,
+        expected_progress=0,
+        run_id=running_project.runs.run_id,
+    )
+    # the times shall remain the same
+    for t in comp_tasks:
+        assert t.errors
+        assert len(t.errors) == 1
+        error_dict = TypeAdapter(ErrorDict).validate_python(t.errors[0])
+        assert error_dict["type"] == _TASK_RETRIEVAL_ERROR_TYPE
+        assert "ctx" in error_dict
+        assert _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY in error_dict["ctx"]
+        # the time shall be the same as before
+        assert (
+            error_dict["ctx"][_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY] in retrieval_times
+        )
+    await assert_comp_runs(
+        sqlalchemy_async_engine,
+        expected_total=1,
+        expected_state=RunningState.STARTED,
+        where_statement=and_(
+            comp_runs.c.user_id == running_project.project.prj_owner,
+            comp_runs.c.project_uuid == f"{running_project.project.uuid}",
+        ),
+    )
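
A small detail in the test setup above: random.choices samples with replacement, so expected_timeouted_tasks may contain duplicates and cover fewer than k distinct nodes; successful_tasks is built by exclusion, so the two groups always partition the computational tasks and at least one task completes successfully. A tiny standalone illustration (unrelated to the test fixtures):

import random

items = ["a", "b", "c", "d"]
# with replacement: duplicates are possible, so fewer than k distinct picks may result
picked = random.choices(items, k=len(items) - 1)
remaining = [i for i in items if i not in picked]

assert len(remaining) >= 1  # at least one item is never picked
assert set(picked) | set(remaining) == set(items)  # the two groups cover everything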

0 commit comments
