|
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 |
|
| 15 | +# Note: This sample is designed for Airflow 1 and 2. |
| 16 | + |
15 | 17 | # [START composer_metadb_cleanup] |
16 | | -""" |
17 | | -A maintenance workflow that you can deploy into Airflow to periodically clean |
| 18 | +"""A maintenance workflow that you can deploy into Airflow to periodically clean |
18 | 19 | out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid |
19 | 20 | having too much data in your Airflow MetaStore. |
20 | 21 |
|
|
68 | 69 | from sqlalchemy import desc, sql, text |
69 | 70 | from sqlalchemy.exc import ProgrammingError |
70 | 71 |
|
| 72 | + |
def parse_airflow_version(version: str) -> tuple[int, ...]:
    """Convert an Airflow version string into a tuple of integers.

    Strips the optional Cloud Composer "+composer" suffix, then splits the
    remaining dotted version into integer components so that versions can be
    compared with standard tuple ordering, e.g. (2, 2, 0) < (2, 9, 2).

    For example: "2.9.2+composer" -> (2, 9, 2).

    Args:
        version: Version string such as "2.9.2+composer" or "1.10.15".

    Returns:
        Tuple of integer version components, e.g. (2, 9, 2).

    Raises:
        ValueError: If any dotted component is not a plain integer.
    """
    # TODO(developer): Update this function if you are using a version
    # with non-numerical characters such as "2.9.3rc1".
    COMPOSER_SUFFIX = "+composer"
    # removesuffix is a no-op when the suffix is absent (Python 3.9+,
    # which this file already requires for the builtin-generic annotation).
    plain_version = version.removesuffix(COMPOSER_SUFFIX)
    return tuple(int(part) for part in plain_version.split("."))
| 84 | + |
| 85 | + |
71 | 86 | now = timezone.utcnow |
72 | 87 |
|
73 | 88 | # airflow-db-cleanup |
74 | 89 | DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "") |
| 90 | + |
75 | 91 | START_DATE = airflow.utils.dates.days_ago(1) |
76 | | -# How often to Run. @daily - Once a day at Midnight (UTC) |
| 92 | + |
| 93 | +# How often to Run. @daily - Once a day at Midnight (UTC). |
77 | 94 | SCHEDULE_INTERVAL = "@daily" |
78 | | -# Who is listed as the owner of this DAG in the Airflow Web Server |
| 95 | + |
| 96 | +# Who is listed as the owner of this DAG in the Airflow Web Server. |
79 | 97 | DAG_OWNER_NAME = "operations" |
80 | | -# List of email address to send email alerts to if this job fails |
| 98 | + |
| 99 | +# List of email addresses to send email alerts to if this job fails. |
81 | 100 | ALERT_EMAIL_ADDRESSES = [] |
82 | | -# Airflow version used by the environment in list form, value stored in |
83 | | -# airflow_version is in format e.g "2.3.4+composer" |
84 | | -AIRFLOW_VERSION = airflow_version[: -len("+composer")].split(".") |
85 | | -# Length to retain the log files if not already provided in the conf. If this |
86 | | -# is set to 30, the job will remove those files that arE 30 days old or older. |
| 101 | + |
| 102 | +# Airflow version used by the environment as a tuple of integers. |
| 103 | +# For example: (2, 9, 2) |
| 104 | +# |
| 105 | +# Value in `airflow_version` is in format e.g. "2.9.2+composer" |
| 106 | +# It's converted to facilitate version comparison. |
| 107 | +AIRFLOW_VERSION = parse_airflow_version(airflow_version) |
| 108 | + |
| 109 | +# Length to retain the log files if not already provided in the configuration. |
| 110 | +# If this is set to 30, the job will remove those files |
| 111 | +# that are 30 days old or older. |
87 | 112 | DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int( |
88 | 113 | Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30) |
89 | 114 | ) |
90 | | -# Prints the database entries which will be getting deleted; set to False |
91 | | -# to avoid printing large lists and slowdown process |
| 115 | + |
| 116 | +# Prints the database entries which will be getting deleted; |
| 117 | +# set to False to avoid printing large lists and slowing down the process. |
92 | 118 | PRINT_DELETES = False |
93 | | -# Whether the job should delete the db entries or not. Included if you want to |
94 | | -# temporarily avoid deleting the db entries. |
| 119 | + |
| 120 | +# Whether the job should delete the DB entries or not. |
| 121 | +# Included if you want to temporarily avoid deleting the DB entries. |
95 | 122 | ENABLE_DELETE = True |
96 | | -# List of all the objects that will be deleted. Comment out the DB objects you |
97 | | -# want to skip. |
| 123 | + |
| 124 | +# List of all the objects that will be deleted. |
| 125 | +# Comment out the DB objects you want to skip. |
98 | 126 | DATABASE_OBJECTS = [ |
99 | 127 | { |
100 | 128 | "airflow_db_model": DagRun, |
|
105 | 133 | }, |
106 | 134 | { |
107 | 135 | "airflow_db_model": TaskInstance, |
108 | | - "age_check_column": TaskInstance.start_date |
109 | | - if AIRFLOW_VERSION < ["2", "2", "0"] |
110 | | - else TaskInstance.start_date, |
| 136 | + "age_check_column": TaskInstance.start_date, |
111 | 137 | "keep_last": False, |
112 | 138 | "keep_last_filters": None, |
113 | 139 | "keep_last_group_by": None, |
|
122 | 148 | { |
123 | 149 | "airflow_db_model": XCom, |
124 | 150 | "age_check_column": XCom.execution_date |
125 | | - if AIRFLOW_VERSION < ["2", "2", "5"] |
| 151 | + if AIRFLOW_VERSION < (2, 2, 5) |
126 | 152 | else XCom.timestamp, |
127 | 153 | "keep_last": False, |
128 | 154 | "keep_last_filters": None, |
|
144 | 170 | }, |
145 | 171 | ] |
146 | 172 |
|
147 | | -# Check for TaskReschedule model |
| 173 | +# Check for TaskReschedule model. |
148 | 174 | try: |
149 | 175 | from airflow.models import TaskReschedule |
150 | 176 |
|
151 | 177 | DATABASE_OBJECTS.append( |
152 | 178 | { |
153 | 179 | "airflow_db_model": TaskReschedule, |
154 | 180 | "age_check_column": TaskReschedule.execution_date |
155 | | - if AIRFLOW_VERSION < ["2", "2", "0"] |
| 181 | + if AIRFLOW_VERSION < (2, 2, 0) |
156 | 182 | else TaskReschedule.start_date, |
157 | 183 | "keep_last": False, |
158 | 184 | "keep_last_filters": None, |
|
163 | 189 | except Exception as e: |
164 | 190 | logging.error(e) |
165 | 191 |
|
166 | | -# Check for TaskFail model |
| 192 | +# Check for TaskFail model. |
167 | 193 | try: |
168 | 194 | from airflow.models import TaskFail |
169 | 195 |
|
|
180 | 206 | except Exception as e: |
181 | 207 | logging.error(e) |
182 | 208 |
|
183 | | -# Check for RenderedTaskInstanceFields model |
184 | | -if AIRFLOW_VERSION < ["2", "4", "0"]: |
| 209 | +# Check for RenderedTaskInstanceFields model. |
| 210 | +if AIRFLOW_VERSION < (2, 4, 0): |
185 | 211 | try: |
186 | 212 | from airflow.models import RenderedTaskInstanceFields |
187 | 213 |
|
|
198 | 224 | except Exception as e: |
199 | 225 | logging.error(e) |
200 | 226 |
|
201 | | -# Check for ImportError model |
| 227 | +# Check for ImportError model. |
202 | 228 | try: |
203 | 229 | from airflow.models import ImportError |
204 | 230 |
|
|
216 | 242 | except Exception as e: |
217 | 243 | logging.error(e) |
218 | 244 |
|
219 | | -if AIRFLOW_VERSION < ["2", "6", "0"]: |
| 245 | +if AIRFLOW_VERSION < (2, 6, 0): |
220 | 246 | try: |
221 | 247 | from airflow.jobs.base_job import BaseJob |
222 | 248 |
|
@@ -530,5 +556,4 @@ def analyze_db(): |
530 | 556 |
|
531 | 557 | print_configuration.set_downstream(cleanup_op) |
532 | 558 | cleanup_op.set_downstream(analyze_op) |
533 | | - |
534 | 559 | # [END composer_metadb_cleanup] |
0 commit comments