Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ class TaskData(BaseModel):
),
] = None

detected_as_done_at: Annotated[
datetime | None,
Field(
description=(
"used to remove the task when it's first detected as done "
"if a task was started as fire_and_forget=True"
)
),
] = None

is_done: Annotated[
bool,
Field(description="True when the task finished running with or without errors"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,33 @@ async def _get_tasks_to_remove(

for tracked_task in await tracked_tasks.list_tasks_data():
if tracked_task.fire_and_forget:
continue
# fire and forget tasks also need to be removed from tracking:
# when detected as done, start counting how much time has elapsed;
# if over stale_task_detect_timeout_s, remove the task

# wait for task to complete
if not tracked_task.is_done:
continue

# mark detected as done
if tracked_task.detected_as_done_at is None:
await tracked_tasks.update_task_data(
tracked_task.task_id,
updates={
"detected_as_done_at": datetime.datetime.now(tz=datetime.UTC)
},
)
continue

# if enough time passes remove the task
elapsed_since_done = (
utc_now - tracked_task.detected_as_done_at
).total_seconds()
if elapsed_since_done > stale_task_detect_timeout_s:
tasks_to_remove.append(
(tracked_task.task_id, tracked_task.task_context)
)
continue

if tracked_task.last_status_check is None:
# the task was just added or never received a poll request
Expand Down Expand Up @@ -317,6 +343,7 @@ async def _tasks_monitor(self) -> None: # noqa: C901
"""
self._started_event_task_tasks_monitor.set()
task_id: TaskId

for task_id in set(self._created_tasks.keys()):
if task := self._created_tasks.get(task_id, None):
is_done = task.done()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def _get_resutlt(result_field: ResultField) -> Any:
return loads(result_field.str_result)


async def test_fire_and_forget_task_is_not_auto_removed(
async def test_fire_and_forget_task_is_not_auto_removed_while_running(
long_running_manager: LongRunningManager, empty_context: TaskContext
):
task_id = await lrt_api.start_task(
Expand All @@ -211,19 +211,32 @@ async def test_fire_and_forget_task_is_not_auto_removed(
fire_and_forget=True,
task_context=empty_context,
)

await asyncio.sleep(3 * TEST_CHECK_STALE_INTERVAL_S)
# the task shall still be present even if we did not check the status before
status = await long_running_manager.tasks_manager.get_task_status(
task_id, with_task_context=empty_context
)
assert not status.done, "task was removed although it is fire and forget"
# the task shall finish
await asyncio.sleep(4 * TEST_CHECK_STALE_INTERVAL_S)
# get the result
task_result = await long_running_manager.tasks_manager.get_task_result(
task_id, with_task_context=empty_context
)
assert _get_resutlt(task_result) == 42

async for attempt in AsyncRetrying(**_RETRY_PARAMS):
with attempt:
try:
await long_running_manager.tasks_manager.get_task_status(
task_id, with_task_context=empty_context
)
raise TryAgain
except TaskNotFoundError:
pass

with pytest.raises(TaskNotFoundError):
await long_running_manager.tasks_manager.get_task_status(
task_id, with_task_context=empty_context
)
with pytest.raises(TaskNotFoundError):
await long_running_manager.tasks_manager.get_task_result(
task_id, with_task_context=empty_context
)


async def test_get_result_of_unfinished_task_raises(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from models_library.rabbitmq_messages import InstrumentationRabbitMessage
from models_library.rpc.webserver.auth.api_keys import generate_unique_api_key
from models_library.service_settings_labels import SimcoreServiceLabels
from models_library.services_types import ServiceRunID
from models_library.shared_user_preferences import (
AllowMetricsCollectionFrontendUserPreference,
)
Expand Down Expand Up @@ -287,17 +288,19 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
scheduler_data.node_uuid
)

await _cleanup_long_running_tasks(app, scheduler_data.node_uuid)
await _cleanup_long_running_tasks(app, scheduler_data.run_id)

await task_progress.update(
message="finished removing resources", percent=ProgressPercent(1)
)


async def _cleanup_long_running_tasks(app: FastAPI, node_id: NodeID) -> None:
async def _cleanup_long_running_tasks(
app: FastAPI, service_run_id: ServiceRunID
) -> None:
long_running_client_helper = get_long_running_client_helper(app)

sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{node_id}"
sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{service_run_id}"
await long_running_client_helper.cleanup(sidecar_namespace)


Expand Down
Loading