Skip to content

Commit 991eab8

Browse files
GitHKAndrei Neaguodeimaiz
authored
🐛 LRT redis cleanup ⚠️🚨 (ITISFoundation#8539)
Co-authored-by: Andrei Neagu <[email protected]> Co-authored-by: Odei Maiz <[email protected]>
1 parent ffe52c1 commit 991eab8

File tree

4 files changed

+65
-12
lines changed

4 files changed

+65
-12
lines changed

packages/service-library/src/servicelib/long_running_tasks/models.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,16 @@ class TaskData(BaseModel):
7171
),
7272
] = None
7373

74+
detected_as_done_at: Annotated[
75+
datetime | None,
76+
Field(
77+
description=(
78+
"used to remove the task when it's first detected as done "
79+
"if a task was started as fire_and_forget=True"
80+
)
81+
),
82+
] = None
83+
7484
is_done: Annotated[
7585
bool,
7686
Field(description="True when the task finished running with or without errors"),

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,33 @@ async def _get_tasks_to_remove(
113113

114114
for tracked_task in await tracked_tasks.list_tasks_data():
115115
if tracked_task.fire_and_forget:
116-
continue
116+
# fire and forget tasks also need to be remove from tracking
117+
# when detectes as done, start counting how much time has elapsed
118+
# if over stale_task_detect_timeout_s remove the task
119+
120+
# wait for task to complete
121+
if not tracked_task.is_done:
122+
continue
123+
124+
# mark detected as done
125+
if tracked_task.detected_as_done_at is None:
126+
await tracked_tasks.update_task_data(
127+
tracked_task.task_id,
128+
updates={
129+
"detected_as_done_at": datetime.datetime.now(tz=datetime.UTC)
130+
},
131+
)
132+
continue
133+
134+
# if enough time passes remove the task
135+
elapsed_since_done = (
136+
utc_now - tracked_task.detected_as_done_at
137+
).total_seconds()
138+
if elapsed_since_done > stale_task_detect_timeout_s:
139+
tasks_to_remove.append(
140+
(tracked_task.task_id, tracked_task.task_context)
141+
)
142+
continue
117143

118144
if tracked_task.last_status_check is None:
119145
# the task just added or never received a poll request
@@ -317,6 +343,7 @@ async def _tasks_monitor(self) -> None: # noqa: C901
317343
"""
318344
self._started_event_task_tasks_monitor.set()
319345
task_id: TaskId
346+
320347
for task_id in set(self._created_tasks.keys()):
321348
if task := self._created_tasks.get(task_id, None):
322349
is_done = task.done()

packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ def _get_resutlt(result_field: ResultField) -> Any:
199199
return loads(result_field.str_result)
200200

201201

202-
async def test_fire_and_forget_task_is_not_auto_removed(
202+
async def test_fire_and_forget_task_is_not_auto_removed_while_running(
203203
long_running_manager: LongRunningManager, empty_context: TaskContext
204204
):
205205
task_id = await lrt_api.start_task(
@@ -211,19 +211,32 @@ async def test_fire_and_forget_task_is_not_auto_removed(
211211
fire_and_forget=True,
212212
task_context=empty_context,
213213
)
214+
214215
await asyncio.sleep(3 * TEST_CHECK_STALE_INTERVAL_S)
215216
# the task shall still be present even if we did not check the status before
216217
status = await long_running_manager.tasks_manager.get_task_status(
217218
task_id, with_task_context=empty_context
218219
)
219220
assert not status.done, "task was removed although it is fire and forget"
220-
# the task shall finish
221-
await asyncio.sleep(4 * TEST_CHECK_STALE_INTERVAL_S)
222-
# get the result
223-
task_result = await long_running_manager.tasks_manager.get_task_result(
224-
task_id, with_task_context=empty_context
225-
)
226-
assert _get_resutlt(task_result) == 42
221+
222+
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
223+
with attempt:
224+
try:
225+
await long_running_manager.tasks_manager.get_task_status(
226+
task_id, with_task_context=empty_context
227+
)
228+
raise TryAgain
229+
except TaskNotFoundError:
230+
pass
231+
232+
with pytest.raises(TaskNotFoundError):
233+
await long_running_manager.tasks_manager.get_task_status(
234+
task_id, with_task_context=empty_context
235+
)
236+
with pytest.raises(TaskNotFoundError):
237+
await long_running_manager.tasks_manager.get_task_result(
238+
task_id, with_task_context=empty_context
239+
)
227240

228241

229242
async def test_get_result_of_unfinished_task_raises(

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from models_library.rabbitmq_messages import InstrumentationRabbitMessage
1414
from models_library.rpc.webserver.auth.api_keys import generate_unique_api_key
1515
from models_library.service_settings_labels import SimcoreServiceLabels
16+
from models_library.services_types import ServiceRunID
1617
from models_library.shared_user_preferences import (
1718
AllowMetricsCollectionFrontendUserPreference,
1819
)
@@ -287,17 +288,19 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
287288
scheduler_data.node_uuid
288289
)
289290

290-
await _cleanup_long_running_tasks(app, scheduler_data.node_uuid)
291+
await _cleanup_long_running_tasks(app, scheduler_data.run_id)
291292

292293
await task_progress.update(
293294
message="finished removing resources", percent=ProgressPercent(1)
294295
)
295296

296297

297-
async def _cleanup_long_running_tasks(app: FastAPI, node_id: NodeID) -> None:
298+
async def _cleanup_long_running_tasks(
299+
app: FastAPI, service_run_id: ServiceRunID
300+
) -> None:
298301
long_running_client_helper = get_long_running_client_helper(app)
299302

300-
sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{node_id}"
303+
sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{service_run_id}"
301304
await long_running_client_helper.cleanup(sidecar_namespace)
302305

303306

0 commit comments

Comments
 (0)