Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ class TaskData(BaseModel):
),
] = None

detected_as_done_at: Annotated[
datetime | None,
Field(
description=(
"used to remove the task when it's first detected as done "
"if a task was started as fire_and_forget=True"
)
),
] = None

is_done: Annotated[
bool,
Field(description="True when the task finished running with or without errors"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,33 @@ async def _get_tasks_to_remove(

for tracked_task in await tracked_tasks.list_tasks_data():
if tracked_task.fire_and_forget:
continue
# fire-and-forget tasks also need to be removed from tracking:
# once a task is detected as done, start counting how much time has elapsed;
# if more than stale_task_detect_timeout_s has passed, remove the task

# wait for task to complete
if not tracked_task.is_done:
continue

# mark detected as done
if tracked_task.detected_as_done_at is None:
await tracked_tasks.update_task_data(
tracked_task.task_id,
updates={
"detected_as_done_at": datetime.datetime.now(tz=datetime.UTC)
},
)
continue

# if enough time has passed since the task was detected as done, remove it
elapsed_since_done = (
utc_now - tracked_task.detected_as_done_at
).total_seconds()
if elapsed_since_done > stale_task_detect_timeout_s:
tasks_to_remove.append(
(tracked_task.task_id, tracked_task.task_context)
)
continue

if tracked_task.last_status_check is None:
# the task just added or never received a poll request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from models_library.rabbitmq_messages import InstrumentationRabbitMessage
from models_library.rpc.webserver.auth.api_keys import generate_unique_api_key
from models_library.service_settings_labels import SimcoreServiceLabels
from models_library.services_types import ServiceRunID
from models_library.shared_user_preferences import (
AllowMetricsCollectionFrontendUserPreference,
)
Expand Down Expand Up @@ -287,17 +288,19 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
scheduler_data.node_uuid
)

await _cleanup_long_running_tasks(app, scheduler_data.node_uuid)
await _cleanup_long_running_tasks(app, scheduler_data.run_id)

await task_progress.update(
message="finished removing resources", percent=ProgressPercent(1)
)


async def _cleanup_long_running_tasks(app: FastAPI, node_id: NodeID) -> None:
async def _cleanup_long_running_tasks(
app: FastAPI, service_run_id: ServiceRunID
) -> None:
long_running_client_helper = get_long_running_client_helper(app)

sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{node_id}"
sidecar_namespace = f"SIMCORE-SERVICE-DYNAMIC-SIDECAR-{service_run_id}"
await long_running_client_helper.cleanup(sidecar_namespace)


Expand Down
Loading