From 2946f2735835189f150c2a4340186aa40eb5a066 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 21 Oct 2025 12:45:31 +0200 Subject: [PATCH 1/9] used wrong node_id instead of service_run_id --- .../dynamic_sidecar/scheduler/_core/_events_utils.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py index 3096418ebec..c9236448256 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py @@ -13,6 +13,7 @@ from models_library.rabbitmq_messages import InstrumentationRabbitMessage from models_library.rpc.webserver.auth.api_keys import generate_unique_api_key from models_library.service_settings_labels import SimcoreServiceLabels +from models_library.services_types import ServiceRunID from models_library.shared_user_preferences import ( AllowMetricsCollectionFrontendUserPreference, ) @@ -287,17 +288,19 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( scheduler_data.node_uuid ) - await _cleanup_long_running_tasks(app, scheduler_data.node_uuid) + await _cleanup_long_running_tasks(app, scheduler_data.run_id) await task_progress.update( message="finished removing resources", percent=ProgressPercent(1) ) -async def _cleanup_long_running_tasks(app: FastAPI, node_id: NodeID) -> None: +async def _cleanup_long_running_tasks( + app: FastAPI, service_run_id: ServiceRunID +) -> None: long_running_client_helper = get_long_running_client_helper(app) - sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{node_id}" + sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{service_run_id}" await long_running_client_helper.cleanup(sidecar_namespace) From c08ae0e36911937a4207f4c238b7d3b1f3f8efd5 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 21 Oct 2025 13:16:52 +0200 Subject: [PATCH 2/9] fixed fire and forget removal --- .../servicelib/long_running_tasks/models.py | 10 +++++++ .../src/servicelib/long_running_tasks/task.py | 28 ++++++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/packages/service-library/src/servicelib/long_running_tasks/models.py b/packages/service-library/src/servicelib/long_running_tasks/models.py index 193c5eadbde..0958d00099f 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/models.py +++ b/packages/service-library/src/servicelib/long_running_tasks/models.py @@ -71,6 +71,16 @@ class TaskData(BaseModel): ), ] = None + detected_as_done_at: Annotated[ + datetime | None, + Field( + description=( + "used to remove the task when it's first detectted as done " + "if a task was started as fire_and_forget=True" + ) + ), + ] = None + is_done: Annotated[ bool, Field(description="True when the task finished running with or without errors"), diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index adb76dc0699..23b9145bc1c 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -113,7 +113,33 @@ async def _get_tasks_to_remove( for tracked_task in await tracked_tasks.list_tasks_data(): if tracked_task.fire_and_forget: - continue + # fire and forget tasks also need to be remove form tracking + # when detectes ad done, start counting how much time has elapsed + # if over stale_task_detect_timeout_s remove the task + + # wait for task to complete + if not tracked_task.is_done: + continue + + # mark detected as done + if tracked_task.detected_as_done_at is None: + await tracked_tasks.update_task_data( + tracked_task.task_id, + updates={ + "detected_as_done_at": datetime.datetime.now(tz=datetime.UTC) + }, + ) + continue + + # if enough time passes remove the task + elpased_since_done = ( + utc_now - tracked_task.detected_as_done_at + ).total_seconds() + if elpased_since_done > stale_task_detect_timeout_s: + tasks_to_remove.append( + (tracked_task.task_id, tracked_task.task_context) + ) + continue if tracked_task.last_status_check is None: # the task just added or never received a poll request From 2e1e689b098cdfd3de33d504c534e101d9d14a30 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 21 Oct 2025 13:20:23 +0200 Subject: [PATCH 3/9] typos --- .../src/servicelib/long_running_tasks/models.py | 2 +- .../src/servicelib/long_running_tasks/task.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/service-library/src/servicelib/long_running_tasks/models.py b/packages/service-library/src/servicelib/long_running_tasks/models.py index 0958d00099f..1366ba23039 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/models.py +++ b/packages/service-library/src/servicelib/long_running_tasks/models.py @@ -75,7 +75,7 @@ class TaskData(BaseModel): datetime | None, Field( description=( - "used to remove the task when it's first detectted as done " + "used to remove the task when it's first detected as done " "if a task was started as fire_and_forget=True" ) ), diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index 23b9145bc1c..757b3a43d94 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -113,8 +113,8 @@ async def _get_tasks_to_remove( for tracked_task in await tracked_tasks.list_tasks_data(): if tracked_task.fire_and_forget: - # fire and forget tasks also need to be remove form tracking - # when detectes ad done, start counting how much time has elapsed + # fire and forget tasks also need to be remove from tracking + # when detectes as done, start counting how much time has elapsed # if over stale_task_detect_timeout_s remove the task # wait for task to complete @@ -132,10 +132,10 @@ async def _get_tasks_to_remove( continue # if enough time passes remove the task - elpased_since_done = ( + elapsed_since_done = ( utc_now - tracked_task.detected_as_done_at ).total_seconds() - if elpased_since_done > stale_task_detect_timeout_s: + if elapsed_since_done > stale_task_detect_timeout_s: tasks_to_remove.append( (tracked_task.task_id, tracked_task.task_context) ) From 9d96a8d573cbf855bd37a985850cd6c797ac12dc Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 24 Oct 2025 11:18:24 +0200 Subject: [PATCH 4/9] remove fire_and_forget when done --- .../src/servicelib/long_running_tasks/task.py | 8 ++++++ .../test_long_running_tasks_task.py | 28 +++++++++++++------ 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index 757b3a43d94..27931ee377b 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -423,6 +423,14 @@ async def _tasks_monitor(self) -> None: # noqa: C901 updates["result_field"] = result_field await self._tasks_data.update_task_data(task_id, updates=updates) + # always remove fire and forget when done, no need to keep them around + if task_data and task_data.fire_and_forget: + await self.remove_task( + task_data.task_id, + task_data.task_context, + wait_for_removal=False, + ) + async def list_tasks(self, with_task_context: TaskContext | None) -> list[TaskBase]: if not with_task_context: return [ diff --git a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py index 0808878818a..7e9a3856e70 100644 --- a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py +++ b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py @@ -199,7 +199,7 @@ def _get_resutlt(result_field: ResultField) -> Any: return loads(result_field.str_result) -async def test_fire_and_forget_task_is_not_auto_removed( +async def test_fire_and_forget_task_is_not_auto_removed_while_running( long_running_manager: LongRunningManager, empty_context: TaskContext ): task_id = await lrt_api.start_task( @@ -217,13 +217,25 @@ async def test_fire_and_forget_task_is_not_auto_removed( task_id, with_task_context=empty_context ) assert not status.done, "task was removed although it is fire and forget" - # the task shall finish - await asyncio.sleep(4 * TEST_CHECK_STALE_INTERVAL_S) - # get the result - task_result = await long_running_manager.tasks_manager.get_task_result( - task_id, with_task_context=empty_context - ) - assert _get_resutlt(task_result) == 42 + + async for attempt in AsyncRetrying(**_RETRY_PARAMS): + with attempt: + try: + await long_running_manager.tasks_manager.get_task_status( + task_id, with_task_context=empty_context + ) + raise TryAgain + except TaskNotFoundError: + pass + + with pytest.raises(TaskNotFoundError): + await long_running_manager.tasks_manager.get_task_status( + task_id, with_task_context=empty_context + ) + with pytest.raises(TaskNotFoundError): + await long_running_manager.tasks_manager.get_task_result( + task_id, with_task_context=empty_context + ) async def test_get_result_of_unfinished_task_raises( From 97fe69092dacdc31dd8c5beddebe3a522bc5fcab Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 24 Oct 2025 11:20:46 +0200 Subject: [PATCH 5/9] remove unrequired any longer --- .../servicelib/long_running_tasks/models.py | 10 ------- .../src/servicelib/long_running_tasks/task.py | 29 ------------------- 2 files changed, 39 deletions(-) diff --git a/packages/service-library/src/servicelib/long_running_tasks/models.py b/packages/service-library/src/servicelib/long_running_tasks/models.py index 1366ba23039..193c5eadbde 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/models.py +++ b/packages/service-library/src/servicelib/long_running_tasks/models.py @@ -71,16 +71,6 @@ class TaskData(BaseModel): ), ] = None - detected_as_done_at: Annotated[ - datetime | None, - Field( - description=( - "used to remove the task when it's first detected as done " - "if a task was started as fire_and_forget=True" - ) - ), - ] = None - is_done: Annotated[ bool, Field(description="True when the task finished running with or without errors"), diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index 27931ee377b..be2c00e7482 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -112,35 +112,6 @@ async def _get_tasks_to_remove( tasks_to_remove: list[tuple[TaskId, TaskContext]] = [] for tracked_task in await tracked_tasks.list_tasks_data(): - if tracked_task.fire_and_forget: - # fire and forget tasks also need to be remove from tracking - # when detectes as done, start counting how much time has elapsed - # if over stale_task_detect_timeout_s remove the task - - # wait for task to complete - if not tracked_task.is_done: - continue - - # mark detected as done - if tracked_task.detected_as_done_at is None: - await tracked_tasks.update_task_data( - tracked_task.task_id, - updates={ - "detected_as_done_at": datetime.datetime.now(tz=datetime.UTC) - }, - ) - continue - - # if enough time passes remove the task - elapsed_since_done = ( - utc_now - tracked_task.detected_as_done_at - ).total_seconds() - if elapsed_since_done > stale_task_detect_timeout_s: - tasks_to_remove.append( - (tracked_task.task_id, tracked_task.task_context) - ) - continue - if tracked_task.last_status_check is None: # the task just added or never received a poll request elapsed_from_start = (utc_now - tracked_task.started).total_seconds() From 2fab109cc27ea5de967ffeeafc8d9cc8c64de42a Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 24 Oct 2025 11:27:49 +0200 Subject: [PATCH 6/9] fixed fire_and_forget removal --- .../servicelib/long_running_tasks/models.py | 10 ------ .../src/servicelib/long_running_tasks/task.py | 31 +------------------ .../test_long_running_tasks_task.py | 6 ---- 3 files changed, 1 insertion(+), 46 deletions(-) diff --git a/packages/service-library/src/servicelib/long_running_tasks/models.py b/packages/service-library/src/servicelib/long_running_tasks/models.py index 1366ba23039..193c5eadbde 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/models.py +++ b/packages/service-library/src/servicelib/long_running_tasks/models.py @@ -71,16 +71,6 @@ class TaskData(BaseModel): ), ] = None - detected_as_done_at: Annotated[ - datetime | None, - Field( - description=( - "used to remove the task when it's first detected as done " - "if a task was started as fire_and_forget=True" - ) - ), - ] = None - is_done: Annotated[ bool, Field(description="True when the task finished running with or without errors"), diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index 27931ee377b..2c0b1675f27 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -112,35 +112,6 @@ async def _get_tasks_to_remove( tasks_to_remove: list[tuple[TaskId, TaskContext]] = [] for tracked_task in await tracked_tasks.list_tasks_data(): - if tracked_task.fire_and_forget: - # fire and forget tasks also need to be remove from tracking - # when detectes as done, start counting how much time has elapsed - # if over stale_task_detect_timeout_s remove the task - - # wait for task to complete - if not tracked_task.is_done: - continue - - # mark detected as done - if tracked_task.detected_as_done_at is None: - await tracked_tasks.update_task_data( - tracked_task.task_id, - updates={ - "detected_as_done_at": datetime.datetime.now(tz=datetime.UTC) - }, - ) - continue - - # if enough time passes remove the task - elapsed_since_done = ( - utc_now - tracked_task.detected_as_done_at - ).total_seconds() - if elapsed_since_done > stale_task_detect_timeout_s: - tasks_to_remove.append( - (tracked_task.task_id, tracked_task.task_context) - ) - continue - if tracked_task.last_status_check is None: # the task just added or never received a poll request elapsed_from_start = (utc_now - tracked_task.started).total_seconds() @@ -424,7 +395,7 @@ async def _tasks_monitor(self) -> None: # noqa: C901 await self._tasks_data.update_task_data(task_id, updates=updates) # always remove fire and forget when done, no need to keep them around - if task_data and task_data.fire_and_forget: + if task_data.fire_and_forget: await self.remove_task( task_data.task_id, task_data.task_context, diff --git a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py index 7e9a3856e70..5b2f0b30333 100644 --- a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py +++ b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py @@ -211,12 +211,6 @@ async def test_fire_and_forget_task_is_not_auto_removed_while_running( fire_and_forget=True, task_context=empty_context, ) - await asyncio.sleep(3 * TEST_CHECK_STALE_INTERVAL_S) - # the task shall still be present even if we did not check the status before - status = await long_running_manager.tasks_manager.get_task_status( - task_id, with_task_context=empty_context - ) - assert not status.done, "task was removed although it is fire and forget" async for attempt in AsyncRetrying(**_RETRY_PARAMS): with attempt: From 979e55ab4525426437eace0932342f679721d48a Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 24 Oct 2025 12:17:49 +0200 Subject: [PATCH 7/9] fixed issue --- .../src/servicelib/long_running_tasks/task.py | 25 ++++++++++++------- .../test_long_running_tasks_task.py | 7 ++++++ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index 2c0b1675f27..953349a6e6b 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -112,6 +112,10 @@ async def _get_tasks_to_remove( tasks_to_remove: list[tuple[TaskId, TaskContext]] = [] for tracked_task in await tracked_tasks.list_tasks_data(): + if tracked_task.fire_and_forget: + # fire and forget tasks are not considered stale + continue + if tracked_task.last_status_check is None: # the task just added or never received a poll request elapsed_from_start = (utc_now - tracked_task.started).total_seconds() @@ -314,6 +318,8 @@ async def _tasks_monitor(self) -> None: # noqa: C901 """ self._started_event_task_tasks_monitor.set() task_id: TaskId + to_remove: list[tuple[TaskId, TaskContext]] = [] + for task_id in set(self._created_tasks.keys()): if task := self._created_tasks.get(task_id, None): is_done = task.done() @@ -323,14 +329,19 @@ async def _tasks_monitor(self) -> None: # noqa: C901 # write to redis only when done task_data = await self._tasks_data.get_task_data(task_id) - if task_data is None or task_data.is_done: - # already done and updatet data in redis + if task_data is None: + continue + + # already done and updatet data in redis + if task_data.is_done: continue result_field: ResultField | None = None # get task result try: result_field = ResultField(str_result=dumps(task.result())) + if task_data.fire_and_forget: + to_remove.append((task_data.task_id, task_data.task_context)) except asyncio.InvalidStateError: # task was not completed try again next time and see if it is done continue @@ -394,13 +405,9 @@ async def _tasks_monitor(self) -> None: # noqa: C901 updates["result_field"] = result_field await self._tasks_data.update_task_data(task_id, updates=updates) - # always remove fire and forget when done, no need to keep them around - if task_data.fire_and_forget: - await self.remove_task( - task_data.task_id, - task_data.task_context, - wait_for_removal=False, - ) + # always remove fire and forget when done, no need to keep them around + for task_id, task_context in to_remove: + await self.remove_task(task_id, task_context, wait_for_removal=False) async def list_tasks(self, with_task_context: TaskContext | None) -> list[TaskBase]: if not with_task_context: diff --git a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py index 5b2f0b30333..fd99c9b2956 100644 --- a/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py +++ b/packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py @@ -212,6 +212,13 @@ async def test_fire_and_forget_task_is_not_auto_removed_while_running( task_context=empty_context, ) + await asyncio.sleep(3 * TEST_CHECK_STALE_INTERVAL_S) + # the task shall still be present even if we did not check the status before + status = await long_running_manager.tasks_manager.get_task_status( + task_id, with_task_context=empty_context + ) + assert not status.done, "task was removed although it is fire and forget" + async for attempt in AsyncRetrying(**_RETRY_PARAMS): with attempt: try: From 7fae11af8a3f58abde4e7a8bf9eeb07f470c6247 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 24 Oct 2025 12:53:43 +0200 Subject: [PATCH 8/9] pylint mypy --- .../service-library/src/servicelib/long_running_tasks/task.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index 953349a6e6b..4ae65ef2481 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -311,7 +311,9 @@ async def _cancelled_tasks_removal(self) -> None: limit=_PARALLEL_TASKS_CANCELLATION, ) - async def _tasks_monitor(self) -> None: # noqa: C901 + async def _tasks_monitor( # pylint:disable=too-many-branches # noqa: C901, PLR0912 + self, + ) -> None: """ A task which monitors locally running tasks and updates their status in the Redis store when they are done. From 15376c913bd9924a19b764ce365000f58c75a0d6 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 24 Oct 2025 16:45:56 +0200 Subject: [PATCH 9/9] revert to correct implementation --- .../servicelib/long_running_tasks/models.py | 10 ++++ .../src/servicelib/long_running_tasks/task.py | 47 ++++++++++++------- 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/packages/service-library/src/servicelib/long_running_tasks/models.py b/packages/service-library/src/servicelib/long_running_tasks/models.py index 193c5eadbde..1366ba23039 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/models.py +++ b/packages/service-library/src/servicelib/long_running_tasks/models.py @@ -71,6 +71,16 @@ class TaskData(BaseModel): ), ] = None + detected_as_done_at: Annotated[ + datetime | None, + Field( + description=( + "used to remove the task when it's first detected as done " + "if a task was started as fire_and_forget=True" + ) + ), + ] = None + is_done: Annotated[ bool, Field(description="True when the task finished running with or without errors"), diff --git a/packages/service-library/src/servicelib/long_running_tasks/task.py b/packages/service-library/src/servicelib/long_running_tasks/task.py index 4ae65ef2481..fe14553a066 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/task.py @@ -113,8 +113,33 @@ async def _get_tasks_to_remove( for tracked_task in await tracked_tasks.list_tasks_data(): if tracked_task.fire_and_forget: - # fire and forget tasks are not considered stale - continue + # fire and forget tasks also need to be remove from tracking + # when detectes as done, start counting how much time has elapsed + # if over stale_task_detect_timeout_s remove the task + + # wait for task to complete + if not tracked_task.is_done: + continue + + # mark detected as done + if tracked_task.detected_as_done_at is None: + await tracked_tasks.update_task_data( + tracked_task.task_id, + updates={ + "detected_as_done_at": datetime.datetime.now(tz=datetime.UTC) + }, + ) + continue + + # if enough time passes remove the task + elapsed_since_done = ( + utc_now - tracked_task.detected_as_done_at + ).total_seconds() + if elapsed_since_done > stale_task_detect_timeout_s: + tasks_to_remove.append( + (tracked_task.task_id, tracked_task.task_context) + ) + continue if tracked_task.last_status_check is None: # the task just added or never received a poll request @@ -311,16 +336,13 @@ async def _cancelled_tasks_removal(self) -> None: limit=_PARALLEL_TASKS_CANCELLATION, ) - async def _tasks_monitor( # pylint:disable=too-many-branches # noqa: C901, PLR0912 - self, - ) -> None: + async def _tasks_monitor(self) -> None: # noqa: C901 """ A task which monitors locally running tasks and updates their status in the Redis store when they are done. """ self._started_event_task_tasks_monitor.set() task_id: TaskId - to_remove: list[tuple[TaskId, TaskContext]] = [] for task_id in set(self._created_tasks.keys()): if task := self._created_tasks.get(task_id, None): @@ -331,19 +353,14 @@ async def _tasks_monitor( # pylint:disable=too-many-branches # noqa: C901, PLR0 # write to redis only when done task_data = await self._tasks_data.get_task_data(task_id) - if task_data is None: - continue - - # already done and updatet data in redis - if task_data.is_done: + if task_data is None or task_data.is_done: + # already done and updatet data in redis continue result_field: ResultField | None = None # get task result try: result_field = ResultField(str_result=dumps(task.result())) - if task_data.fire_and_forget: - to_remove.append((task_data.task_id, task_data.task_context)) except asyncio.InvalidStateError: # task was not completed try again next time and see if it is done continue @@ -407,10 +424,6 @@ async def _tasks_monitor( # pylint:disable=too-many-branches # noqa: C901, PLR0 updates["result_field"] = result_field await self._tasks_data.update_task_data(task_id, updates=updates) - # always remove fire and forget when done, no need to keep them around - for task_id, task_context in to_remove: - await self.remove_task(task_id, task_context, wait_for_removal=False) - async def list_tasks(self, with_task_context: TaskContext | None) -> list[TaskBase]: if not with_task_context: return [