backend/infrahub/cli/tasks.py (46 additions, 0 deletions)

@@ -3,9 +3,11 @@
import typer
from infrahub_sdk.async_typer import AsyncTyper
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import StateType

from infrahub import config
from infrahub.services.adapters.workflow.worker import WorkflowWorkerExecution
from infrahub.task_manager.task import PrefectTask
from infrahub.tasks.dummy import DUMMY_FLOW, DummyInput
from infrahub.workflows.initialization import setup_task_manager
from infrahub.workflows.models import WorkerPoolDefinition
@@ -50,3 +52,47 @@ async def execute(
        workflow=DUMMY_FLOW, parameters={"data": DummyInput(firstname="John", lastname="Doe")}
    )  # type: ignore[var-annotated]
    print(result)


flush_app = AsyncTyper()

app.add_typer(flush_app, name="flush")


@flush_app.command()
async def flow_runs(
    ctx: typer.Context,  # noqa: ARG001
    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
    days_to_keep: int = 30,
    batch_size: int = 100,
) -> None:
    """Flush old task runs"""
    logging.getLogger("infrahub").setLevel(logging.WARNING)
    logging.getLogger("neo4j").setLevel(logging.ERROR)
    logging.getLogger("prefect").setLevel(logging.ERROR)

    config.load_and_exit(config_file_name=config_file)

    await PrefectTask.delete_flow_runs(
        days_to_keep=days_to_keep,
        batch_size=batch_size,
    )
Comment on lines +62 to +79
🛠️ Refactor suggestion | 🟠 Major

Expand docstring and consider input validation

The docstring is missing the Args section required by coding guidelines. Additionally, there's no validation for days_to_keep or batch_size, which could lead to unexpected behavior with negative or excessively large values.

[As per coding guidelines]

Apply this diff to expand the docstring:

 async def flow_runs(
     ctx: typer.Context,  # noqa: ARG001
     config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
     days_to_keep: int = 30,
     batch_size: int = 100,
 ) -> None:
-    """Flush old task runs"""
+    """Flush old task runs by deleting completed, failed, and cancelled flows.
+
+    Args:
+        ctx: Typer context (unused).
+        config_file: Path to the Infrahub configuration file.
+        days_to_keep: Number of days to retain flow runs. Older flows are deleted.
+        batch_size: Number of flow runs to process per batch.
+    """

Consider adding input validation before calling delete_flow_runs:

async def flow_runs(
    ctx: typer.Context,  # noqa: ARG001
    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
    days_to_keep: int = 30,
    batch_size: int = 100,
) -> None:
    """Flush old task runs by deleting completed, failed, and cancelled flows.

    Args:
        ctx: Typer context (unused).
        config_file: Path to the Infrahub configuration file.
        days_to_keep: Number of days to retain flow runs. Older flows are deleted.
        batch_size: Number of flow runs to process per batch.
    """
    if days_to_keep < 0:
        raise ValueError("days_to_keep must be non-negative")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    
    logging.getLogger("infrahub").setLevel(logging.WARNING)
    # ... rest of the function
🤖 Prompt for AI Agents
In backend/infrahub/cli/tasks.py around lines 62 to 79, expand the function
docstring to include an Args section describing ctx, config_file, days_to_keep,
and batch_size (briefly state purpose and defaults) and add input validation
before calling PrefectTask.delete_flow_runs: raise ValueError if days_to_keep is
negative and if batch_size is less than 1; keep the rest of the function
(logging and config.load_and_exit) unchanged and ensure validation happens prior
to invoking delete_flow_runs.



@flush_app.command()
async def stale_runs(
    ctx: typer.Context,  # noqa: ARG001
    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
    days_to_keep: int = 2,
    batch_size: int = 100,
) -> None:
    """Flush stale task runs"""
    logging.getLogger("infrahub").setLevel(logging.WARNING)
    logging.getLogger("neo4j").setLevel(logging.ERROR)
    logging.getLogger("prefect").setLevel(logging.ERROR)

    config.load_and_exit(config_file_name=config_file)

    await PrefectTask.delete_flow_runs(
        states=[StateType.RUNNING], delete=False, days_to_keep=days_to_keep, batch_size=batch_size
    )
Comment on lines +82 to +98
🛠️ Refactor suggestion | 🟠 Major

Expand docstring to clarify the distinct behavior

This command behaves differently from flow_runs: it targets RUNNING states and marks them as CRASHED instead of deleting them. The docstring should clearly document this distinction and the parameters.

[As per coding guidelines]

Apply this diff:

 async def stale_runs(
     ctx: typer.Context,  # noqa: ARG001
     config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
     days_to_keep: int = 2,
     batch_size: int = 100,
 ) -> None:
-    """Flush stale task runs"""
+    """Mark stale RUNNING flows as CRASHED without deleting them.
+
+    This command targets flows stuck in the RUNNING state and marks them as
+    CRASHED to prevent them from blocking resources. Unlike flow_runs, this
+    does not delete the flow runs.
+
+    Args:
+        ctx: Typer context (unused).
+        config_file: Path to the Infrahub configuration file.
+        days_to_keep: Number of days to retain RUNNING flows. Older flows are marked as CRASHED. Default is 2.
+        batch_size: Number of flow runs to process per batch.
+    """

Consider adding the same input validation as in flow_runs:

    if days_to_keep < 0:
        raise ValueError("days_to_keep must be non-negative")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
🤖 Prompt for AI Agents
In backend/infrahub/cli/tasks.py around lines 82 to 98, the stale_runs command
docstring is too vague and doesn't document that it targets RUNNING states and
marks them as CRASHED rather than deleting them; update the docstring to state
this behavior and enumerate parameters (config_file, days_to_keep, batch_size)
and their effects; additionally add input validation matching flow_runs: raise
ValueError if days_to_keep < 0 and if batch_size < 1, placed at the start of the
function before loading config or performing operations.

backend/infrahub/task_manager/task.py (69 additions, 1 deletion)

@@ -1,4 +1,6 @@
import asyncio
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import UUID

@@ -12,13 +14,14 @@
    FlowRunFilter,
    FlowRunFilterId,
    FlowRunFilterName,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterTags,
    LogFilter,
    LogFilterFlowRunId,
)
from prefect.client.schemas.objects import Flow, FlowRun, StateType
from prefect.client.schemas.objects import Flow, FlowRun, State, StateType
from prefect.client.schemas.sorting import (
    FlowRunSort,
)
Expand Down Expand Up @@ -311,3 +314,68 @@ async def query(
        )

        return {"count": count or 0, "edges": nodes}

    @classmethod
    async def delete_flow_runs(
        cls,
        states: list[StateType] | None = None,
Comment (Contributor):
could you give this the default used below

states: list[StateType] =  [StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED],

and then delete the if states is None: conditional?
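If the `if states is None` branch is dropped, a mutable list default would trip ruff's mutable-default check (B006); one variant (a sketch, not the PR's code) keeps the simplification with an immutable default:

from collections.abc import Sequence

from prefect.client.schemas.objects import StateType

# Immutable module-level default: same simplification, no shared
# mutable default argument (classmethod decorator omitted in this sketch).
DEFAULT_FLUSH_STATES: tuple[StateType, ...] = (
    StateType.COMPLETED,
    StateType.FAILED,
    StateType.CANCELLED,
)

async def delete_flow_runs(
    cls,
    states: Sequence[StateType] = DEFAULT_FLUSH_STATES,
    delete: bool = True,
    days_to_keep: int = 2,
    batch_size: int = 100,
) -> None: ...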

        delete: bool = True,
        days_to_keep: int = 2,
        batch_size: int = 100,
    ) -> None:
        """Delete flow runs in the specified states and older than specified days."""
Comment on lines +318 to +326
🛠️ Refactor suggestion | 🟠 Major

Expand docstring to follow Google-style format

The docstring is missing the Args, Returns, and Raises sections required by the coding guidelines for Python functions.

[As per coding guidelines]

Apply this diff to add the complete docstring:

-    async def delete_flow_runs(
-        cls,
-        states: list[StateType] | None = None,
-        delete: bool = True,
-        days_to_keep: int = 2,
-        batch_size: int = 100,
-    ) -> None:
-        """Delete flow runs in the specified states and older than specified days."""
+    async def delete_flow_runs(
+        cls,
+        states: list[StateType] | None = None,
+        delete: bool = True,
+        days_to_keep: int = 2,
+        batch_size: int = 100,
+    ) -> None:
+        """Delete or crash flow runs in specified states older than the retention period.
+
+        Args:
+            states: List of StateType values to filter flow runs. Defaults to COMPLETED, FAILED, and CANCELLED.
+            delete: Whether to delete flow runs (True) or set them to CRASHED (False). Defaults to True.
+            days_to_keep: Number of days to retain flow runs. Flow runs older than this are processed. Defaults to 2.
+            batch_size: Number of flow runs to process per batch. Defaults to 100.
+
+        Raises:
+            Exception: Logs warnings for individual flow run processing failures but continues processing.
+        """
🤖 Prompt for AI Agents
In backend/infrahub/task_manager/task.py around lines 318 to 326, the method
delete_flow_runs has a one-line docstring; expand it into a Google-style
docstring including Args, Returns, and Raises. Document each parameter (states:
list[StateType] | None — states to filter; delete: bool — whether to delete or
only mark; days_to_keep: int — cutoff age in days; batch_size: int — number of
records per batch) with types and default values, state that the function
returns None, and add a Raises section listing possible exceptions (e.g.,
database/IO errors or ValueError if inputs are invalid). Keep wording concise
and consistent with other project docstrings.


        if states is None:
            states = [StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED]

        logger = get_logger()

        async with get_client(sync_client=False) as client:
            cutoff = datetime.now(timezone.utc) - timedelta(days=days_to_keep)

            flow_run_filter = FlowRunFilter(
                start_time=FlowRunFilterStartTime(before_=cutoff),  # type: ignore[arg-type]
                state=FlowRunFilterState(type=FlowRunFilterStateType(any_=states)),
            )

            # Get flow runs to delete
            flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)

            deleted_total = 0

            while flow_runs:
                batch_deleted = 0
                failed_deletes = []

                # Delete each flow run through the API
                for flow_run in flow_runs:
                    try:
                        if delete:
                            await client.delete_flow_run(flow_run_id=flow_run.id)
                        else:
                            await client.set_flow_run_state(
                                flow_run_id=flow_run.id,
                                state=State(type=StateType.CRASHED),
                                force=True,
                            )
                        deleted_total += 1
                        batch_deleted += 1
                    except Exception as e:
                        logger.warning(f"Failed to delete flow run {flow_run.id}: {e}")
                        failed_deletes.append(flow_run.id)

                    # Rate limiting
                    if batch_deleted % 10 == 0:
                        await asyncio.sleep(0.5)

logger.info(f"Delete {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")
                if failed_deletes:
                    logger.warning(f"Failed to delete {len(failed_deletes)} flow runs")
Comment (Contributor):
this mostly duplicates the logger message on 364
not sure if you need both
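A possible consolidation (a sketch): fold the failure count into the per-batch summary so a single line covers both:

                # One summary line per batch instead of an info line plus a warning.
                logger.info(
                    f"Processed {batch_deleted}/{len(flow_runs)} flow runs "
                    f"(total: {deleted_total}, failed this batch: {len(failed_deletes)})"
                )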


                # Get next batch
                flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)
Comment (Contributor):
if there are flows that fail to delete or to be set to CRASHED, will this query keep retrieving them and trying to delete/CRASH them?
maybe that is expected and fine, but if there are enough runs that cannot be deleted/CRASHed and the batch size is small enough, this could end up in an infinite loop that keeps retrieving the same runs and failing on them
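One possible guard (a sketch, assuming `FlowRunFilterId.not_any_` from Prefect's filter schema, already imported in this file): carry the IDs that failed across batches and exclude them from the refetch, so a batch full of undeletable runs cannot be retrieved again:

            failed_ids: list[UUID] = []
            while True:
                flow_run_filter = FlowRunFilter(
                    start_time=FlowRunFilterStartTime(before_=cutoff),  # type: ignore[arg-type]
                    state=FlowRunFilterState(type=FlowRunFilterStateType(any_=states)),
                    # Exclude runs that already failed to delete/CRASH so the
                    # loop always makes progress.
                    id=FlowRunFilterId(not_any_=failed_ids) if failed_ids else None,
                )
                flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)
                if not flow_runs:
                    break
                # ... delete/CRASH as above, appending failures to failed_ids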


                # Delay between batches to avoid overwhelming the API
                await asyncio.sleep(1.0)

            logger.info(f"Retention complete. Total deleted tasks: {deleted_total}")