Skip to content

Commit 3299567

Browse files
authored
Handle Serialized DAG Format from v3 to v2 when downgrading Airflow (apache#55975)
This migration enables Airflow downgrades by converting v3 serialized DAGs back to v2 format. The `upgrade()` is a no-op since the server handles v1/v2/v3 at runtime, but `downgrade()` removes client_defaults sections and updates version numbers to ensure compatibility with older Airflow versions. closes apache#55949
1 parent 3ad61d8 commit 3299567

File tree

5 files changed

+207
-5
lines changed

5 files changed

+207
-5
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ebc15a6ad6529f10a59633d8adff009dd3b526e6a97f1b578436c275b98e177b
1+
5bb169c46e4f09f7e2c78c1233dddb3e320d4a4aa5a202b9cadec87ff1da6ae4

airflow-core/docs/migrations-ref.rst

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru
3939
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
4040
| Revision ID | Revises ID | Airflow Version | Description |
4141
+=========================+==================+===================+==============================================================+
42-
| ``15d84ca19038`` (head) | ``eaf332f43c7c`` | ``3.2.0`` | replace asset_trigger table with asset_watcher. |
42+
| ``15d84ca19038`` (head) | ``cc92b33c6709`` | ``3.2.0`` | replace asset_trigger table with asset_watcher. |
43+
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
44+
| ``cc92b33c6709`` | ``eaf332f43c7c`` | ``3.1.0`` | Add backward compatibility for serialized DAG format v3 to |
45+
| | | | v2. |
4346
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
4447
| ``eaf332f43c7c`` | ``a3c7f2b18d4e`` | ``3.1.0`` | add last_parse_duration to dag model. |
4548
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
"""
20+
Add backward compatibility for serialized DAG format v3 to v2.
21+
22+
Revision ID: cc92b33c6709
23+
Revises: eaf332f43c7c
24+
Create Date: 2025-09-22 22:50:48.035121
25+
26+
"""
27+
28+
from __future__ import annotations
29+
30+
from textwrap import dedent
31+
32+
import sqlalchemy as sa
33+
from alembic import context, op
34+
35+
# revision identifiers, used by Alembic.
# `down_revision` matches the "Revises" entry in the module docstring, i.e. this
# migration is chained directly after eaf332f43c7c.
# NOTE(review): `airflow_version` is presumably read by Airflow's own migration
# tooling/docs generation rather than by Alembic - confirm against the caller.
36+
revision = "cc92b33c6709"
37+
down_revision = "eaf332f43c7c"
38+
branch_labels = None
39+
depends_on = None
40+
airflow_version = "3.1.0"
41+
42+
43+
def upgrade():
    """
    Do nothing on upgrade.

    The server converts v1/v2/v3 serialized DAGs at runtime, so no stored rows
    need to be rewritten when moving forward to this revision.
    """
    # Intentionally a no-op; only ``downgrade()`` rewrites stored data.
    return None
47+
48+
49+
def _convert_v3_to_v2(dag_data):
    """Rewrite a decoded v3 serialized DAG dict in place as v2; return ``None`` if it is not v3."""
    if dag_data.get("__version") != 3:
        return None
    dag_data.pop("client_defaults", None)
    dag_data["__version"] = 2
    return dag_data


def downgrade():
    """
    Convert v3 serialized DAGs back to v2 format for compatibility with older Airflow versions.

    For every ``serialized_dag`` row whose payload has ``__version`` 3, the
    ``client_defaults`` section is removed and ``__version`` is set to 2.

    * Uncompressed rows (``data_compressed IS NULL``) are rewritten with the
      database's JSON functions on PostgreSQL and MySQL. Any other dialect is
      probed for ``JSON_SET`` support and falls back to a row-by-row rewrite in
      Python when the probe fails.
    * Compressed rows are always rewritten row by row in Python.

    In offline mode nothing is executed; manual SQL instructions are printed instead.

    A failure on an individual row is printed and that row is skipped (best
    effort). A failure of the compressed-row pass as a whole is printed and re-raised.
    """
    if context.is_offline_mode():
        print(
            dedent("""
            ------------
            -- Manual v3 to v2 DAG conversion required (offline mode)
            --
            -- PostgreSQL:
            -- UPDATE serialized_dag SET data = jsonb_set((data::jsonb - 'client_defaults'), '{__version}', '2')::json
            -- WHERE id IN (SELECT id FROM serialized_dag WHERE data->>'__version' = '3' AND data_compressed IS NULL);
            --
            -- MySQL/SQLite:
            -- UPDATE serialized_dag SET data = JSON_SET(JSON_REMOVE(data, '$.client_defaults'), '$.__version', 2)
            -- WHERE JSON_EXTRACT(data, '$.__version') = 3 AND data_compressed IS NULL;
            --
            -- For compressed DAGs: run online migration.
            ------------
            """)
        )
        return

    import gzip
    import json
    import zlib

    connection = op.get_bind()
    dialect = connection.dialect.name

    # Shared by MySQL and the JSON-capable fallback dialect (SQLite).
    # The version is compared as the integer 3: JSON_EXTRACT yields a JSON/SQL
    # integer for a numeric ``__version``, and neither MySQL nor SQLite treats
    # that as equal to the string '3', so a string comparison matches no rows.
    json_function_update = sa.text("""
        UPDATE serialized_dag
        SET data = JSON_SET(
            JSON_REMOVE(data, '$.client_defaults'),
            '$.__version',
            2
        )
        WHERE JSON_EXTRACT(data, '$.__version') = 3
        AND data_compressed IS NULL
    """)

    if dialect == "postgresql":
        # PostgreSQL - pre-filter v3 DAGs to avoid parsing all rows.
        # ``->>`` returns text, so comparing against the string '3' is correct here.
        connection.execute(
            sa.text("""
                UPDATE serialized_dag
                SET data = jsonb_set(
                    (data::jsonb - 'client_defaults'),
                    '{__version}',
                    '2'
                )::json
                WHERE id IN (
                    SELECT id FROM serialized_dag
                    WHERE data->>'__version' = '3'
                    AND data_compressed IS NULL
                )
            """)
        )
    elif dialect == "mysql":
        connection.execute(json_function_update)
    else:
        # Probe for JSON function support; builds without it raise on this statement.
        json_functions_available = False
        try:
            connection.execute(sa.text("SELECT JSON_SET('{}', '$.test', 'value')")).fetchone()
            json_functions_available = True
            print("SQLite JSON functions detected, using optimized SQL approach")
        except Exception:
            print("SQLite JSON functions not available, using Python fallback for JSON processing")

        if json_functions_available:
            connection.execute(json_function_update)
        else:
            # Materialise the rows first so the SELECT cursor is not still open
            # while UPDATEs run on the same connection.
            uncompressed_rows = connection.execute(
                sa.text("""
                    SELECT id, data
                    FROM serialized_dag
                    WHERE data_compressed IS NULL
                """)
            ).fetchall()

            for dag_id, data_json in uncompressed_rows:
                try:
                    if data_json is None:
                        continue

                    dag_data = _convert_v3_to_v2(json.loads(data_json))
                    if dag_data is None:
                        continue

                    connection.execute(
                        sa.text("UPDATE serialized_dag SET data = :data WHERE id = :id"),
                        {"data": json.dumps(dag_data), "id": dag_id},
                    )
                except Exception as e:
                    # Best effort: report the row and keep going.
                    print(f"Failed to downgrade uncompressed DAG {dag_id}: {e}")
                    continue

    try:
        compressed_rows = connection.execute(
            sa.text("""
                SELECT id, data_compressed
                FROM serialized_dag
                WHERE data_compressed IS NOT NULL
            """)
        ).fetchall()

        for dag_id, compressed_data in compressed_rows:
            try:
                if compressed_data is None:
                    continue

                # Some drivers hand back a buffer type (e.g. memoryview) for binary columns.
                raw = bytes(compressed_data)

                # NOTE(review): the codec used by the writer is not visible from this
                # migration. gzip framing (magic bytes 1f 8b) is handled as before;
                # anything else is treated as a zlib stream so such rows are converted
                # instead of failing - confirm against the model that writes this column.
                if raw[:2] == b"\x1f\x8b":
                    decompress, compress = gzip.decompress, gzip.compress
                else:
                    decompress, compress = zlib.decompress, zlib.compress

                dag_data = _convert_v3_to_v2(json.loads(decompress(raw)))
                if dag_data is None:
                    continue

                # Re-compress with the same codec the row was stored with.
                new_compressed = compress(json.dumps(dag_data).encode("utf-8"))
                connection.execute(
                    sa.text("UPDATE serialized_dag SET data_compressed = :data WHERE id = :id"),
                    {"data": new_compressed, "id": dag_id},
                )
            except Exception as e:
                # Best effort: report the row and keep going.
                print(f"Failed to downgrade compressed DAG {dag_id}: {e}")
                continue

    except Exception as e:
        print(f"Failed to process compressed DAGs during downgrade: {e}")
        raise

airflow-core/src/airflow/migrations/versions/0085_3_2_0_replace_asset_trigger_table_with_asset.py renamed to airflow-core/src/airflow/migrations/versions/0086_3_2_0_replace_asset_trigger_table_with_asset.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
replace asset_trigger table with asset_watcher.
2121
2222
Revision ID: 15d84ca19038
23-
Revises: eaf332f43c7c
23+
Revises: cc92b33c6709
2424
Create Date: 2025-09-14 01:34:40.423767
2525
2626
"""
@@ -32,7 +32,7 @@
3232

3333
# revision identifiers, used by Alembic.
3434
revision = "15d84ca19038"
35-
down_revision = "eaf332f43c7c"
35+
down_revision = "cc92b33c6709"
3636
branch_labels = None
3737
depends_on = None
3838
airflow_version = "3.2.0"

airflow-core/src/airflow/utils/db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class MappedClassProtocol(Protocol):
110110
"2.10.3": "5f2621c13b39",
111111
"3.0.0": "29ce7909c52b",
112112
"3.0.3": "fe199e1abd77",
113-
"3.1.0": "eaf332f43c7c",
113+
"3.1.0": "cc92b33c6709",
114114
"3.2.0": "15d84ca19038",
115115
}
116116

0 commit comments

Comments
 (0)