diff --git a/packages/service-library/src/servicelib/long_running_tasks/models.py b/packages/service-library/src/servicelib/long_running_tasks/models.py index 193c5eadbde..1366ba23039 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/models.py +++ b/packages/service-library/src/servicelib/long_running_tasks/models.py @@ -71,6 +71,16 @@ class TaskData(BaseModel): ), ] = None + detected_as_done_at: Annotated[ + datetime | None, + Field( + description=( + "used to remove the task when it's first detected as done " + "if a task was started as fire_and_forget=True" + ) + ), + ] = None + is_done: Annotated[ bool, Field(description="True when the task finished running with or without errors"), diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index adb76dc0699..fe14553a066 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -113,7 +113,33 @@ async def _get_tasks_to_remove( for tracked_task in await tracked_tasks.list_tasks_data(): if tracked_task.fire_and_forget: - continue + # fire and forget tasks also need to be remove from tracking + # when detectes as done, start counting how much time has elapsed + # if over stale_task_detect_timeout_s remove the task + + # wait for task to complete + if not tracked_task.is_done: + continue + + # mark detected as done + if tracked_task.detected_as_done_at is None: + await tracked_tasks.update_task_data( + tracked_task.task_id, + updates={ + "detected_as_done_at": datetime.datetime.now(tz=datetime.UTC) + }, + ) + continue + + # if enough time passes remove the task + elapsed_since_done = ( + utc_now - tracked_task.detected_as_done_at + ).total_seconds() + if elapsed_since_done > stale_task_detect_timeout_s: + tasks_to_remove.append( + (tracked_task.task_id, tracked_task.task_context) + ) + continue if tracked_task.last_status_check is None: # the task just added or never received a poll request @@ -317,6 +343,7 @@ async def _tasks_monitor(self) -> None: # noqa: C901 """ self._started_event_task_tasks_monitor.set() task_id: TaskId + for task_id in set(self._created_tasks.keys()): if task := self._created_tasks.get(task_id, None): is_done = task.done() diff --git a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py index 0808878818a..fd99c9b2956 100644 --- a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py +++ b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py @@ -199,7 +199,7 @@ def _get_resutlt(result_field: ResultField) -> Any: return loads(result_field.str_result) -async def test_fire_and_forget_task_is_not_auto_removed( +async def test_fire_and_forget_task_is_not_auto_removed_while_running( long_running_manager: LongRunningManager, empty_context: TaskContext ): task_id = await lrt_api.start_task( @@ -211,19 +211,32 @@ async def test_fire_and_forget_task_is_not_auto_removed( fire_and_forget=True, task_context=empty_context, ) + await asyncio.sleep(3 * TEST_CHECK_STALE_INTERVAL_S) # the task shall still be present even if we did not check the status before status = await long_running_manager.tasks_manager.get_task_status( task_id, with_task_context=empty_context ) assert not status.done, "task was removed although it is fire and forget" - # the task shall finish - await asyncio.sleep(4 * TEST_CHECK_STALE_INTERVAL_S) - # get the result - task_result = await long_running_manager.tasks_manager.get_task_result( - task_id, with_task_context=empty_context - ) - assert _get_resutlt(task_result) == 42 + + async for attempt in AsyncRetrying(**_RETRY_PARAMS): + with attempt: + try: + await long_running_manager.tasks_manager.get_task_status( + task_id, with_task_context=empty_context + ) + raise TryAgain + except TaskNotFoundError: + pass + + with pytest.raises(TaskNotFoundError): + await long_running_manager.tasks_manager.get_task_status( + task_id, with_task_context=empty_context + ) + with pytest.raises(TaskNotFoundError): + await long_running_manager.tasks_manager.get_task_result( + task_id, with_task_context=empty_context + ) async def test_get_result_of_unfinished_task_raises( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py index 3096418ebec..c9236448256 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py @@ -13,6 +13,7 @@ from models_library.rabbitmq_messages import InstrumentationRabbitMessage from models_library.rpc.webserver.auth.api_keys import generate_unique_api_key from models_library.service_settings_labels import SimcoreServiceLabels +from models_library.services_types import ServiceRunID from models_library.shared_user_preferences import ( AllowMetricsCollectionFrontendUserPreference, ) @@ -287,17 +288,19 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( scheduler_data.node_uuid ) - await _cleanup_long_running_tasks(app, scheduler_data.node_uuid) + await _cleanup_long_running_tasks(app, scheduler_data.run_id) await task_progress.update( message="finished removing resources", percent=ProgressPercent(1) ) -async def _cleanup_long_running_tasks(app: FastAPI, node_id: NodeID) -> None: +async def _cleanup_long_running_tasks( + app: FastAPI, service_run_id: ServiceRunID +) -> None: long_running_client_helper = get_long_running_client_helper(app) - sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{node_id}" + sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{service_run_id}" await long_running_client_helper.cleanup(sidecar_namespace)