@@ -2273,3 +2273,46 @@ async def test_run_new_pipeline_called_twice_prevents_duplicate_runs(
22732273 0 , # No new messages expected
22742274 ComputationalPipelineStatusMessage .model_validate_json ,
22752275 )
2276+
2277+
2278+ @pytest .mark .parametrize (
2279+ "exception_type" ,
2280+ [
2281+ ComputationalBackendTaskResultsNotReadyError ,
2282+ ],
2283+ )
2284+ async def test_getting_task_result_raises_exception (
2285+ exception_type : Exception ,
2286+ with_disabled_auto_scheduling : mock .Mock ,
2287+ with_disabled_scheduler_publisher : mock .Mock ,
2288+ mocked_dask_client : mock .MagicMock ,
2289+ initialized_app : FastAPI ,
2290+ scheduler_api : BaseCompScheduler ,
2291+ sqlalchemy_async_engine : AsyncEngine ,
2292+ running_project : RunningProject ,
2293+ mocked_parse_output_data_fct : mock .Mock ,
2294+ ):
2295+ # this tests the behavior of the scheduling when the dask client cannot retrieve
2296+ # the result of a task because of some communication error. In this case the task
2297+ # it should be retrieved again in the next iteration and not marked as failed
2298+ # immediately.
2299+ async def mocked_get_tasks_status (job_ids : list [str ]) -> list [RunningState ]:
2300+ return [RunningState .SUCCESS for j in job_ids ]
2301+
2302+ mocked_dask_client .get_tasks_status .side_effect = mocked_get_tasks_status
2303+ mocked_dask_client .get_task_result .side_effect = exception_type
2304+
2305+ # calling apply should not raise, but log the error
2306+ assert running_project .project .prj_owner
2307+ await scheduler_api .apply (
2308+ user_id = running_project .project .prj_owner ,
2309+ project_id = running_project .project .uuid ,
2310+ iteration = 1 ,
2311+ )
2312+
2313+ assert running_project .project .prj_owner
2314+ await scheduler_api .apply (
2315+ user_id = running_project .project .prj_owner ,
2316+ project_id = running_project .project .uuid ,
2317+ iteration = 1 ,
2318+ )
0 commit comments