Skip to content

Commit 51a684b

Browse files
committed
feat(cli): add tasks flush commands
Signed-off-by: Fatih Acar <[email protected]>
1 parent ac43c24 commit 51a684b

File tree

2 files changed

+119
-0
lines changed

2 files changed

+119
-0
lines changed

backend/infrahub/cli/tasks.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
import typer
44
from infrahub_sdk.async_typer import AsyncTyper
55
from prefect.client.orchestration import get_client
6+
from prefect.client.schemas.objects import StateType
67

78
from infrahub import config
89
from infrahub.services.adapters.workflow.worker import WorkflowWorkerExecution
10+
from infrahub.task_manager.task import PrefectTask
911
from infrahub.tasks.dummy import DUMMY_FLOW, DummyInput
1012
from infrahub.workflows.initialization import setup_task_manager
1113
from infrahub.workflows.models import WorkerPoolDefinition
@@ -50,3 +52,47 @@ async def execute(
5052
workflow=DUMMY_FLOW, parameters={"data": DummyInput(firstname="John", lastname="Doe")}
5153
) # type: ignore[var-annotated]
5254
print(result)
55+
56+
57+
# Sub-application grouping the task-maintenance "flush" commands,
# exposed as `infrahub tasks flush <command>`.
flush_app = AsyncTyper()

app.add_typer(flush_app, name="flush")
@flush_app.command()
async def flow_runs(
    ctx: typer.Context,  # noqa: ARG001
    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
    days_to_keep: int = 30,
    batch_size: int = 100,
) -> None:
    """Flush old task runs"""
    # Silence noisy loggers so the command output stays readable.
    quiet_loggers = (
        ("infrahub", logging.WARNING),
        ("neo4j", logging.ERROR),
        ("prefect", logging.ERROR),
    )
    for logger_name, level in quiet_loggers:
        logging.getLogger(logger_name).setLevel(level)

    config.load_and_exit(config_file_name=config_file)

    # Default state filter applies (terminal states): remove finished flow
    # runs that started more than `days_to_keep` days ago, in API batches.
    await PrefectTask.delete_flow_runs(days_to_keep=days_to_keep, batch_size=batch_size)
82+
@flush_app.command()
async def stale_runs(
    ctx: typer.Context,  # noqa: ARG001
    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
    days_to_keep: int = 2,
    batch_size: int = 100,
) -> None:
    """Flush stale task runs"""
    # Silence noisy loggers so the command output stays readable.
    noise_levels = {"infrahub": logging.WARNING, "neo4j": logging.ERROR, "prefect": logging.ERROR}
    for logger_name, level in noise_levels.items():
        logging.getLogger(logger_name).setLevel(level)

    config.load_and_exit(config_file_name=config_file)

    # RUNNING flows older than the cutoff are considered stale: mark them as
    # crashed (delete=False) instead of removing them outright.
    await PrefectTask.delete_flow_runs(
        states=[StateType.RUNNING],
        delete=False,
        days_to_keep=days_to_keep,
        batch_size=batch_size,
    )

backend/infrahub/task_manager/task.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import asyncio
12
import uuid
3+
from datetime import datetime, timedelta, timezone
24
from typing import Any
35
from uuid import UUID
46

7+
from prefect import State
58
from prefect.client.orchestration import PrefectClient, get_client
69
from prefect.client.schemas.filters import (
710
ArtifactFilter,
@@ -12,6 +15,7 @@
1215
FlowRunFilter,
1316
FlowRunFilterId,
1417
FlowRunFilterName,
18+
FlowRunFilterStartTime,
1519
FlowRunFilterState,
1620
FlowRunFilterStateType,
1721
FlowRunFilterTags,
@@ -311,3 +315,72 @@ async def query(
311315
)
312316

313317
return {"count": count or 0, "edges": nodes}
318+
319+
@classmethod
async def delete_flow_runs(
    cls,
    states: list[StateType] | None = None,
    delete: bool = True,
    days_to_keep: int = 2,
    batch_size: int = 100,
) -> None:
    """Delete (or force-crash) flow runs in the specified states older than `days_to_keep` days.

    Args:
        states: Flow-run states to match. Defaults to COMPLETED, FAILED and CANCELLED.
        delete: When True, delete matching runs through the API; when False, force
            their state to CRASHED instead (used to clean up stale RUNNING flows).
        days_to_keep: Only runs that started more than this many days ago are affected.
        batch_size: Number of flow runs fetched and processed per API batch.
    """
    # Avoid a mutable default argument: build the default state list per call.
    if states is None:
        states = [StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED]

    logger = get_logger()

    async with get_client(sync_client=False) as client:
        cutoff = datetime.now(timezone.utc) - timedelta(days=days_to_keep)

        flow_run_filter = FlowRunFilter(
            start_time=FlowRunFilterStartTime(before_=cutoff),  # type: ignore[arg-type]
            state=FlowRunFilterState(type=FlowRunFilterStateType(any_=states)),
        )

        # First batch of flow runs to process.
        flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)

        deleted_total = 0

        while True:
            batch_deleted = 0

            # Delete (or crash) each flow run through the API, best effort:
            # a failure on one run must not stop the whole retention pass.
            for flow_run in flow_runs:
                try:
                    if delete:
                        await client.delete_flow_run(flow_run_id=flow_run.id)
                    else:
                        await client.set_flow_run_state(
                            flow_run_id=flow_run.id,
                            state=State(type=StateType.CRASHED),
                            force=True,
                        )
                    deleted_total += 1
                    batch_deleted += 1
                except Exception as e:
                    logger.warning(f"Failed to delete flow run {flow_run.id}: {e}")

                # Rate limiting: brief pause every 10 successful operations.
                # Guard against batch_deleted == 0, which would otherwise
                # trigger a sleep on every leading failure (0 % 10 == 0).
                if batch_deleted and batch_deleted % 10 == 0:
                    await asyncio.sleep(0.5)

            logger.info(f"Deleted {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")

            # Get next batch; deleted/crashed runs no longer match the filter.
            previous_flow_run_ids = [fr.id for fr in flow_runs]
            flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)

            if not flow_runs:
                logger.info("No more flow runs to delete")
                break

            # If every run in a batch failed, the same batch comes back
            # forever: detect it and abort instead of looping endlessly.
            if previous_flow_run_ids == [fr.id for fr in flow_runs]:
                logger.info("Found same flow runs to delete, aborting")
                break

            # Delay between batches to avoid overwhelming the API
            await asyncio.sleep(1.0)

        logger.info(f"Retention complete. Total deleted tasks: {deleted_total}")

0 commit comments

Comments (0)