
Conversation

@fatih-acar (Contributor) commented Oct 15, 2025

This will be used by the ops tools (infrahub-taskmanager).

This can be included in an internal scheduled workflow later on, if required.

Summary by CodeRabbit

  • New Features
    • Added a new CLI subcommand group: flush.
      • flush flow-runs: bulk-remove flow runs older than a configurable retention (days-to-keep, default 30) with batching (batch-size, default 100).
      • flush stale-runs: target stale RUNNING flow runs (days-to-keep, default 2; batch-size, default 100) and mark them appropriately instead of deleting.
    • Both commands load configuration and allow adjustable logging for visibility during cleanup.
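
If you want a quick smoke test of the new commands without a live deployment, something like the following may work through Typer's test runner (a sketch: the infrahub.cli.tasks import path and the rendered option names are assumptions, not verified against this branch):

# Hypothetical smoke test for the flush commands; module path and flag
# names are inferred from the summary above.
from typer.testing import CliRunner

from infrahub.cli.tasks import app  # assumed CLI entry point

runner = CliRunner()
result = runner.invoke(
    app,
    ["flush", "flow-runs", "infrahub.toml", "--days-to-keep", "30", "--batch-size", "100"],
)
print(result.exit_code, result.output)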

@github-actions bot added the group/backend label (Issue related to the backend: API Server, Git Agent) on Oct 15, 2025
@coderabbitai bot (Contributor) commented Oct 15, 2025

Walkthrough

Adds a CLI sub-application flush_app with two commands, flow_runs and stale_runs, which load configuration, adjust log levels, and call PrefectTask.delete_flow_runs with days_to_keep and batch_size. stale_runs filters for RUNNING states with delete=False. Introduces StateType usage. Adds PrefectTask.delete_flow_runs, which computes a UTC cutoff from days_to_keep, builds a FlowRunFilter (start_time and state), iterates via read_flow_runs in batches, deletes or marks runs as CRASHED, and includes pagination, per-run error handling, batch logging, and rate limiting.
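
For readers who want the shape of that helper at a glance, here is a minimal sketch reconstructed from the walkthrough (it assumes Prefect 2.x client APIs; logging, per-run error handling, and the finer rate limiting are elided, so treat it as an outline rather than the actual implementation):

import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import State, StateType


async def delete_flow_runs(
    states: list[StateType] | None = None,
    delete: bool = True,
    days_to_keep: int = 2,
    batch_size: int = 100,
) -> None:
    # Default to terminal states; RUNNING is passed explicitly by stale_runs
    states = states or [StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED]
    cutoff = datetime.now(timezone.utc) - timedelta(days=days_to_keep)
    flow_run_filter = FlowRunFilter(
        start_time=FlowRunFilterStartTime(before_=cutoff),
        state=FlowRunFilterState(type=FlowRunFilterStateType(any_=states)),
    )
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)
        while flow_runs:
            for flow_run in flow_runs:
                if delete:
                    await client.delete_flow_run(flow_run_id=flow_run.id)
                else:
                    # Stale RUNNING flows are repaired (marked CRASHED), not removed
                    await client.set_flow_run_state(
                        flow_run_id=flow_run.id, state=State(type=StateType.CRASHED), force=True
                    )
            await asyncio.sleep(1.0)  # pause between pages to avoid hammering the API
            flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)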

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)

  • Description Check: Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check: Passed. The title succinctly conveys that this pull request introduces new CLI commands for flushing tasks, matching the primary changes in the code, which add "flush" commands under the CLI interface for task management. It is clear, concise, and follows the conventional commit style used in the repository.
  • Docstring Coverage: Passed. Docstring coverage is 80.00%, which meets the required threshold of 80.00%.


@codspeed-hq bot commented Oct 15, 2025

CodSpeed Performance Report

Merging #7390 will not alter performance

Comparing fac-add-taskmgr-helpers (001d10b) with stable (b036efb)¹

Summary

✅ 10 untouched

Footnotes

  1. No successful run was found on stable (aedb4ac) during the generation of this report, so b036efb was used instead as the comparison base. There might be some changes unrelated to this pull request in this report.

@fatih-acar force-pushed the fac-add-taskmgr-helpers branch from 41443f2 to 58ad3bf on October 15, 2025 at 13:04
@fatih-acar marked this pull request as ready for review on October 15, 2025 at 13:46
@fatih-acar requested a review from a team as a code owner on October 15, 2025 at 13:46
@coderabbitai bot (Contributor) left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
backend/infrahub/task_manager/task.py (1)

326-340: Docstring should follow project’s Google-style format

Expand the docstring with Args/Returns sections to meet guidelines.

As per coding guidelines

-        """Delete flow runs in the specified states and older than specified days."""
+        """
+        Delete or mark Prefect flow runs older than a retention window.
+
+        Args:
+            states: Flow run states to target. Defaults to COMPLETED, FAILED, CANCELLED.
+            delete: If True, delete runs; otherwise mark them as CRASHED.
+            days_to_keep: Retention window in days; only runs older than this are processed.
+            batch_size: Maximum number of runs to process per API page.
+
+        Returns:
+            None
+        """
backend/infrahub/cli/tasks.py (2)

62-80: Add option help for clarity; consider an explicit confirmation flag

Provide helpful descriptions for days_to_keep and batch_size, and optionally add a --yes flag to avoid accidental deletions in interactive use.

-async def flow_runs(
+async def flow_runs(
     ctx: typer.Context,  # noqa: ARG001
-    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
-    days_to_keep: int = 30,
-    batch_size: int = 100,
+    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
+    days_to_keep: int = typer.Option(30, help="Keep this many days; older runs are pruned."),
+    batch_size: int = typer.Option(100, help="Max runs processed per API page."),
 ) -> None:

Optionally add: yes: bool = typer.Option(False, help="Proceed without interactive confirmation.")


82-98: Stale RUNNING to CRASHED: add help text and caution

Great targeted cleanup. Improve UX with help text; consider a confirmation flag since it changes state on RUNNING flows.

-async def stale_runs(
+async def stale_runs(
     ctx: typer.Context,  # noqa: ARG001
-    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
-    days_to_keep: int = 2,
-    batch_size: int = 100,
+    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
+    days_to_keep: int = typer.Option(2, help="Mark RUNNING flows older than this as CRASHED."),
+    batch_size: int = typer.Option(100, help="Max runs processed per API page."),
 ) -> None:

Add an explicit note in --help that this operation is irreversible and should target the correct Prefect environment.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b036efb and 58ad3bf.

📒 Files selected for processing (2)
  • backend/infrahub/cli/tasks.py (2 hunks)
  • backend/infrahub/task_manager/task.py (3 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
backend/**/*

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Run backend tests with pytest or via invoke tasks

Files:

  • backend/infrahub/cli/tasks.py
  • backend/infrahub/task_manager/task.py
**/*.py

📄 CodeRabbit inference engine (.github/instructions/python-docstring.instructions.md)

**/*.py: Use triple double quotes (""") for all Python docstrings
Write docstrings in Google-style format
Include a brief one-line description at the top of each docstring
Add a detailed description section when additional context is needed
Document function/method parameters under an Args/Parameters section without typing information
Include a Returns section describing the return value
Include a Raises section listing possible exceptions
Provide an Examples section demonstrating usage when helpful

Use ruff and mypy to validate and lint Python files

**/*.py: Use type hints for all Python function parameters and return values
Prefer asynchronous code in Python when feasible
Define asynchronous Python functions with async def
Use await for asynchronous calls in Python
Use Pydantic models instead of standard dataclasses
Use ruff and mypy for linting and type checking

**/*.py: Use type hints for all Python function parameters and return values
Use async/await whenever possible in Python code
Define asynchronous functions with async def
Await asynchronous calls with await
Use Pydantic models instead of standard dataclasses for data modeling
Use triple quotes (""") for all Python docstrings
Write docstrings in Google-style format
Include docstring sections when applicable: one-line summary, optional details, Args (without types), Returns, Raises, Examples
Validate and lint Python with ruff and mypy

Files:

  • backend/infrahub/cli/tasks.py
  • backend/infrahub/task_manager/task.py
🧬 Code graph analysis (2)
backend/infrahub/cli/tasks.py (2)
backend/infrahub/task_manager/task.py (1)
  • delete_flow_runs (319-381)
backend/infrahub/config.py (1)
  • load_and_exit (985-1002)
backend/infrahub/task_manager/task.py (2)
backend/tests/conftest.py (1)
  • prefect (382-397)
backend/infrahub/cli/tasks.py (1)
  • flow_runs (63-79)
🔇 Additional comments (1)
backend/infrahub/cli/tasks.py (1)

57-60: Good CLI grouping

Adding a flush sub-app keeps destructive ops scoped and discoverable. LGTM.

Comment on lines 351 to 381
                for flow_run in flow_runs:
                    try:
                        if delete:
                            await client.delete_flow_run(flow_run_id=flow_run.id)
                        else:
                            await client.set_flow_run_state(
                                flow_run_id=flow_run.id,
                                state=State(type=StateType.CRASHED),
                                force=True,
                            )
                        deleted_total += 1
                        batch_deleted += 1
                    except Exception as e:
                        logger.warning(f"Failed to set flow run {flow_run.id} to CRASHED: {e}")
                        failed_deletes.append(flow_run.id)

                    # Rate limiting
                    if batch_deleted % 10 == 0:
                        await asyncio.sleep(0.5)

                logger.info(f"Set {batch_deleted}/{len(flow_runs)} flow runs to CRASHED (total: {deleted_total})")
                if failed_deletes:
                    logger.warning(f"Failed to set {len(failed_deletes)} CRASHED flow runs")

                # Get next batch
                flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)

                # Delay between batches to avoid overwhelming the API
                await asyncio.sleep(1.0)

            logger.info(f"Retention complete. Total deleted tasks: {deleted_total}")

⚠️ Potential issue | 🟡 Minor

Logging messages don’t match the operation (delete vs CRASHED)

The messages always mention "CRASHED" and "deleted tasks" regardless of which operation actually ran. Fix the wording based on the delete flag.

-            deleted_total = 0
+            deleted_total = 0
+            action_past = "deleted" if delete else "set to CRASHED"

@@
-                    except Exception as e:
-                        logger.warning(f"Failed to set flow run {flow_run.id} to CRASHED: {e}")
+                    except Exception as e:
+                        logger.warning(
+                            f"Failed to {'delete' if delete else 'set to CRASHED'} flow run {flow_run.id}: {e}"
+                        )
                         failed_deletes.append(flow_run.id)
@@
-                logger.info(f"Set {batch_deleted}/{len(flow_runs)} flow runs to CRASHED (total: {deleted_total})")
+                logger.info(f"{action_past.capitalize()} {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")
                 if failed_deletes:
-                    logger.warning(f"Failed to set {len(failed_deletes)} CRASHED flow runs")
+                    logger.warning(
+                        f"Failed to {'delete' if delete else 'set to CRASHED'} {len(failed_deletes)} flow runs"
+                    )
@@
-            logger.info(f"Retention complete. Total deleted tasks: {deleted_total}")
+            logger.info(
+                f"Retention complete. Total {'deleted' if delete else 'crashed'} flow runs: {deleted_total}"
+            )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
                action_past = "deleted" if delete else "set to CRASHED"
                for flow_run in flow_runs:
                    try:
                        if delete:
                            await client.delete_flow_run(flow_run_id=flow_run.id)
                        else:
                            await client.set_flow_run_state(
                                flow_run_id=flow_run.id,
                                state=State(type=StateType.CRASHED),
                                force=True,
                            )
                        deleted_total += 1
                        batch_deleted += 1
                    except Exception as e:
                        logger.warning(
                            f"Failed to {'delete' if delete else 'set to CRASHED'} flow run {flow_run.id}: {e}"
                        )
                        failed_deletes.append(flow_run.id)

                    # Rate limiting
                    if batch_deleted % 10 == 0:
                        await asyncio.sleep(0.5)

                logger.info(f"{action_past.capitalize()} {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")
                if failed_deletes:
                    logger.warning(
                        f"Failed to {'delete' if delete else 'set to CRASHED'} {len(failed_deletes)} flow runs"
                    )

                # Get next batch
                flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)

                # Delay between batches to avoid overwhelming the API
                await asyncio.sleep(1.0)

            logger.info(
                f"Retention complete. Total {'deleted' if delete else 'crashed'} flow runs: {deleted_total}"
            )
🤖 Prompt for AI Agents
In backend/infrahub/task_manager/task.py around lines 351 to 381, the log
messages always refer to "CRASHED" and "deleted tasks" regardless of the delete
flag; update the logging to reflect the actual operation: when delete=True log
"Deleted X/Y flow runs (total: Z)" and "Failed to delete N flow runs", otherwise
log "Set X/Y flow runs to CRASHED (total: Z)" and "Failed to set N flow runs to
CRASHED"; also update the per-run warning inside the exception handler to say
"Failed to delete flow run {id}: {e}" or "Failed to set flow run {id} to
CRASHED: {e}" based on the flag, ensuring pluralization and counts
(batch_deleted, deleted_total, failed_deletes) remain correct.

@fatih-acar force-pushed the fac-add-taskmgr-helpers branch from 58ad3bf to 001d10b on October 15, 2025 at 14:04
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

♻️ Duplicate comments (2)
backend/infrahub/task_manager/task.py (2)

333-333: Add sync_client=False to async get_client call

Elsewhere in this file (line 243) the async client is created with get_client(sync_client=False). Use the same explicit parameter here for consistency.

Apply this diff:

-        async with get_client() as client:
+        async with get_client(sync_client=False) as client:

346-381: Fix logging messages to reflect the actual operation

The log messages always refer to "delete/deleted" operations regardless of the delete flag value. When delete=False, the operation marks flows as CRASHED, not deletes them. This makes logs misleading and harder to troubleshoot.

Apply this diff to fix the logging:

         deleted_total = 0
+        action_verb = "delete" if delete else "set to CRASHED"
+        action_past = "deleted" if delete else "set to CRASHED"
 
         while flow_runs:
             batch_deleted = 0
@@ -361,7 +363,7 @@
                     deleted_total += 1
                     batch_deleted += 1
                 except Exception as e:
-                    logger.warning(f"Failed to delete flow run {flow_run.id}: {e}")
+                    logger.warning(f"Failed to {action_verb} flow run {flow_run.id}: {e}")
                     failed_deletes.append(flow_run.id)
 
                 # Rate limiting
                 if batch_deleted % 10 == 0:
                     await asyncio.sleep(0.5)
 
-            logger.info(f"Delete {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")
+            logger.info(f"{action_past.capitalize()} {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")
             if failed_deletes:
-                logger.warning(f"Failed to delete {len(failed_deletes)} flow runs")
+                logger.warning(f"Failed to {action_verb} {len(failed_deletes)} flow runs")
 
             # Get next batch
             flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)
@@ -378,4 +380,4 @@
             # Delay between batches to avoid overwhelming the API
             await asyncio.sleep(1.0)
 
-        logger.info(f"Retention complete. Total deleted tasks: {deleted_total}")
+        logger.info(f"Retention complete. Total {action_past} flow runs: {deleted_total}")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 58ad3bf and 001d10b.

📒 Files selected for processing (2)
  • backend/infrahub/cli/tasks.py (2 hunks)
  • backend/infrahub/task_manager/task.py (3 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
backend/**/*

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Run backend tests with pytest or via invoke tasks

Files:

  • backend/infrahub/cli/tasks.py
  • backend/infrahub/task_manager/task.py
**/*.py

📄 CodeRabbit inference engine (.github/instructions/python-docstring.instructions.md)

**/*.py: Use triple double quotes (""") for all Python docstrings
Write docstrings in Google-style format
Include a brief one-line description at the top of each docstring
Add a detailed description section when additional context is needed
Document function/method parameters under an Args/Parameters section without typing information
Include a Returns section describing the return value
Include a Raises section listing possible exceptions
Provide an Examples section demonstrating usage when helpful

Use ruff and mypy to validate and lint Python files

**/*.py: Use type hints for all Python function parameters and return values
Prefer asynchronous code in Python when feasible
Define asynchronous Python functions with async def
Use await for asynchronous calls in Python
Use Pydantic models instead of standard dataclasses
Use ruff and mypy for linting and type checking

**/*.py: Use type hints for all Python function parameters and return values
Use async/await whenever possible in Python code
Define asynchronous functions with async def
Await asynchronous calls with await
Use Pydantic models instead of standard dataclasses for data modeling
Use triple quotes (""") for all Python docstrings
Write docstrings in Google-style format
Include docstring sections when applicable: one-line summary, optional details, Args (without types), Returns, Raises, Examples
Validate and lint Python with ruff and mypy

Files:

  • backend/infrahub/cli/tasks.py
  • backend/infrahub/task_manager/task.py
🧬 Code graph analysis (2)
backend/infrahub/cli/tasks.py (2)
backend/infrahub/task_manager/task.py (2)
  • PrefectTask (45-381)
  • delete_flow_runs (319-381)
backend/infrahub/config.py (1)
  • load_and_exit (985-1002)
backend/infrahub/task_manager/task.py (2)
backend/tests/unit/graphql/queries/test_task.py (1)
  • delete_flow_runs (147-150)
backend/infrahub/cli/tasks.py (1)
  • flow_runs (63-79)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: E2E-testing-version-upgrade / From 1.3.0
  • GitHub Check: backend-benchmark
  • GitHub Check: E2E-testing-invoke-demo-start
  • GitHub Check: E2E-testing-playwright
  • GitHub Check: backend-tests-functional
  • GitHub Check: backend-docker-integration
  • GitHub Check: backend-tests-unit
  • GitHub Check: backend-tests-integration
  • GitHub Check: Cloudflare Pages
🔇 Additional comments (4)
backend/infrahub/task_manager/task.py (2)

1-1: LGTM: Imports are appropriate for the new functionality

The new imports (asyncio, timezone, FlowRunFilterStartTime, StateType) are all used in the delete_flow_runs method and support the rate limiting, UTC time handling, filtering, and type hinting requirements.

Also applies to: 3-3, 17-17, 24-24


334-342: Confirm necessity of # type: ignore[arg-type] on before_
Verify the Prefect SDK's FlowRunFilterStartTime.before_ signature: if it rejects a plain datetime, retain the ignore or update your stubs; otherwise remove it.
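
If the ignore is only needed because the stubs expect a pendulum DateTime rather than a stdlib datetime, one way to drop it (a sketch; assumes Prefect 2.x, which uses pendulum internally) is to build the cutoff with pendulum directly:

import pendulum
from prefect.client.schemas.filters import FlowRunFilterStartTime

days_to_keep = 2
cutoff = pendulum.now("UTC").subtract(days=days_to_keep)  # pendulum.DateTime satisfies the stub
start_time = FlowRunFilterStartTime(before_=cutoff)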

backend/infrahub/cli/tasks.py (2)

6-6: LGTM: New imports support the flush commands

The StateType import is used to filter RUNNING states in stale_runs, and PrefectTask provides the deletion logic for both new commands.

Also applies to: 10-10


57-59: LGTM: Sub-application structure is appropriate

The flush_app sub-application correctly uses AsyncTyper for async commands and is properly registered with the main app.
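
For context, the wiring being approved here typically looks like the following minimal sketch (shown with plain typer.Typer, since AsyncTyper is an infrahub-internal wrapper; names are illustrative):

import typer

app = typer.Typer()
flush_app = typer.Typer(help="Bulk-remove or repair old flow runs.")
app.add_typer(flush_app, name="flush")  # commands register under `flush ...`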

Comment on lines +62 to +79
@flush_app.command()
async def flow_runs(
    ctx: typer.Context,  # noqa: ARG001
    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
    days_to_keep: int = 30,
    batch_size: int = 100,
) -> None:
    """Flush old task runs"""
    logging.getLogger("infrahub").setLevel(logging.WARNING)
    logging.getLogger("neo4j").setLevel(logging.ERROR)
    logging.getLogger("prefect").setLevel(logging.ERROR)

    config.load_and_exit(config_file_name=config_file)

    await PrefectTask.delete_flow_runs(
        days_to_keep=days_to_keep,
        batch_size=batch_size,
    )

🛠️ Refactor suggestion | 🟠 Major

Expand docstring and consider input validation

The docstring is missing the Args section required by coding guidelines. Additionally, there's no validation for days_to_keep or batch_size, which could lead to unexpected behavior with negative or excessively large values.

[As per coding guidelines]

Apply this diff to expand the docstring:

 async def flow_runs(
     ctx: typer.Context,  # noqa: ARG001
     config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
     days_to_keep: int = 30,
     batch_size: int = 100,
 ) -> None:
-    """Flush old task runs"""
+    """Flush old task runs by deleting completed, failed, and cancelled flows.
+
+    Args:
+        ctx: Typer context (unused).
+        config_file: Path to the Infrahub configuration file.
+        days_to_keep: Number of days to retain flow runs. Older flows are deleted.
+        batch_size: Number of flow runs to process per batch.
+    """

Consider adding input validation:

Add validation before calling delete_flow_runs:

async def flow_runs(
    ctx: typer.Context,  # noqa: ARG001
    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
    days_to_keep: int = 30,
    batch_size: int = 100,
) -> None:
    """Flush old task runs by deleting completed, failed, and cancelled flows.

    Args:
        ctx: Typer context (unused).
        config_file: Path to the Infrahub configuration file.
        days_to_keep: Number of days to retain flow runs. Older flows are deleted.
        batch_size: Number of flow runs to process per batch.
    """
    if days_to_keep < 0:
        raise ValueError("days_to_keep must be non-negative")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    
    logging.getLogger("infrahub").setLevel(logging.WARNING)
    # ... rest of the function
🤖 Prompt for AI Agents
In backend/infrahub/cli/tasks.py around lines 62 to 79, expand the function
docstring to include an Args section describing ctx, config_file, days_to_keep,
and batch_size (briefly state purpose and defaults) and add input validation
before calling PrefectTask.delete_flow_runs: raise ValueError if days_to_keep is
negative and if batch_size is less than 1; keep the rest of the function
(logging and config.load_and_exit) unchanged and ensure validation happens prior
to invoking delete_flow_runs.

Comment on lines +82 to +98
@flush_app.command()
async def stale_runs(
    ctx: typer.Context,  # noqa: ARG001
    config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
    days_to_keep: int = 2,
    batch_size: int = 100,
) -> None:
    """Flush stale task runs"""
    logging.getLogger("infrahub").setLevel(logging.WARNING)
    logging.getLogger("neo4j").setLevel(logging.ERROR)
    logging.getLogger("prefect").setLevel(logging.ERROR)

    config.load_and_exit(config_file_name=config_file)

    await PrefectTask.delete_flow_runs(
        states=[StateType.RUNNING], delete=False, days_to_keep=days_to_keep, batch_size=batch_size
    )

🛠️ Refactor suggestion | 🟠 Major

Expand docstring to clarify the distinct behavior

This command behaves differently from flow_runs: it targets RUNNING states and marks them as CRASHED instead of deleting them. The docstring should clearly document this distinction and the parameters.

[As per coding guidelines]

Apply this diff:

 async def stale_runs(
     ctx: typer.Context,  # noqa: ARG001
     config_file: str = typer.Argument("infrahub.toml", envvar="INFRAHUB_CONFIG"),
     days_to_keep: int = 2,
     batch_size: int = 100,
 ) -> None:
-    """Flush stale task runs"""
+    """Mark stale RUNNING flows as CRASHED without deleting them.
+
+    This command targets flows stuck in the RUNNING state and marks them as
+    CRASHED to prevent them from blocking resources. Unlike flow_runs, this
+    does not delete the flow runs.
+
+    Args:
+        ctx: Typer context (unused).
+        config_file: Path to the Infrahub configuration file.
+        days_to_keep: Number of days to retain RUNNING flows. Older flows are marked as CRASHED. Default is 2.
+        batch_size: Number of flow runs to process per batch.
+    """

Consider adding the same input validation as in flow_runs:

    if days_to_keep < 0:
        raise ValueError("days_to_keep must be non-negative")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
🤖 Prompt for AI Agents
In backend/infrahub/cli/tasks.py around lines 82 to 98, the stale_runs command
docstring is too vague and doesn't document that it targets RUNNING states and
marks them as CRASHED rather than deleting them; update the docstring to state
this behavior and enumerate parameters (config_file, days_to_keep, batch_size)
and their effects; additionally add input validation matching flow_runs: raise
ValueError if days_to_keep < 0 and if batch_size < 1, placed at the start of the
function before loading config or performing operations.

Comment on lines +318 to +326
    @classmethod
    async def delete_flow_runs(
        cls,
        states: list[StateType] | None = None,
        delete: bool = True,
        days_to_keep: int = 2,
        batch_size: int = 100,
    ) -> None:
        """Delete flow runs in the specified states and older than specified days."""

🛠️ Refactor suggestion | 🟠 Major

Expand docstring to follow Google-style format

The docstring is missing the Args, Returns, and Raises sections required by the coding guidelines for Python functions.

[As per coding guidelines]

Apply this diff to add the complete docstring:

-    async def delete_flow_runs(
-        cls,
-        states: list[StateType] | None = None,
-        delete: bool = True,
-        days_to_keep: int = 2,
-        batch_size: int = 100,
-    ) -> None:
-        """Delete flow runs in the specified states and older than specified days."""
+    async def delete_flow_runs(
+        cls,
+        states: list[StateType] | None = None,
+        delete: bool = True,
+        days_to_keep: int = 2,
+        batch_size: int = 100,
+    ) -> None:
+        """Delete or crash flow runs in specified states older than the retention period.
+
+        Args:
+            states: List of StateType values to filter flow runs. Defaults to COMPLETED, FAILED, and CANCELLED.
+            delete: Whether to delete flow runs (True) or set them to CRASHED (False). Defaults to True.
+            days_to_keep: Number of days to retain flow runs. Flow runs older than this are processed. Defaults to 2.
+            batch_size: Number of flow runs to process per batch. Defaults to 100.
+
+        Raises:
+            Exception: Logs warnings for individual flow run processing failures but continues processing.
+        """
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
    @classmethod
    async def delete_flow_runs(
        cls,
        states: list[StateType] | None = None,
        delete: bool = True,
        days_to_keep: int = 2,
        batch_size: int = 100,
    ) -> None:
        """Delete or crash flow runs in specified states older than the retention period.

        Args:
            states: List of StateType values to filter flow runs. Defaults to COMPLETED, FAILED, and CANCELLED.
            delete: Whether to delete flow runs (True) or set them to CRASHED (False). Defaults to True.
            days_to_keep: Number of days to retain flow runs. Flow runs older than this are processed. Defaults to 2.
            batch_size: Number of flow runs to process per batch. Defaults to 100.

        Raises:
            Exception: Logs warnings for individual flow run processing failures but continues processing.
        """
🤖 Prompt for AI Agents
In backend/infrahub/task_manager/task.py around lines 318 to 326, the method
delete_flow_runs has a one-line docstring; expand it into a Google-style
docstring including Args, Returns, and Raises. Document each parameter (states:
list[StateType] | None — states to filter; delete: bool — whether to delete or
only mark; days_to_keep: int — cutoff age in days; batch_size: int — number of
records per batch) with types and default values, state that the function
returns None, and add a Raises section listing possible exceptions (e.g.,
database/IO errors or ValueError if inputs are invalid). Keep wording concise
and consistent with other project docstrings.

    @classmethod
    async def delete_flow_runs(
        cls,
        states: list[StateType] | None = None,

could you give this the default used below

states: list[StateType] = [StateType.COMPLETED, StateType.FAILED, StateType.CANCELLED],

and then delete the if states is None: conditional?
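
One caveat worth noting with that change: a list literal used as a default is evaluated once and shared across calls. A common alternative (a sketch, not code from this PR) is a module-level tuple constant:

from prefect.client.schemas.objects import StateType

# Hypothetical constant; immutable, so safe to use as a default value
DEFAULT_FLUSH_STATES: tuple[StateType, ...] = (
    StateType.COMPLETED,
    StateType.FAILED,
    StateType.CANCELLED,
)


async def delete_flow_runs(states: tuple[StateType, ...] = DEFAULT_FLUSH_STATES) -> None:
    target_states = list(states)  # convert if a list is required downstream
    ...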


logger.info(f"Delete {batch_deleted}/{len(flow_runs)} flow runs (total: {deleted_total})")
if failed_deletes:
logger.warning(f"Failed to delete {len(failed_deletes)} flow runs")

this mostly duplicates the logger message on 364
not sure if you need both

logger.warning(f"Failed to delete {len(failed_deletes)} flow runs")

# Get next batch
flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter, limit=batch_size)

If there are flows that fail to be deleted or set to CRASHED, will this query keep retrieving them and trying to delete/CRASH them?
Maybe that is expected and fine, but if there are enough runs that cannot be deleted/CRASHed and the batch size is small enough, I think this could end up in an infinite loop where it keeps retrieving the same runs and failing to delete them.
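
One possible guard, sketched as a hypothetical follow-up rather than code from this PR: accumulate the IDs that failed and exclude them from the next query so pagination can make progress.

from uuid import UUID

from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterId


def exclude_failed(flow_run_filter: FlowRunFilter, failed_deletes: list[UUID]) -> FlowRunFilter:
    # Skip runs that already failed to delete/CRASH so read_flow_runs
    # cannot keep returning the same stuck page forever.
    if failed_deletes:
        flow_run_filter.id = FlowRunFilterId(not_any_=failed_deletes)
    return flow_run_filter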
