Skip to content

Commit 8a082ac

Browse files
committed
refactor
1 parent 6a37dd9 commit 8a082ac

File tree

1 file changed

+155
-82
lines changed
  • services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler

1 file changed

+155
-82
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 155 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,132 @@ async def _process_completed_tasks(
321321
if task_can_be_cleaned:
322322
await client.release_task_result(job_id)
323323

324+
async def _handle_succesfull_run(
325+
self,
326+
task: CompTaskAtDB,
327+
result: TaskOutputData,
328+
log_error_context: dict[str, Any],
329+
) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
330+
assert task.job_id # nosec
331+
try:
332+
await parse_output_data(
333+
self.db_engine,
334+
task.job_id,
335+
result,
336+
)
337+
return RunningState.SUCCESS, SimcorePlatformStatus.OK, [], True
338+
except PortsValidationError as err:
339+
_logger.exception(
340+
**create_troubleshootting_log_kwargs(
341+
"Unexpected error while parsing output data, comp_tasks/comp_pipeline is not in sync with what was started",
342+
error=err,
343+
error_context=log_error_context,
344+
)
345+
)
346+
# NOTE: simcore platform state is still OK as the task ran fine, the issue is likely due to the service labels
347+
return RunningState.FAILED, SimcorePlatformStatus.OK, err.get_errors(), True
348+
349+
async def _handle_computational_retrieval_error(
350+
self,
351+
task: CompTaskAtDB,
352+
user_id: UserID,
353+
result: ComputationalBackendTaskResultsNotReadyError,
354+
log_error_context: dict[str, Any],
355+
) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
356+
assert task.job_id # nosec
357+
_logger.warning(
358+
**create_troubleshootting_log_kwargs(
359+
f"Retrieval of task {task.job_id} result timed-out",
360+
error=result,
361+
error_context=log_error_context,
362+
tip="This can happen if the computational backend is overloaded with requests. It will be automatically retried again.",
363+
)
364+
)
365+
task_errors: list[ErrorDict] = []
366+
if task.errors:
367+
for error in task.errors:
368+
if error["type"] == _TASK_RETRIEVAL_ERROR_TYPE:
369+
# already had a timeout error, let's keep it
370+
task_errors.append(error)
371+
break
372+
if not task_errors:
373+
# first time we have this error
374+
task_errors.append(
375+
ErrorDict(
376+
loc=(f"{task.project_id}", f"{task.node_id}"),
377+
msg=f"{result}",
378+
type=_TASK_RETRIEVAL_ERROR_TYPE,
379+
ctx={
380+
_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: f"{arrow.utcnow()}",
381+
"user_id": user_id,
382+
"project_id": f"{task.project_id}",
383+
"node_id": f"{task.node_id}",
384+
"job_id": task.job_id,
385+
},
386+
)
387+
)
388+
# state is kept as STARTED so it will be retried
389+
return RunningState.STARTED, SimcorePlatformStatus.BAD, task_errors, False
390+
391+
async def _handle_computational_backend_not_connected_error(
392+
self,
393+
task: CompTaskAtDB,
394+
result: ComputationalBackendNotConnectedError,
395+
log_error_context: dict[str, Any],
396+
) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
397+
assert task.job_id # nosec
398+
_logger.warning(
399+
**create_troubleshootting_log_kwargs(
400+
f"Computational backend disconnected when retrieving task {task.job_id} result",
401+
error=result,
402+
error_context=log_error_context,
403+
tip="This can happen if the computational backend is temporarily disconnected. It will be automatically retried again.",
404+
)
405+
)
406+
# NOTE: the task will be set to UNKNOWN on the next processing loop
407+
408+
# state is kept as STARTED so it will be retried
409+
return RunningState.STARTED, SimcorePlatformStatus.BAD, [], False
410+
411+
async def _handle_task_error(
412+
self,
413+
task: CompTaskAtDB,
414+
result: BaseException,
415+
log_error_context: dict[str, Any],
416+
) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
417+
assert task.job_id # nosec
418+
419+
# the task itself failed, check why
420+
if isinstance(result, TaskCancelledError):
421+
_logger.info(
422+
**create_troubleshootting_log_kwargs(
423+
f"Task {task.job_id} was cancelled",
424+
error=result,
425+
error_context=log_error_context,
426+
)
427+
)
428+
return RunningState.ABORTED, SimcorePlatformStatus.OK, [], True
429+
430+
_logger.info(
431+
**create_troubleshootting_log_kwargs(
432+
f"Task {task.job_id} completed with errors",
433+
error=result,
434+
error_context=log_error_context,
435+
)
436+
)
437+
return (
438+
RunningState.FAILED,
439+
SimcorePlatformStatus.OK,
440+
[
441+
ErrorDict(
442+
loc=(f"{task.project_id}", f"{task.node_id}"),
443+
msg=f"{result}",
444+
type="runtime",
445+
)
446+
],
447+
True,
448+
)
449+
324450
async def _process_task_result(
325451
self,
326452
task: CompTaskAtDB,
@@ -355,92 +481,39 @@ async def _process_task_result(
355481
}
356482

357483
if isinstance(result, TaskOutputData):
358-
# That means the task successfully completed
359-
try:
360-
await parse_output_data(
361-
self.db_engine,
362-
task.job_id,
363-
result,
364-
)
365-
task_final_state = RunningState.SUCCESS
366-
except PortsValidationError as err:
367-
_logger.exception(
368-
**create_troubleshootting_log_kwargs(
369-
"Unexpected error while parsing output data, comp_tasks/comp_pipeline is not in sync with what was started",
370-
error=err,
371-
error_context=log_error_context,
372-
)
373-
)
374-
task_errors.extend(err.get_errors())
375-
task_final_state = RunningState.FAILED
376-
# NOTE: simcore platform state is still OK as the task ran fine, the issue is likely due to the service labels
484+
(
485+
task_final_state,
486+
simcore_platform_status,
487+
task_errors,
488+
task_completed,
489+
) = await self._handle_succesfull_run(task, result, log_error_context)
490+
377491
elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
378-
# Task result retrieval failed due to communication error, task will be retried
379-
# so we keep it as is
380-
_logger.warning(
381-
**create_troubleshootting_log_kwargs(
382-
f"Retrieval of task {task.job_id} result timed-out",
383-
error=result,
384-
error_context=log_error_context,
385-
tip="This can happen if the computational backend is overloaded with requests. It will be automatically retried again.",
386-
)
492+
(
493+
task_final_state,
494+
simcore_platform_status,
495+
task_errors,
496+
task_completed,
497+
) = await self._handle_computational_retrieval_error(
498+
task, user_id, result, log_error_context
499+
)
500+
elif isinstance(result, ComputationalBackendNotConnectedError):
501+
(
502+
task_final_state,
503+
simcore_platform_status,
504+
task_errors,
505+
task_completed,
506+
) = await self._handle_computational_backend_not_connected_error(
507+
task, result, log_error_context
387508
)
388-
389-
if task.errors:
390-
for error in task.errors:
391-
if error["type"] == _TASK_RETRIEVAL_ERROR_TYPE:
392-
# already had a timeout error, let's keep it
393-
task_errors.append(error)
394-
break
395-
if not task_errors:
396-
# first time we have this error
397-
task_errors.append(
398-
ErrorDict(
399-
loc=(f"{task.project_id}", f"{task.node_id}"),
400-
msg=f"{result}",
401-
type=_TASK_RETRIEVAL_ERROR_TYPE,
402-
ctx={
403-
_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: f"{arrow.utcnow()}",
404-
"user_id": user_id,
405-
"project_id": f"{project_id}",
406-
"node_id": f"{node_id}",
407-
"job_id": task.job_id,
408-
},
409-
)
410-
)
411-
412-
task_completed = False
413509
else:
414-
# the task itself failed, check why
415-
if isinstance(result, TaskCancelledError):
416-
_logger.info(
417-
**create_troubleshootting_log_kwargs(
418-
f"Task {task.job_id} was cancelled",
419-
error=result,
420-
error_context=log_error_context,
421-
)
422-
)
423-
task_final_state = RunningState.ABORTED
424-
425-
else:
426-
_logger.info(
427-
**create_troubleshootting_log_kwargs(
428-
f"Task {task.job_id} completed with errors",
429-
error=result,
430-
error_context=log_error_context,
431-
)
432-
)
433-
task_final_state = RunningState.FAILED
434-
task_errors.append(
435-
ErrorDict(
436-
loc=(f"{task.project_id}", f"{task.node_id}"),
437-
msg=f"{result}",
438-
type="runtime",
439-
)
440-
)
510+
(
511+
task_final_state,
512+
simcore_platform_status,
513+
task_errors,
514+
task_completed,
515+
) = await self._handle_task_error(task, result, log_error_context)
441516

442-
if isinstance(result, ComputationalBackendNotConnectedError):
443-
simcore_platform_status = SimcorePlatformStatus.BAD
444517
# we need to remove any invalid files in the storage
445518
await clean_task_output_and_log_files_if_invalid(
446519
self.db_engine, user_id, project_id, node_id

0 commit comments

Comments
 (0)