|
27 | 27 | assert_comp_runs_empty, |
28 | 28 | assert_comp_tasks_and_comp_run_snapshot_tasks, |
29 | 29 | ) |
30 | | -from dask_task_models_library.container_tasks.errors import TaskCancelledError |
| 30 | +from dask_task_models_library.container_tasks.errors import ( |
| 31 | + ServiceRuntimeError, |
| 32 | + TaskCancelledError, |
| 33 | +) |
31 | 34 | from dask_task_models_library.container_tasks.events import TaskProgressEvent |
32 | 35 | from dask_task_models_library.container_tasks.io import TaskOutputData |
33 | 36 | from dask_task_models_library.container_tasks.protocol import TaskOwner |
@@ -838,7 +841,13 @@ async def _return_2nd_task_failed(job_ids: list[str]) -> list[RunningState]: |
838 | 841 | ] |
839 | 842 |
|
840 | 843 | mocked_dask_client.get_tasks_status.side_effect = _return_2nd_task_failed |
841 | | - mocked_dask_client.get_task_result.side_effect = None |
| 844 | + mocked_dask_client.get_task_result.side_effect = ServiceRuntimeError( |
| 845 | + service_key="simcore/services/dynamic/some-service", |
| 846 | + service_version="1.0.0", |
| 847 | + container_id="some-container-id", |
| 848 | + exit_code=1, |
| 849 | + service_logs="simulated error", |
| 850 | + ) |
842 | 851 | await scheduler_api.apply( |
843 | 852 | user_id=run_in_db.user_id, |
844 | 853 | project_id=run_in_db.project_uuid, |
@@ -2301,18 +2310,17 @@ async def mocked_get_tasks_status(job_ids: list[str]) -> list[RunningState]: |
2301 | 2310 | computational_tasks = [ |
2302 | 2311 | t for t in running_project.tasks if t.node_class is NodeClass.COMPUTATIONAL |
2303 | 2312 | ] |
2304 | | - expected_timeouted_tasks = random.choices( # noqa: S311 |
| 2313 | + expected_timeouted_tasks = random.sample( |
2305 | 2314 | computational_tasks, k=len(computational_tasks) - 1 |
2306 | 2315 | ) |
2307 | 2316 | successful_tasks = [ |
2308 | 2317 | t for t in computational_tasks if t not in expected_timeouted_tasks |
2309 | 2318 | ] |
2310 | 2319 |
|
2311 | 2320 | async def mocked_get_task_result(job_id: str) -> TaskOutputData: |
2312 | | - nonlocal expected_timeouted_tasks |
2313 | | - if job_id in [t.job_id for t in expected_timeouted_tasks]: |
2314 | | - raise ComputationalBackendTaskResultsNotReadyError(job_id=job_id) |
2315 | | - return TaskOutputData.model_validate({"whatever_output": 123}) |
| 2321 | + if job_id in [t.job_id for t in successful_tasks]: |
| 2322 | + return TaskOutputData.model_validate({"whatever_output": 123}) |
| 2323 | + raise ComputationalBackendTaskResultsNotReadyError(job_id=job_id) |
2316 | 2324 |
|
2317 | 2325 | mocked_dask_client.get_task_result.side_effect = mocked_get_task_result |
2318 | 2326 | # calling apply should not raise |
@@ -2346,6 +2354,7 @@ async def mocked_get_task_result(job_id: str) -> TaskOutputData: |
2346 | 2354 | retrieval_times.append( |
2347 | 2355 | error_dict["ctx"][_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY] |
2348 | 2356 | ) |
| 2357 | + assert len(retrieval_times) == len(expected_timeouted_tasks) |
2349 | 2358 |
|
2350 | 2359 | await assert_comp_tasks_and_comp_run_snapshot_tasks( |
2351 | 2360 | sqlalchemy_async_engine, |
@@ -2374,14 +2383,7 @@ async def mocked_get_task_result(job_id: str) -> TaskOutputData: |
2374 | 2383 | iteration=1, |
2375 | 2384 | ) |
2376 | 2385 | assert mocked_dask_client.get_task_result.call_count == ( |
2377 | | - len( |
2378 | | - [ |
2379 | | - t |
2380 | | - for t in running_project.tasks |
2381 | | - if t.node_class is NodeClass.COMPUTATIONAL |
2382 | | - ] |
2383 | | - ) |
2384 | | - - 1 |
| 2386 | + len(expected_timeouted_tasks) |
2385 | 2387 | ) |
2386 | 2388 | mocked_dask_client.get_task_result.reset_mock() |
2387 | 2389 | await asyncio.sleep(0.5) # wait a bit to ensure the retry decorator has reset |
|
0 commit comments