@@ -321,6 +321,132 @@ async def _process_completed_tasks(
             if task_can_be_cleaned:
                 await client.release_task_result(job_id)

+    async def _handle_successful_run(
+        self,
+        task: CompTaskAtDB,
+        result: TaskOutputData,
+        log_error_context: dict[str, Any],
+    ) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
+        assert task.job_id  # nosec
+        try:
+            await parse_output_data(
+                self.db_engine,
+                task.job_id,
+                result,
+            )
+            return RunningState.SUCCESS, SimcorePlatformStatus.OK, [], True
+        except PortsValidationError as err:
+            _logger.exception(
+                **create_troubleshootting_log_kwargs(
+                    "Unexpected error while parsing output data, comp_tasks/comp_pipeline is not in sync with what was started",
+                    error=err,
+                    error_context=log_error_context,
+                )
+            )
+            # NOTE: simcore platform state is still OK as the task ran fine, the issue is likely due to the service labels
+            return RunningState.FAILED, SimcorePlatformStatus.OK, err.get_errors(), True
+
+    async def _handle_computational_retrieval_error(
+        self,
+        task: CompTaskAtDB,
+        user_id: UserID,
+        result: ComputationalBackendTaskResultsNotReadyError,
+        log_error_context: dict[str, Any],
+    ) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
+        assert task.job_id  # nosec
+        _logger.warning(
+            **create_troubleshootting_log_kwargs(
+                f"Retrieval of task {task.job_id} result timed-out",
+                error=result,
+                error_context=log_error_context,
+                tip="This can happen if the computational backend is overloaded with requests. It will be automatically retried again.",
+            )
+        )
+        task_errors: list[ErrorDict] = []
+        if task.errors:
+            for error in task.errors:
+                if error["type"] == _TASK_RETRIEVAL_ERROR_TYPE:
+                    # already had a timeout error, let's keep it
+                    task_errors.append(error)
+                    break
+        if not task_errors:
+            # first time we have this error
+            task_errors.append(
+                ErrorDict(
+                    loc=(f"{task.project_id}", f"{task.node_id}"),
+                    msg=f"{result}",
+                    type=_TASK_RETRIEVAL_ERROR_TYPE,
+                    ctx={
+                        _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: f"{arrow.utcnow()}",
+                        "user_id": user_id,
+                        "project_id": f"{task.project_id}",
+                        "node_id": f"{task.node_id}",
+                        "job_id": task.job_id,
+                    },
+                )
+            )
+        # state is kept as STARTED so it will be retried
+        return RunningState.STARTED, SimcorePlatformStatus.BAD, task_errors, False
+
+    async def _handle_computational_backend_not_connected_error(
+        self,
+        task: CompTaskAtDB,
+        result: ComputationalBackendNotConnectedError,
+        log_error_context: dict[str, Any],
+    ) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
+        assert task.job_id  # nosec
+        _logger.warning(
+            **create_troubleshootting_log_kwargs(
+                f"Computational backend disconnected when retrieving task {task.job_id} result",
+                error=result,
+                error_context=log_error_context,
+                tip="This can happen if the computational backend is temporarily disconnected. It will be automatically retried again.",
+            )
+        )
+        # NOTE: the task will be set to UNKNOWN on the next processing loop
+
+        # state is kept as STARTED so it will be retried
+        return RunningState.STARTED, SimcorePlatformStatus.BAD, [], False
+
+    async def _handle_task_error(
+        self,
+        task: CompTaskAtDB,
+        result: BaseException,
+        log_error_context: dict[str, Any],
+    ) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
+        assert task.job_id  # nosec
+
+        # the task itself failed, check why
+        if isinstance(result, TaskCancelledError):
+            _logger.info(
+                **create_troubleshootting_log_kwargs(
+                    f"Task {task.job_id} was cancelled",
+                    error=result,
+                    error_context=log_error_context,
+                )
+            )
+            return RunningState.ABORTED, SimcorePlatformStatus.OK, [], True
+
+        _logger.info(
+            **create_troubleshootting_log_kwargs(
+                f"Task {task.job_id} completed with errors",
+                error=result,
+                error_context=log_error_context,
+            )
+        )
+        return (
+            RunningState.FAILED,
+            SimcorePlatformStatus.OK,
+            [
+                ErrorDict(
+                    loc=(f"{task.project_id}", f"{task.node_id}"),
+                    msg=f"{result}",
+                    type="runtime",
+                )
+            ],
+            True,
+        )
+
     async def _process_task_result(
         self,
         task: CompTaskAtDB,
@@ -355,92 +481,39 @@ async def _process_task_result(
         }

         if isinstance(result, TaskOutputData):
-            # That means the task successfully completed
-            try:
-                await parse_output_data(
-                    self.db_engine,
-                    task.job_id,
-                    result,
-                )
-                task_final_state = RunningState.SUCCESS
-            except PortsValidationError as err:
-                _logger.exception(
-                    **create_troubleshootting_log_kwargs(
-                        "Unexpected error while parsing output data, comp_tasks/comp_pipeline is not in sync with what was started",
-                        error=err,
-                        error_context=log_error_context,
-                    )
-                )
-                task_errors.extend(err.get_errors())
-                task_final_state = RunningState.FAILED
-                # NOTE: simcore platform state is still OK as the task ran fine, the issue is likely due to the service labels
+            (
+                task_final_state,
+                simcore_platform_status,
+                task_errors,
+                task_completed,
+            ) = await self._handle_successful_run(task, result, log_error_context)
+
         elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
-            # Task result retrieval failed due to communication error, task will be retried
-            # so we keep it as is
-            _logger.warning(
-                **create_troubleshootting_log_kwargs(
-                    f"Retrieval of task {task.job_id} result timed-out",
-                    error=result,
-                    error_context=log_error_context,
-                    tip="This can happen if the computational backend is overloaded with requests. It will be automatically retried again.",
-                )
+            (
+                task_final_state,
+                simcore_platform_status,
+                task_errors,
+                task_completed,
+            ) = await self._handle_computational_retrieval_error(
+                task, user_id, result, log_error_context
+            )
+        elif isinstance(result, ComputationalBackendNotConnectedError):
+            (
+                task_final_state,
+                simcore_platform_status,
+                task_errors,
+                task_completed,
+            ) = await self._handle_computational_backend_not_connected_error(
+                task, result, log_error_context
             )
-
-            if task.errors:
-                for error in task.errors:
-                    if error["type"] == _TASK_RETRIEVAL_ERROR_TYPE:
-                        # already had a timeout error, let's keep it
-                        task_errors.append(error)
-                        break
-            if not task_errors:
-                # first time we have this error
-                task_errors.append(
-                    ErrorDict(
-                        loc=(f"{task.project_id}", f"{task.node_id}"),
-                        msg=f"{result}",
-                        type=_TASK_RETRIEVAL_ERROR_TYPE,
-                        ctx={
-                            _TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: f"{arrow.utcnow()}",
-                            "user_id": user_id,
-                            "project_id": f"{project_id}",
-                            "node_id": f"{node_id}",
-                            "job_id": task.job_id,
-                        },
-                    )
-                )
-
-            task_completed = False
         else:
-            # the task itself failed, check why
-            if isinstance(result, TaskCancelledError):
-                _logger.info(
-                    **create_troubleshootting_log_kwargs(
-                        f"Task {task.job_id} was cancelled",
-                        error=result,
-                        error_context=log_error_context,
-                    )
-                )
-                task_final_state = RunningState.ABORTED
-
-            else:
-                _logger.info(
-                    **create_troubleshootting_log_kwargs(
-                        f"Task {task.job_id} completed with errors",
-                        error=result,
-                        error_context=log_error_context,
-                    )
-                )
-                task_final_state = RunningState.FAILED
-                task_errors.append(
-                    ErrorDict(
-                        loc=(f"{task.project_id}", f"{task.node_id}"),
-                        msg=f"{result}",
-                        type="runtime",
-                    )
-                )
+            (
+                task_final_state,
+                simcore_platform_status,
+                task_errors,
+                task_completed,
+            ) = await self._handle_task_error(task, result, log_error_context)

-        if isinstance(result, ComputationalBackendNotConnectedError):
-            simcore_platform_status = SimcorePlatformStatus.BAD
         # we need to remove any invalid files in the storage
         await clean_task_output_and_log_files_if_invalid(
             self.db_engine, user_id, project_id, node_id
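
Reviewer note: every `_handle_*` helper introduced above returns the same 4-tuple `(RunningState, SimcorePlatformStatus, list[ErrorDict], bool)`, so `_process_task_result` becomes a thin dispatcher that unpacks the result identically in every branch. Below is a minimal, self-contained sketch of that dispatch shape only; the enums, error dicts, and exception type are stand-ins for illustration, not the actual simcore models or the code in this PR.

```python
from enum import Enum, auto


class RunningState(Enum):  # stand-in for simcore's RunningState
    SUCCESS = auto()
    FAILED = auto()
    STARTED = auto()
    ABORTED = auto()


class PlatformStatus(Enum):  # stand-in for SimcorePlatformStatus
    OK = auto()
    BAD = auto()


# (final state, platform status, accumulated errors, task completed?)
HandlerResult = tuple[RunningState, PlatformStatus, list[dict], bool]


def handle_successful_run(result: object) -> HandlerResult:
    # outputs parsed fine: terminal state, no errors, task is completed
    return RunningState.SUCCESS, PlatformStatus.OK, [], True


def handle_retrieval_timeout(err: Exception) -> HandlerResult:
    # result not ready yet: keep STARTED so the scheduler retries later
    errors = [{"type": "retrieval-error", "msg": str(err)}]
    return RunningState.STARTED, PlatformStatus.BAD, errors, False


def process_task_result(result: object) -> HandlerResult:
    # each branch yields the same tuple, so the caller unpacks uniformly
    if isinstance(result, TimeoutError):
        return handle_retrieval_timeout(result)
    return handle_successful_run(result)


state, platform_status, errors, completed = process_task_result(TimeoutError("not ready"))
assert state is RunningState.STARTED and not completed
```

Keeping the platform-status decision inside each handler (instead of the trailing `isinstance(result, ComputationalBackendNotConnectedError)` check that this diff removes) lets the retrieval-timeout and backend-disconnected paths mark the platform as BAD while still leaving the task in a retryable STARTED state.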