
Commit eb60174

Add new arguments to db_clean to explicitly include or exclude DAGs (#56663)
* Specify DAG IDs to include or exclude from db_clean
* Fix _confirm_delete
* Add documentation
* Fix tables without a dag_id_column, and keep last for dag_version
* Formatting, add more tests
* Fix deprecated sqlalchemy usage
1 parent c680d8e commit eb60174

File tree: 6 files changed, +235 -15 lines changed


airflow-core/docs/howto/usage-cli.rst

Lines changed: 2 additions & 0 deletions
@@ -213,6 +213,8 @@ The ``db clean`` command works by deleting from each table the records older than
 
 You can optionally provide a list of tables to perform deletes on. If no list of tables is supplied, all tables will be included.
 
+You can filter cleanup to specific DAGs using ``--dag-ids`` (comma-separated list), or exclude specific DAGs using ``--exclude-dag-ids`` (comma-separated list). These options allow you to target or avoid cleanup for particular DAGs.
+
 You can use the ``--dry-run`` option to print the row counts in the primary tables to be cleaned.
 
 By default, ``db clean`` will archive purged rows in tables of the form ``_airflow_deleted__<table>__<timestamp>``. If you don't want the data preserved in this way, you may supply argument ``--skip-archive``.
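For example, `airflow db clean --clean-before-timestamp 2024-01-01 --dag-ids dag1,dag2 --dry-run` previews the rows that would be purged for just those two DAGs. Below is a minimal programmatic sketch of the same thing, using the `run_cleanup` parameters this commit adds; the timestamp and DAG IDs are arbitrary examples, and it assumes a configured Airflow metadata database.

import pendulum

from airflow.utils.db_cleanup import run_cleanup

# Dry run: report what would be purged for dag1 and dag2 only,
# without deleting or archiving anything.
run_cleanup(
    clean_before_timestamp=pendulum.datetime(2024, 1, 1, tz="UTC"),
    dag_ids=["dag1", "dag2"],
    dry_run=True,
)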

airflow-core/src/airflow/cli/cli_config.py

Lines changed: 14 additions & 0 deletions
@@ -520,6 +520,18 @@ def string_lower_type(val):
         "Lower values reduce long-running locks but increase the number of batches."
     ),
 )
+ARG_DAG_IDS = Arg(
+    ("--dag-ids",),
+    default=None,
+    help="Only cleanup data related to the given dag_id",
+    type=string_list_type,
+)
+ARG_EXCLUDE_DAG_IDS = Arg(
+    ("--exclude-dag-ids",),
+    default=None,
+    help="Avoid cleaning up data related to the given dag_ids",
+    type=string_list_type,
+)
 
 # pool
 ARG_POOL_NAME = Arg(("pool",), metavar="NAME", help="Pool name")
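`string_list_type` is the converter that turns the comma-separated flag value into a Python list. Its body is not part of this diff; the tests at the bottom of this commit (which expect "dag1, dag2" to parse as ["dag1", "dag2"]) imply a split-and-strip helper along these lines — a sketch, not the verbatim source:

def string_list_type(val):
    """Parse a comma-separated string into a list of strings, stripping whitespace.

    Reconstructed from the behavior the new CLI tests assert; the real helper
    lives in airflow-core/src/airflow/cli/cli_config.py.
    """
    return [x.strip() for x in val.split(",")]


assert string_list_type("dag1, dag2") == ["dag1", "dag2"]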
@@ -1527,6 +1539,8 @@ class GroupCommand(NamedTuple):
             ARG_YES,
             ARG_DB_SKIP_ARCHIVE,
             ARG_DB_BATCH_SIZE,
+            ARG_DAG_IDS,
+            ARG_EXCLUDE_DAG_IDS,
         ),
     ),
     ActionCommand(

airflow-core/src/airflow/cli/commands/db_command.py

Lines changed: 2 additions & 0 deletions
@@ -301,6 +301,8 @@ def cleanup_tables(args):
         confirm=not args.yes,
         skip_archive=args.skip_archive,
         batch_size=args.batch_size,
+        dag_ids=args.dag_ids,
+        exclude_dag_ids=args.exclude_dag_ids,
     )
 

airflow-core/src/airflow/utils/db_cleanup.py

Lines changed: 73 additions & 15 deletions
@@ -79,6 +79,7 @@ class _TableConfig:
     table_name: str
     recency_column_name: str
     extra_columns: list[str] | None = None
+    dag_id_column_name: str | None = None
     keep_last: bool = False
     keep_last_filters: Any | None = None
     keep_last_group_by: Any | None = None
@@ -89,9 +90,19 @@ class _TableConfig:
 
     def __post_init__(self):
         self.recency_column = column(self.recency_column_name)
-        self.orm_model: Base = table(
-            self.table_name, *[column(x) for x in self.extra_columns or []], self.recency_column
-        )
+        if self.dag_id_column_name is None:
+            self.dag_id_column = None
+            self.orm_model: Base = table(
+                self.table_name, *[column(x) for x in self.extra_columns or []], self.recency_column
+            )
+        else:
+            self.dag_id_column = column(self.dag_id_column_name)
+            self.orm_model: Base = table(
+                self.table_name,
+                *[column(x) for x in self.extra_columns or []],
+                self.dag_id_column,
+                self.recency_column,
+            )
 
     def __lt__(self, other):
         return self.table_name < other.table_name
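`table()` and `column()` here are SQLAlchemy's lightweight constructs: they build a selectable from bare names, with no metadata reflection or engine required, which is why `__post_init__` can assemble `orm_model` eagerly at import time. A small standalone illustration of the pattern (table and column names are arbitrary examples):

from sqlalchemy import column, table

# Build a lightweight model the same way _TableConfig.__post_init__ does.
dag_id_col = column("dag_id")
recency_col = column("start_date")
orm_model = table("task_instance", dag_id_col, recency_col)

print(orm_model.c["dag_id"])  # task_instance.dag_id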
@@ -101,41 +112,47 @@ def readable_config(self):
         return {
             "table": self.orm_model.name,
             "recency_column": str(self.recency_column),
+            "dag_id_column": str(self.dag_id_column),
             "keep_last": self.keep_last,
             "keep_last_filters": [str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
             "keep_last_group_by": str(self.keep_last_group_by),
         }
 
 
 config_list: list[_TableConfig] = [
-    _TableConfig(table_name="job", recency_column_name="latest_heartbeat"),
+    _TableConfig(table_name="job", recency_column_name="latest_heartbeat", dag_id_column_name="dag_id"),
     _TableConfig(
         table_name="dag",
         recency_column_name="last_parsed_time",
         dependent_tables=["dag_version", "deadline"],
+        dag_id_column_name="dag_id",
     ),
     _TableConfig(
         table_name="dag_run",
         recency_column_name="start_date",
+        dag_id_column_name="dag_id",
         extra_columns=["dag_id", "run_type"],
         keep_last=True,
         keep_last_filters=[column("run_type") != DagRunType.MANUAL],
         keep_last_group_by=["dag_id"],
         dependent_tables=["task_instance", "deadline"],
     ),
-    _TableConfig(table_name="asset_event", recency_column_name="timestamp"),
+    _TableConfig(table_name="asset_event", recency_column_name="timestamp", dag_id_column_name="dag_id"),
     _TableConfig(table_name="import_error", recency_column_name="timestamp"),
-    _TableConfig(table_name="log", recency_column_name="dttm"),
-    _TableConfig(table_name="sla_miss", recency_column_name="timestamp"),
+    _TableConfig(table_name="log", recency_column_name="dttm", dag_id_column_name="dag_id"),
+    _TableConfig(table_name="sla_miss", recency_column_name="timestamp", dag_id_column_name="dag_id"),
     _TableConfig(
         table_name="task_instance",
         recency_column_name="start_date",
         dependent_tables=["task_instance_history", "xcom"],
+        dag_id_column_name="dag_id",
+    ),
+    _TableConfig(
+        table_name="task_instance_history", recency_column_name="start_date", dag_id_column_name="dag_id"
     ),
-    _TableConfig(table_name="task_instance_history", recency_column_name="start_date"),
-    _TableConfig(table_name="task_reschedule", recency_column_name="start_date"),
-    _TableConfig(table_name="xcom", recency_column_name="timestamp"),
-    _TableConfig(table_name="_xcom_archive", recency_column_name="timestamp"),
+    _TableConfig(table_name="task_reschedule", recency_column_name="start_date", dag_id_column_name="dag_id"),
+    _TableConfig(table_name="xcom", recency_column_name="timestamp", dag_id_column_name="dag_id"),
+    _TableConfig(table_name="_xcom_archive", recency_column_name="timestamp", dag_id_column_name="dag_id"),
     _TableConfig(table_name="callback_request", recency_column_name="created_at"),
     _TableConfig(table_name="celery_taskmeta", recency_column_name="date_done"),
     _TableConfig(table_name="celery_tasksetmeta", recency_column_name="date_done"),
@@ -148,8 +165,11 @@ def readable_config(self):
         table_name="dag_version",
         recency_column_name="created_at",
         dependent_tables=["task_instance", "dag_run"],
+        dag_id_column_name="dag_id",
+        keep_last=True,
+        keep_last_group_by=["dag_id"],
     ),
-    _TableConfig(table_name="deadline", recency_column_name="deadline_time"),
+    _TableConfig(table_name="deadline", recency_column_name="deadline_time", dag_id_column_name="dag_id"),
 ]
 
 # We need to have `fallback="database"` because this is executed at top level code and provider configuration
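Note the new `keep_last=True` with `keep_last_group_by=["dag_id"]` on `dag_version` (the "keep last for dag_version" bullet in the commit message): the most recent row per `dag_id` is exempt from deletion even when it predates the cutoff. A toy illustration of that grouping rule in plain Python — data is made up, for intuition only:

# Illustrative only: mirror the keep-last-per-group rule with in-memory rows.
rows = [
    {"dag_id": "dag1", "created_at": "2023-01-01"},
    {"dag_id": "dag1", "created_at": "2023-06-01"},
    {"dag_id": "dag2", "created_at": "2022-12-01"},
]
newest = {}
for row in rows:
    current = newest.get(row["dag_id"])
    if current is None or row["created_at"] > current["created_at"]:
        newest[row["dag_id"]] = row

# These survivors are kept regardless of the clean-before timestamp.
print(newest["dag1"]["created_at"], newest["dag2"]["created_at"])  # 2023-06-01 2022-12-01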
@@ -308,13 +328,25 @@ def _build_query(
     keep_last_group_by,
     clean_before_timestamp: DateTime,
     session: Session,
+    dag_id_column=None,
+    dag_ids: list[str] | None = None,
+    exclude_dag_ids: list[str] | None = None,
     **kwargs,
 ) -> Query:
     base_table_alias = "base"
     base_table = aliased(orm_model, name=base_table_alias)
     query = session.query(base_table).with_entities(text(f"{base_table_alias}.*"))
     base_table_recency_col = base_table.c[recency_column.name]
     conditions = [base_table_recency_col < clean_before_timestamp]
+
+    if (dag_ids or exclude_dag_ids) and dag_id_column is not None:
+        base_table_dag_id_col = base_table.c[dag_id_column.name]
+
+        if dag_ids:
+            conditions.append(base_table_dag_id_col.in_(dag_ids))
+        if exclude_dag_ids:
+            conditions.append(base_table_dag_id_col.not_in(exclude_dag_ids))
+
     if keep_last:
         max_date_col_name = "max_date_per_group"
         group_by_columns: list[Any] = [column(x) for x in keep_last_group_by]
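Note the guard: if a table's config has no `dag_id_column` (e.g. `import_error` above), the DAG filters are silently skipped for that table — the "Fix tables without a dag_id_column" bullet in the commit message. A standalone sketch of how the conditions compose; table and values are arbitrary examples, and `not_in` is the non-deprecated SQLAlchemy 1.4+ spelling, matching the "Fix deprecated sqlalchemy usage" bullet:

from sqlalchemy import column, select, table

t = table("task_instance", column("dag_id"), column("start_date"))

conditions = [t.c.start_date < "2024-01-01"]
dag_ids = ["dag1", "dag2"]       # --dag-ids
exclude_dag_ids = ["noisy_dag"]  # --exclude-dag-ids
if dag_ids:
    conditions.append(t.c.dag_id.in_(dag_ids))
if exclude_dag_ids:
    conditions.append(t.c.dag_id.not_in(exclude_dag_ids))

# WHERE start_date < ... AND dag_id IN (...) AND (dag_id NOT IN (...))
print(select(t).where(*conditions))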
@@ -345,6 +377,9 @@ def _cleanup_table(
     keep_last_filters,
     keep_last_group_by,
     clean_before_timestamp: DateTime,
+    dag_id_column=None,
+    dag_ids=None,
+    exclude_dag_ids=None,
     dry_run: bool = True,
     verbose: bool = False,
     skip_archive: bool = False,
@@ -358,6 +393,9 @@
     query = _build_query(
         orm_model=orm_model,
         recency_column=recency_column,
+        dag_id_column=dag_id_column,
+        dag_ids=dag_ids,
+        exclude_dag_ids=exclude_dag_ids,
         keep_last=keep_last,
         keep_last_filters=keep_last_filters,
         keep_last_group_by=keep_last_group_by,
@@ -380,10 +418,18 @@
         session.commit()
 
 
-def _confirm_delete(*, date: DateTime, tables: list[str]) -> None:
+def _confirm_delete(
+    *,
+    date: DateTime,
+    tables: list[str],
+    dag_ids: list[str] | None = None,
+    exclude_dag_ids: list[str] | None = None,
+) -> None:
     for_tables = f" for tables {tables!r}" if tables else ""
+    for_dags = f" for the following dags: {dag_ids!r}" if dag_ids else ""
+    excluding_dags = f" excluding the following dags: {exclude_dag_ids!r}" if exclude_dag_ids else ""
     question = (
-        f"You have requested that we purge all data prior to {date}{for_tables}.\n"
+        f"You have requested that we purge all data prior to {date}{for_tables}{for_dags}{excluding_dags}.\n"
         f"This is irreversible. Consider backing up the tables first and / or doing a dry run "
         f"with option --dry-run.\n"
         f"Enter 'delete rows' (without quotes) to proceed."
@@ -493,6 +539,8 @@ def run_cleanup(
     *,
     clean_before_timestamp: DateTime,
     table_names: list[str] | None = None,
+    dag_ids: list[str] | None = None,
+    exclude_dag_ids: list[str] | None = None,
     dry_run: bool = False,
     verbose: bool = False,
     confirm: bool = True,
@@ -513,6 +561,9 @@
     :param clean_before_timestamp: The timestamp before which data should be purged
     :param table_names: Optional. List of table names to perform maintenance on. If list not provided,
         will perform maintenance on all tables.
+    :param dag_ids: Optional. List of dag ids to perform maintenance on. If list not provided,
+        will perform maintenance on all dags.
+    :param exclude_dag_ids: Optional. List of dag ids to exclude from maintenance.
     :param dry_run: If true, print rows meeting deletion criteria
     :param verbose: If true, may provide more detailed output.
     :param confirm: Require user input to confirm before processing deletions.
@@ -532,14 +583,21 @@
     )
     _print_config(configs=effective_config_dict)
     if not dry_run and confirm:
-        _confirm_delete(date=clean_before_timestamp, tables=sorted(effective_table_names))
+        _confirm_delete(
+            date=clean_before_timestamp,
+            tables=sorted(effective_table_names),
+            dag_ids=dag_ids,
+            exclude_dag_ids=exclude_dag_ids,
+        )
     existing_tables = reflect_tables(tables=None, session=session).tables
 
     for table_name, table_config in effective_config_dict.items():
         if table_name in existing_tables:
             with _suppress_with_logging(table_name, session):
                 _cleanup_table(
                     clean_before_timestamp=clean_before_timestamp,
+                    dag_ids=dag_ids,
+                    exclude_dag_ids=exclude_dag_ids,
                     dry_run=dry_run,
                     verbose=verbose,
                     **table_config.__dict__,
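Because `run_cleanup` forwards each table's settings via `**table_config.__dict__`, the new `dag_id_column` field reaches `_cleanup_table` automatically, while `dag_ids`/`exclude_dag_ids` are passed explicitly. A hedged end-to-end sketch of the exclusion path; values are arbitrary examples, and it assumes a configured Airflow metadata database:

import pendulum

from airflow.utils.db_cleanup import run_cleanup

# Purge rows older than the cutoff everywhere except the listed DAG,
# archiving purged rows as usual (skip_archive defaults to False).
run_cleanup(
    clean_before_timestamp=pendulum.datetime(2024, 1, 1, tz="UTC"),
    exclude_dag_ids=["critical_dag"],
    confirm=False,  # suppress the interactive "delete rows" prompt
)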

airflow-core/tests/unit/cli/commands/test_db_command.py

Lines changed: 78 additions & 0 deletions
@@ -701,6 +701,8 @@ def test_date_timezone_omitted(self, run_cleanup_mock, timezone):
         db_command.cleanup_tables(args)
         run_cleanup_mock.assert_called_once_with(
             table_names=None,
+            dag_ids=None,
+            exclude_dag_ids=None,
             dry_run=False,
             clean_before_timestamp=pendulum.parse(timestamp, tz=timezone),
             verbose=False,
@@ -722,6 +724,8 @@ def test_date_timezone_supplied(self, run_cleanup_mock, timezone):
 
         run_cleanup_mock.assert_called_once_with(
             table_names=None,
+            dag_ids=None,
+            exclude_dag_ids=None,
             dry_run=False,
             clean_before_timestamp=pendulum.parse(timestamp),
             verbose=False,
@@ -749,6 +753,8 @@ def test_confirm(self, run_cleanup_mock, confirm_arg, expected):
 
         run_cleanup_mock.assert_called_once_with(
             table_names=None,
+            dag_ids=None,
+            exclude_dag_ids=None,
             dry_run=False,
             clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
             verbose=False,
@@ -776,6 +782,8 @@ def test_skip_archive(self, run_cleanup_mock, extra_arg, expected):
 
         run_cleanup_mock.assert_called_once_with(
             table_names=None,
+            dag_ids=None,
+            exclude_dag_ids=None,
             dry_run=False,
             clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
             verbose=False,
@@ -803,6 +811,8 @@ def test_dry_run(self, run_cleanup_mock, dry_run_arg, expected):
 
         run_cleanup_mock.assert_called_once_with(
             table_names=None,
+            dag_ids=None,
+            exclude_dag_ids=None,
             dry_run=expected,
             clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
             verbose=False,
@@ -832,6 +842,8 @@ def test_tables(self, run_cleanup_mock, extra_args, expected):
 
         run_cleanup_mock.assert_called_once_with(
             table_names=expected,
+            dag_ids=None,
+            exclude_dag_ids=None,
             dry_run=False,
             clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
             verbose=False,
@@ -859,6 +871,8 @@ def test_verbose(self, run_cleanup_mock, extra_args, expected):
 
         run_cleanup_mock.assert_called_once_with(
             table_names=None,
+            dag_ids=None,
+            exclude_dag_ids=None,
             dry_run=False,
             clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
             verbose=expected,
@@ -886,6 +900,8 @@ def test_batch_size(self, run_cleanup_mock, extra_args, expected):
 
         run_cleanup_mock.assert_called_once_with(
             table_names=None,
+            dag_ids=None,
+            exclude_dag_ids=None,
             dry_run=False,
             clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
             verbose=False,
@@ -894,6 +910,68 @@ def test_batch_size(self, run_cleanup_mock, extra_args, expected):
             batch_size=expected,
         )
 
+    @pytest.mark.parametrize(
+        ("extra_args", "expected"), [(["--dag-ids", "dag1, dag2"], ["dag1", "dag2"]), ([], None)]
+    )
+    @patch("airflow.cli.commands.db_command.run_cleanup")
+    def test_dag_ids(self, run_cleanup_mock, extra_args, expected):
+        """
+        When dag_ids are included in the args then dag_ids should be passed in, or None otherwise
+        """
+        args = self.parser.parse_args(
+            [
+                "db",
+                "clean",
+                "--clean-before-timestamp",
+                "2021-01-01",
+                *extra_args,
+            ]
+        )
+        db_command.cleanup_tables(args)
+
+        run_cleanup_mock.assert_called_once_with(
+            table_names=None,
+            dry_run=False,
+            dag_ids=expected,
+            exclude_dag_ids=None,
+            clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
+            verbose=False,
+            confirm=True,
+            skip_archive=False,
+            batch_size=None,
+        )
+
+    @pytest.mark.parametrize(
+        ("extra_args", "expected"), [(["--exclude-dag-ids", "dag1, dag2"], ["dag1", "dag2"]), ([], None)]
+    )
+    @patch("airflow.cli.commands.db_command.run_cleanup")
+    def test_exclude_dag_ids(self, run_cleanup_mock, extra_args, expected):
+        """
+        When exclude_dag_ids are included in the args then exclude_dag_ids should be passed in, or None otherwise
+        """
+        args = self.parser.parse_args(
+            [
+                "db",
+                "clean",
+                "--clean-before-timestamp",
+                "2021-01-01",
+                *extra_args,
+            ]
+        )
+        db_command.cleanup_tables(args)
+
+        run_cleanup_mock.assert_called_once_with(
+            table_names=None,
+            dry_run=False,
+            dag_ids=None,
+            exclude_dag_ids=expected,
+            clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
+            verbose=False,
+            confirm=True,
+            skip_archive=False,
+            batch_size=None,
+        )
+
     @patch("airflow.cli.commands.db_command.export_archived_records")
     @patch("airflow.cli.commands.db_command.os.path.isdir", return_value=True)
     def test_export_archived_records(self, os_mock, export_archived_mock):
