Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 85 additions & 3 deletions src/prefect/server/database/query_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import sqlalchemy as sa
from cachetools import Cache, TTLCache
from jinja2 import Environment, PackageLoader, select_autoescape
from jinja2 import Environment, PackageLoader, Template, select_autoescape
from sqlalchemy import orm
from sqlalchemy.dialects import postgresql, sqlite
from sqlalchemy.exc import NoResultFound
Expand Down Expand Up @@ -114,6 +114,12 @@ def make_timestamp_intervals(
interval: datetime.timedelta,
) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]: ...

    def _get_jinja_template(self, template_path: str) -> Template:
        # Load a SQL template from the shared jinja environment, after the
        # dialect subclass maps the bare name to its own directory
        # (e.g. "postgres/<name>" or "sqlite/<name>").
        return jinja_env.get_template(self._get_query_template_path(template_path))

@abstractmethod
def _get_query_template_path(self, template_path: str) -> str: ...

@abstractmethod
def set_state_id_on_inserted_flow_runs_statement(
self,
Expand Down Expand Up @@ -538,6 +544,76 @@ async def _get_flow_run_graph_states(
GraphState.model_validate(state, from_attributes=True) for state in states
]

async def delete_old_flow_runs(
self, session: AsyncSession, before: datetime.datetime, limit: int
) -> tuple[int, int]:
"""
Delete old flow runs and return counts.

Args:
session: Database session
before: Delete flow runs created before this timestamp
limit: Maximum number of task runs to delete (determines flow run count)

Returns:
Tuple of (flow_run_count, task_run_count)
"""
query = sa.text(
self._get_jinja_template(
template_path="delete-old-flow-runs.sql.jinja"
).render()
).bindparams(
sa.bindparam("before", before, type_=Timestamp),
sa.bindparam("limit", limit, type_=sa.Integer),
)

result = await session.execute(query)

# The query returns a single row with two columns: flow_run_count, task_run_count
# If no rows were deleted, the query returns no results
row = result.one_or_none()
if row is None:
return (0, 0)
return (row.flow_run_count, row.task_run_count)

async def delete_old_logs(self, session: AsyncSession, limit: int) -> int:
"""
Delete old logs.

Args:
session: Database session
limit: Maximum number of logs to delete

Returns:
Number of logs deleted
"""
query = sa.text(
self._get_jinja_template(template_path="delete-old-logs.sql.jinja").render()
).bindparams(limit=limit)

result = await session.execute(query)
return result.scalar_one_or_none() or 0

async def delete_old_artifacts(self, session: AsyncSession, limit: int) -> int:
"""
Delete old artifacts that have no associated flow runs.

Args:
session: Database session
limit: Maximum number of artifacts to delete

Returns:
Number of artifacts deleted
"""
query = sa.text(
self._get_jinja_template(
template_path="delete-old-artifacts.sql.jinja"
).render()
).bindparams(limit=limit)

result = await session.execute(query)
return result.scalar_one_or_none() or 0


class AsyncPostgresQueryComponents(BaseQueryComponents):
# --- Postgres-specific SqlAlchemy bindings
Expand Down Expand Up @@ -585,6 +661,9 @@ def make_timestamp_intervals(
.limit(500) # grab at most 500 intervals
)

def _get_query_template_path(self, template_path: str) -> str:
return f"postgres/{template_path}"

@db_injector
def set_state_id_on_inserted_flow_runs_statement(
self,
Expand Down Expand Up @@ -614,7 +693,7 @@ def _get_scheduled_flow_runs_from_work_pool_template_path(self) -> str:
"""
Template for the query to get scheduled flow runs from a work pool
"""
return "postgres/get-runs-from-worker-queues.sql.jinja"
return self._get_query_template_path("get-runs-from-worker-queues.sql.jinja")

@db_injector
def _build_flow_run_graph_v2_query(
Expand Down Expand Up @@ -847,6 +926,9 @@ def make_timestamp_intervals(

return sa.select(cte.c.interval_start, cte.c.interval_end)

def _get_query_template_path(self, template_path: str) -> str:
return f"sqlite/{template_path}"

@db_injector
def set_state_id_on_inserted_flow_runs_statement(
self,
Expand Down Expand Up @@ -936,7 +1018,7 @@ def _get_scheduled_flow_runs_from_work_pool_template_path(self) -> str:
"""
Template for the query to get scheduled flow runs from a work pool
"""
return "sqlite/get-runs-from-worker-queues.sql.jinja"
return self._get_query_template_path("get-runs-from-worker-queues.sql.jinja")

@db_injector
def _build_flow_run_graph_v2_query(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Deletes artifacts in batch that have no associated flow runs with them - meaning their associated flow runs have been deleted.
--
-- The query attempts to make this deletion efficient by:
-- - utilizing a FOR UPDATE OF artifact SKIP LOCKED for concurrent worker safety
--   (FOR UPDATE must name the join's non-nullable side: Postgres rejects
--   locking the nullable side of an outer join)
-- - DELETE...USING for efficient JOIN (faster than WHERE IN)
-- - counting via a data-modifying CTE, since aggregates such as COUNT(*)
--   are not allowed in a RETURNING list
--
-- Parameters:
-- :limit - Maximum number of artifacts to delete

WITH doomed_artifacts AS (
    SELECT artifact.id AS id
    FROM artifact
    LEFT OUTER JOIN flow_run ON artifact.flow_run_id = flow_run.id
    WHERE flow_run.id IS NULL
    ORDER BY artifact.flow_run_id, artifact.created ASC
    LIMIT :limit
    FOR UPDATE OF artifact SKIP LOCKED
), deleted_artifacts AS (
    DELETE FROM artifact
    USING doomed_artifacts
    WHERE artifact.id = doomed_artifacts.id
    RETURNING artifact.id
)
-- Always yields exactly one row: the number of artifacts deleted (0 if none).
SELECT COUNT(*) FROM deleted_artifacts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
-- Deletes old flow runs before the cutoff date.
--
-- Because flow runs cascade delete child task runs, the idea is to limit the amount of tasks deleted by this query.
-- If the query limited the amount of flow runs deleted, this could potentially delete an unbounded amount of task runs.
-- Therefore, the query joins flow runs to their children tasks, sorts by the flow run id, and takes children task runs
-- up to the limit, and only then takes the distinct flow run ids for deletion. For example, a limit of 1000 may only
-- delete 500 flow runs if there are 2 children task runs per flow run, but it will delete 1000 task runs for a total
-- of 1500 records.
--
-- Features:
-- - CTE with FOR UPDATE OF flow_run SKIP LOCKED for concurrent worker safety
--   (LIMIT must precede FOR UPDATE, and FOR UPDATE must name the join's
--   non-nullable side — Postgres rejects locking the nullable side of an
--   outer join)
-- - DELETE...USING for efficient JOIN (faster than WHERE IN)
-- - Returns deleted flow run count and cascade deleted task run count
--
-- Parameters:
-- :before - Delete flow runs before this timestamp
-- :limit - Maximum number of flow runs to delete in this batch

WITH doomed_flow_runs AS (
    SELECT flow_run.id AS flow_run_id, task_run.id AS task_run_id
    FROM flow_run
    LEFT JOIN task_run ON task_run.flow_run_id = flow_run.id
    WHERE flow_run.created < :before AND flow_run.parent_task_run_id IS NULL
    ORDER BY flow_run.id
    LIMIT :limit
    FOR UPDATE OF flow_run SKIP LOCKED
), flow_run_ids AS (
    SELECT DISTINCT flow_run_id AS id
    FROM doomed_flow_runs
), doomed_task_runs AS (
    -- Re-scan task_run: the LIMIT above may cut mid-flow-run, but ALL children
    -- of a doomed flow run cascade delete, so count them all.
    SELECT task_run.id
    FROM task_run
    JOIN flow_run_ids ON task_run.flow_run_id = flow_run_ids.id
), deleted_flow_runs AS (
    DELETE FROM flow_run
    USING flow_run_ids AS doomed_flow_run
    WHERE flow_run.id = doomed_flow_run.id
    RETURNING flow_run.id
)
SELECT
    (SELECT COUNT(*) FROM deleted_flow_runs) AS flow_run_count,
    (SELECT COUNT(*) FROM doomed_task_runs) AS task_run_count
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Deletes logs in batch that have no associated flow runs with them - meaning their associated flow runs have been deleted.
--
-- The query attempts to make this deletion efficient by:
-- - utilizing a FOR UPDATE OF log SKIP LOCKED for concurrent worker safety
--   (FOR UPDATE must name the join's non-nullable side: Postgres rejects
--   locking the nullable side of an outer join)
-- - DELETE...USING for efficient JOIN (faster than WHERE IN)
-- - counting via a data-modifying CTE, since aggregates such as COUNT(*)
--   are not allowed in a RETURNING list
--
-- NOTE(review): table renamed logs -> log to match the sqlite template,
-- since both dialects are driven by the same Python caller — confirm against
-- the ORM model's table name.
--
-- Parameters:
-- :limit - Maximum number of logs to delete

WITH doomed_logs AS (
    SELECT log.id AS id
    FROM log
    LEFT OUTER JOIN flow_run ON log.flow_run_id = flow_run.id
    WHERE flow_run.id IS NULL
    ORDER BY log.flow_run_id, log.created ASC
    LIMIT :limit
    FOR UPDATE OF log SKIP LOCKED
), deleted_logs AS (
    DELETE FROM log
    USING doomed_logs
    WHERE log.id = doomed_logs.id
    RETURNING log.id
)
-- Always yields exactly one row: the number of logs deleted (0 if none).
SELECT COUNT(*) FROM deleted_logs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Deletes artifacts in batch that have no associated flow runs with them - meaning their associated flow runs have been deleted.
--
-- SQLite version - has some limitations compared to PostgreSQL:
-- - No DELETE...USING (uses WHERE IN with subquery instead)
-- - No FOR UPDATE SKIP LOCKED (not needed for SQLite's locking model)
-- - RETURNING is supported in SQLite 3.35+
--
-- NOTE(review): RETURNING emits one row per deleted artifact, each carrying
-- the same total count — the caller must read a single row (e.g. first())
-- rather than expect exactly one result row. Also confirm the target SQLite
-- version permits a subquery over a statement-level CTE inside RETURNING.
--
-- Parameters:
-- :limit - Maximum number of artifacts to delete

WITH doomed_artifacts AS (
    SELECT artifact.id as id
    FROM artifact
    LEFT OUTER JOIN flow_run ON artifact.flow_run_id = flow_run.id
    WHERE flow_run.id IS NULL
    ORDER BY artifact.flow_run_id, artifact.created ASC
    LIMIT :limit
)
DELETE FROM artifact
WHERE id IN (SELECT id FROM doomed_artifacts)
RETURNING (SELECT COUNT(*) FROM doomed_artifacts)
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- Deletes old flow runs before the cutoff date (SQLite version)
--
-- Because flow runs cascade delete child task runs, the idea is to limit the amount of tasks deleted by this query.
-- If the query limited the amount of flow runs deleted, this could potentially delete an unbounded amount of task runs.
-- Therefore, the query joins flow runs to their children tasks, sorts by the flow run id, and takes children task runs
-- up to the limit, and only then takes the distinct flow run ids for deletion. For example, a limit of 1000 may only
-- delete 500 flow runs if there are 2 children task runs per flow run, but it will delete 1000 task runs for a total
-- of 1500 records.
--
-- SQLite limitations:
-- - No DELETE...USING (use WHERE IN with subquery instead)
-- - No FOR UPDATE SKIP LOCKED (not needed for SQLite's locking model)
-- - RETURNING is supported in SQLite 3.35+
-- - Returns deleted flow run count and cascade deleted task run count
--
-- NOTE(review): RETURNING emits one row per deleted flow run, each carrying
-- the same (flow_run_count, task_run_count) pair — the caller must read a
-- single row (e.g. first()) rather than expect exactly one. Also confirm the
-- target SQLite version permits subqueries over statement-level CTEs
-- ("counts") inside RETURNING.
--
-- Parameters:
-- :before - Delete flow runs before this timestamp
-- :limit - Maximum number of flow runs to delete in this batch

WITH doomed_flow_runs AS (
    SELECT flow_run.id AS flow_run_id, task_run.id AS task_run_id
    FROM flow_run
    LEFT JOIN task_run ON task_run.flow_run_id = flow_run.id
    WHERE flow_run.created < :before AND flow_run.parent_task_run_id IS NULL
    ORDER BY flow_run.id
    LIMIT :limit
), flow_run_ids AS (
    SELECT DISTINCT flow_run_id AS id
    FROM doomed_flow_runs
), doomed_task_runs AS (
    -- Re-scan task_run: the LIMIT above may cut mid-flow-run, but ALL children
    -- of a doomed flow run cascade delete, so count them all.
    SELECT task_run.id
    FROM task_run
    WHERE task_run.flow_run_id IN (SELECT id FROM flow_run_ids)
), counts AS (
    SELECT
        (SELECT COUNT(*) FROM flow_run_ids) AS flow_run_count,
        (SELECT COUNT(*) FROM doomed_task_runs) AS task_run_count
)
DELETE FROM flow_run
WHERE id IN (SELECT id FROM flow_run_ids)
RETURNING (SELECT flow_run_count FROM counts) AS flow_run_count, (SELECT task_run_count FROM counts) AS task_run_count
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Deletes logs in batch that have no associated flow runs with them - meaning their associated flow runs have been deleted.
--
-- SQLite version - has some limitations compared to PostgreSQL:
-- - No DELETE...USING (uses WHERE IN with subquery instead)
-- - No FOR UPDATE SKIP LOCKED (not needed for SQLite's locking model)
-- - RETURNING is supported in SQLite 3.35+
--
-- NOTE(review): RETURNING emits one row per deleted log, each carrying the
-- same total count — the caller must read a single row (e.g. first()) rather
-- than expect exactly one result row. Also confirm the target SQLite version
-- permits a subquery over a statement-level CTE inside RETURNING.
--
-- Parameters:
-- :limit - Maximum number of logs to delete

WITH doomed_logs AS (
    SELECT log.id as id
    FROM log
    LEFT OUTER JOIN flow_run ON log.flow_run_id = flow_run.id
    WHERE flow_run.id IS NULL
    ORDER BY log.flow_run_id, log.created ASC
    LIMIT :limit
)
DELETE FROM log
WHERE id IN (SELECT id FROM doomed_logs)
RETURNING (SELECT COUNT(*) FROM doomed_logs)
1 change: 1 addition & 0 deletions src/prefect/server/services/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import prefect.server.services.cancellation_cleanup
import prefect.server.services.db_vacuum
import prefect.server.services.foreman
import prefect.server.services.late_runs
import prefect.server.services.pause_expirations
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/server/services/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def _known_service_modules() -> list[ModuleType]:
from prefect.server.logs import stream as logs_stream
from prefect.server.services import (
cancellation_cleanup,
db_vacuum,
foreman,
late_runs,
pause_expirations,
Expand All @@ -51,6 +52,7 @@ def _known_service_modules() -> list[ModuleType]:
return [
# Orchestration services
cancellation_cleanup,
db_vacuum,
foreman,
late_runs,
pause_expirations,
Expand Down
Loading
Loading