
Commit 548bed0

create db vacuum service to periodically delete old prefect resources

1 parent 7fcfb7e
12 files changed: +950 -3 lines changed

src/prefect/server/database/query_components.py

Lines changed: 85 additions & 3 deletions
@@ -16,7 +16,7 @@

 import sqlalchemy as sa
 from cachetools import Cache, TTLCache
-from jinja2 import Environment, PackageLoader, select_autoescape
+from jinja2 import Environment, PackageLoader, Template, select_autoescape
 from sqlalchemy import orm
 from sqlalchemy.dialects import postgresql, sqlite
 from sqlalchemy.exc import NoResultFound
@@ -114,6 +114,12 @@ def make_timestamp_intervals(
         interval: datetime.timedelta,
     ) -> sa.Select[tuple[datetime.datetime, datetime.datetime]]: ...

+    def _get_jinja_template(self, template_path: str) -> Template:
+        return jinja_env.get_template(self._get_query_template_path(template_path))
+
+    @abstractmethod
+    def _get_query_template_path(self, template_path: str) -> str: ...
+
     @abstractmethod
     def set_state_id_on_inserted_flow_runs_statement(
         self,
@@ -538,6 +544,76 @@ async def _get_flow_run_graph_states(
             GraphState.model_validate(state, from_attributes=True) for state in states
         ]

+    async def delete_old_flow_runs(
+        self, session: AsyncSession, before: datetime.datetime, limit: int
+    ) -> tuple[int, int]:
+        """
+        Delete old flow runs and return counts.
+
+        Args:
+            session: Database session
+            before: Delete flow runs created before this timestamp
+            limit: Maximum number of task runs to delete (determines flow run count)
+
+        Returns:
+            Tuple of (flow_run_count, task_run_count)
+        """
+        query = sa.text(
+            self._get_jinja_template(
+                template_path="delete-old-flow-runs.sql.jinja"
+            ).render()
+        ).bindparams(
+            sa.bindparam("before", before, type_=Timestamp),
+            sa.bindparam("limit", limit, type_=sa.Integer),
+        )
+
+        result = await session.execute(query)
+
+        # The query returns a single row with two columns: flow_run_count, task_run_count
+        # If no rows were deleted, the query returns no results
+        row = result.one_or_none()
+        if row is None:
+            return (0, 0)
+        return (row.flow_run_count, row.task_run_count)
+
+    async def delete_old_logs(self, session: AsyncSession, limit: int) -> int:
+        """
+        Delete old logs.
+
+        Args:
+            session: Database session
+            limit: Maximum number of logs to delete
+
+        Returns:
+            Number of logs deleted
+        """
+        query = sa.text(
+            self._get_jinja_template(template_path="delete-old-logs.sql.jinja").render()
+        ).bindparams(limit=limit)
+
+        result = await session.execute(query)
+        return result.scalar_one_or_none() or 0
+
+    async def delete_old_artifacts(self, session: AsyncSession, limit: int) -> int:
+        """
+        Delete old artifacts that have no associated flow runs.
+
+        Args:
+            session: Database session
+            limit: Maximum number of artifacts to delete
+
+        Returns:
+            Number of artifacts deleted
+        """
+        query = sa.text(
+            self._get_jinja_template(
+                template_path="delete-old-artifacts.sql.jinja"
+            ).render()
+        ).bindparams(limit=limit)
+
+        result = await session.execute(query)
+        return result.scalar_one_or_none() or 0
+

 class AsyncPostgresQueryComponents(BaseQueryComponents):
     # --- Postgres-specific SqlAlchemy bindings
@@ -585,6 +661,9 @@ def make_timestamp_intervals(
             .limit(500)  # grab at most 500 intervals
         )

+    def _get_query_template_path(self, template_path: str) -> str:
+        return f"postgres/{template_path}"
+
     @db_injector
     def set_state_id_on_inserted_flow_runs_statement(
         self,
@@ -614,7 +693,7 @@ def _get_scheduled_flow_runs_from_work_pool_template_path(self) -> str:
         """
         Template for the query to get scheduled flow runs from a work pool
         """
-        return "postgres/get-runs-from-worker-queues.sql.jinja"
+        return self._get_query_template_path("get-runs-from-worker-queues.sql.jinja")

     @db_injector
     def _build_flow_run_graph_v2_query(
@@ -847,6 +926,9 @@ def make_timestamp_intervals(

         return sa.select(cte.c.interval_start, cte.c.interval_end)

+    def _get_query_template_path(self, template_path: str) -> str:
+        return f"sqlite/{template_path}"
+
     @db_injector
     def set_state_id_on_inserted_flow_runs_statement(
         self,
@@ -936,7 +1018,7 @@ def _get_scheduled_flow_runs_from_work_pool_template_path(self) -> str:
         """
         Template for the query to get scheduled flow runs from a work pool
         """
-        return "sqlite/get-runs-from-worker-queues.sql.jinja"
+        return self._get_query_template_path("get-runs-from-worker-queues.sql.jinja")

     @db_injector
     def _build_flow_run_graph_v2_query(
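
The three new methods on BaseQueryComponents are the dialect-agnostic entry points the new vacuum service can call. As a rough sketch of how a caller might drive them in batches - the helper name, session-factory parameter, retention window, and batch size below are illustrative assumptions, not code from this commit:

import datetime
from typing import AsyncContextManager, Callable

from sqlalchemy.ext.asyncio import AsyncSession


async def vacuum_once(
    queries,  # a BaseQueryComponents implementation (Postgres or SQLite)
    session_factory: Callable[[], AsyncContextManager[AsyncSession]],
    retention: datetime.timedelta = datetime.timedelta(days=30),  # assumed retention window
    batch_size: int = 1000,  # assumed batch size
) -> None:
    """Run one hypothetical vacuum pass, deleting in small batches until exhausted."""
    before = datetime.datetime.now(datetime.timezone.utc) - retention

    # Delete old flow runs (child task runs are cascade-deleted) until a batch comes back empty.
    while True:
        async with session_factory() as session:
            flow_runs, _task_runs = await queries.delete_old_flow_runs(
                session, before=before, limit=batch_size
            )
            await session.commit()
        if flow_runs == 0:
            break

    # Logs and artifacts are orphaned by the flow run deletions above,
    # so each is swept in its own batched loop.
    for delete in (queries.delete_old_logs, queries.delete_old_artifacts):
        while True:
            async with session_factory() as session:
                deleted = await delete(session, limit=batch_size)
                await session.commit()
            if deleted < batch_size:
                break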
postgres/delete-old-artifacts.sql.jinja

Lines changed: 22 additions & 0 deletions

-- Deletes a batch of artifacts that no longer have an associated flow run - meaning their flow run has been deleted.
--
-- The query attempts to make this deletion efficient by:
-- - using FOR UPDATE SKIP LOCKED for concurrent worker safety
-- - using DELETE...USING for an efficient join (faster than WHERE IN)
--
-- Parameters:
-- :limit - Maximum number of artifacts to delete

WITH doomed_artifacts AS (
    SELECT artifact.id AS id
    FROM artifact
    LEFT OUTER JOIN flow_run ON artifact.flow_run_id = flow_run.id
    WHERE flow_run.id IS NULL
    ORDER BY artifact.flow_run_id, artifact.created ASC
    LIMIT :limit
    FOR UPDATE OF artifact SKIP LOCKED
), deleted_artifacts AS (
    DELETE FROM artifact
    USING doomed_artifacts
    WHERE artifact.id = doomed_artifacts.id
    RETURNING artifact.id
)
SELECT COUNT(*) FROM deleted_artifacts
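
Both the artifact and log templates rely on the same anti-join idiom: after a LEFT OUTER JOIN, rows with no matching flow run carry NULLs on the flow_run side, so WHERE flow_run.id IS NULL keeps exactly the orphaned records. A tiny self-contained demonstration of the idiom (plain sqlite3 with toy tables, not Prefect's schema):

import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE flow_run (id INTEGER PRIMARY KEY);
    CREATE TABLE artifact (id INTEGER PRIMARY KEY, flow_run_id INTEGER);
    INSERT INTO flow_run VALUES (1);
    INSERT INTO artifact VALUES (10, 1), (11, 2);  -- artifact 11 points at a deleted flow run
    """
)
orphans = con.execute(
    """
    SELECT artifact.id
    FROM artifact
    LEFT OUTER JOIN flow_run ON artifact.flow_run_id = flow_run.id
    WHERE flow_run.id IS NULL
    """
).fetchall()
print(orphans)  # [(11,)] -- only the artifact whose flow run no longer exists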
postgres/delete-old-flow-runs.sql.jinja

Lines changed: 42 additions & 0 deletions

-- Deletes old flow runs created before the cutoff date.
--
-- Because flow runs cascade-delete their child task runs, the goal is to bound the number of task runs removed by a
-- single batch. If the query limited only the number of flow runs deleted, it could delete an unbounded number of
-- task runs. Therefore the query joins flow runs to their child task runs, sorts by flow run id, takes joined rows up
-- to the limit, and only then takes the distinct flow run ids for deletion. For example, a limit of 1000 may only
-- delete 500 flow runs if each flow run has 2 child task runs, but it will delete 1000 task runs for a total of
-- 1500 records.
--
-- Features:
-- - CTE with FOR UPDATE SKIP LOCKED for concurrent worker safety
-- - DELETE...USING for an efficient join (faster than WHERE IN)
-- - Returns the deleted flow run count and the cascade-deleted task run count
--
-- Parameters:
-- :before - Delete flow runs created before this timestamp
-- :limit - Maximum number of joined flow run / task run rows to select for deletion in this batch

WITH doomed_flow_runs AS (
    SELECT flow_run.id AS flow_run_id, task_run.id AS task_run_id
    FROM flow_run
    LEFT JOIN task_run ON task_run.flow_run_id = flow_run.id
    WHERE flow_run.created < :before AND flow_run.parent_task_run_id IS NULL
    ORDER BY flow_run.id
    LIMIT :limit
    FOR UPDATE OF flow_run SKIP LOCKED
), flow_run_ids AS (
    SELECT DISTINCT flow_run_id AS id
    FROM doomed_flow_runs
), doomed_task_runs AS (
    SELECT task_run.id
    FROM task_run
    JOIN flow_run_ids ON task_run.flow_run_id = flow_run_ids.id
), deleted_flow_runs AS (
    DELETE FROM flow_run
    USING flow_run_ids AS doomed_flow_run
    WHERE flow_run.id = doomed_flow_run.id
    RETURNING flow_run.id
)
SELECT
    (SELECT COUNT(*) FROM deleted_flow_runs) AS flow_run_count,
    (SELECT COUNT(*) FROM doomed_task_runs) AS task_run_count
postgres/delete-old-logs.sql.jinja

Lines changed: 22 additions & 0 deletions

-- Deletes a batch of logs that no longer have an associated flow run - meaning their flow run has been deleted.
--
-- The query attempts to make this deletion efficient by:
-- - using FOR UPDATE SKIP LOCKED for concurrent worker safety
-- - using DELETE...USING for an efficient join (faster than WHERE IN)
--
-- Parameters:
-- :limit - Maximum number of logs to delete

WITH doomed_logs AS (
    SELECT log.id AS id
    FROM log
    LEFT OUTER JOIN flow_run ON log.flow_run_id = flow_run.id
    WHERE flow_run.id IS NULL
    ORDER BY log.flow_run_id, log.created ASC
    LIMIT :limit
    FOR UPDATE OF log SKIP LOCKED
), deleted_logs AS (
    DELETE FROM log
    USING doomed_logs
    WHERE log.id = doomed_logs.id
    RETURNING log.id
)
SELECT COUNT(*) FROM deleted_logs
sqlite/delete-old-artifacts.sql.jinja

Lines changed: 21 additions & 0 deletions

-- Deletes a batch of artifacts that no longer have an associated flow run - meaning their flow run has been deleted.
--
-- SQLite version - has some limitations compared to PostgreSQL:
-- - No DELETE...USING (uses WHERE IN with a subquery instead)
-- - No FOR UPDATE SKIP LOCKED (not needed for SQLite's locking model)
-- - RETURNING is supported in SQLite 3.35+
--
-- Parameters:
-- :limit - Maximum number of artifacts to delete

WITH doomed_artifacts AS (
    SELECT artifact.id AS id
    FROM artifact
    LEFT OUTER JOIN flow_run ON artifact.flow_run_id = flow_run.id
    WHERE flow_run.id IS NULL
    ORDER BY artifact.flow_run_id, artifact.created ASC
    LIMIT :limit
)
DELETE FROM artifact
WHERE id IN (SELECT id FROM doomed_artifacts)
RETURNING (SELECT COUNT(*) FROM doomed_artifacts)
sqlite/delete-old-flow-runs.sql.jinja

Lines changed: 41 additions & 0 deletions

-- Deletes old flow runs created before the cutoff date (SQLite version).
--
-- Because flow runs cascade-delete their child task runs, the goal is to bound the number of task runs removed by a
-- single batch. If the query limited only the number of flow runs deleted, it could delete an unbounded number of
-- task runs. Therefore the query joins flow runs to their child task runs, sorts by flow run id, takes joined rows up
-- to the limit, and only then takes the distinct flow run ids for deletion. For example, a limit of 1000 may only
-- delete 500 flow runs if each flow run has 2 child task runs, but it will delete 1000 task runs for a total of
-- 1500 records.
--
-- SQLite limitations:
-- - No DELETE...USING (uses WHERE IN with a subquery instead)
-- - No FOR UPDATE SKIP LOCKED (not needed for SQLite's locking model)
-- - RETURNING is supported in SQLite 3.35+
-- - Returns the deleted flow run count and the cascade-deleted task run count
--
-- Parameters:
-- :before - Delete flow runs created before this timestamp
-- :limit - Maximum number of joined flow run / task run rows to select for deletion in this batch

WITH doomed_flow_runs AS (
    SELECT flow_run.id AS flow_run_id, task_run.id AS task_run_id
    FROM flow_run
    LEFT JOIN task_run ON task_run.flow_run_id = flow_run.id
    WHERE flow_run.created < :before AND flow_run.parent_task_run_id IS NULL
    ORDER BY flow_run.id
    LIMIT :limit
), flow_run_ids AS (
    SELECT DISTINCT flow_run_id AS id
    FROM doomed_flow_runs
), doomed_task_runs AS (
    SELECT task_run.id
    FROM task_run
    WHERE task_run.flow_run_id IN (SELECT id FROM flow_run_ids)
), counts AS (
    SELECT
        (SELECT COUNT(*) FROM flow_run_ids) AS flow_run_count,
        (SELECT COUNT(*) FROM doomed_task_runs) AS task_run_count
)
DELETE FROM flow_run
WHERE id IN (SELECT id FROM flow_run_ids)
RETURNING (SELECT flow_run_count FROM counts) AS flow_run_count, (SELECT task_run_count FROM counts) AS task_run_count
sqlite/delete-old-logs.sql.jinja

Lines changed: 21 additions & 0 deletions

-- Deletes a batch of logs that no longer have an associated flow run - meaning their flow run has been deleted.
--
-- SQLite version - has some limitations compared to PostgreSQL:
-- - No DELETE...USING (uses WHERE IN with a subquery instead)
-- - No FOR UPDATE SKIP LOCKED (not needed for SQLite's locking model)
-- - RETURNING is supported in SQLite 3.35+
--
-- Parameters:
-- :limit - Maximum number of logs to delete

WITH doomed_logs AS (
    SELECT log.id AS id
    FROM log
    LEFT OUTER JOIN flow_run ON log.flow_run_id = flow_run.id
    WHERE flow_run.id IS NULL
    ORDER BY log.flow_run_id, log.created ASC
    LIMIT :limit
)
DELETE FROM log
WHERE id IN (SELECT id FROM doomed_logs)
RETURNING (SELECT COUNT(*) FROM doomed_logs)

src/prefect/server/services/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 import prefect.server.services.cancellation_cleanup
+import prefect.server.services.db_vacuum
 import prefect.server.services.foreman
 import prefect.server.services.late_runs
 import prefect.server.services.pause_expirations

src/prefect/server/services/base.py

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,7 @@ def _known_service_modules() -> list[ModuleType]:
     from prefect.server.logs import stream as logs_stream
     from prefect.server.services import (
         cancellation_cleanup,
+        db_vacuum,
         foreman,
         late_runs,
         pause_expirations,
@@ -51,6 +52,7 @@ def _known_service_modules() -> list[ModuleType]:
     return [
         # Orchestration services
         cancellation_cleanup,
+        db_vacuum,
         foreman,
         late_runs,
         pause_expirations,
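
The db_vacuum module itself is not reproduced in this extract - only its registration above. If it follows the same LoopService pattern as the sibling services it is registered next to, its skeleton would look roughly like the sketch below; the class name, loop interval, retention window, batch size, and the assumption that LoopService lives in prefect.server.services.base are illustrative guesses, not the committed implementation:

# Hypothetical sketch of src/prefect/server/services/db_vacuum.py -- illustrative only.
import datetime

from prefect.server.database import provide_database_interface
from prefect.server.services.base import LoopService  # assumed location of the base class


class DbVacuum(LoopService):
    """Periodically deletes old flow runs, logs, and artifacts in small batches."""

    loop_seconds: float = 3600  # assumed interval between vacuum passes

    async def run_once(self) -> None:
        db = provide_database_interface()
        before = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=30)
        async with db.session_context(begin_transaction=True) as session:
            # One batch per loop iteration; the loop service re-invokes run_once on a schedule.
            await db.queries.delete_old_flow_runs(session, before=before, limit=1000)
            await db.queries.delete_old_logs(session, limit=1000)
            await db.queries.delete_old_artifacts(session, limit=1000)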
