Skip to content

Commit 3a87490

Browse files
authored
Add unique UUID try_id column to TaskInstance and TaskInstanceHistory (apache#47065)
* Add unique UUID `try_id` column to TaskInstance and TaskInstanceHistory

  This change introduces a new `try_id` UUID column that uniquely identifies each task instance attempt across both the TaskInstance and TaskInstanceHistory tables. It:
  - Creates a unique identifier for each task execution attempt
  - Allows retrieving task instances or history with a single key
  - Maintains try_id uniqueness when tasks are retried or cleared
  - Adds the necessary database migration for adding the columns and constraints
  - Updates the relevant task instance functions to generate new try_ids when tasks are retried

  Tested manually and with unit tests to confirm that the behaviour works as intended.
* Make try_id a primary key of task_instance_history table * fixup! Make try_id a primary key of task_instance_history table * fixup! fixup! Make try_id a primary key of task_instance_history table * fixup! fixup! fixup! Make try_id a primary key of task_instance_history table * fixup! fixup! fixup! fixup! Make try_id a primary key of task_instance_history table * Update task_instance_history_orm * fixup! Update task_instance_history_orm * fixup! fixup! Update task_instance_history_orm * fix test * improve test query * fixup! improve test query * fixup! fixup! improve test query
1 parent 8e3d25f commit 3a87490

File tree

11 files changed

+591
-371
lines changed

11 files changed

+591
-371
lines changed

airflow/api_fastapi/execution_api/routes/task_instances.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,12 @@ def ti_update_state(
295295
updated_state = TaskInstanceState.FAILED
296296
elif ti_patch_payload.state == TaskInstanceState.FAILED:
297297
if _is_eligible_to_retry(previous_state, try_number, max_tries):
298+
from airflow.models.taskinstance import uuid7
298299
from airflow.models.taskinstancehistory import TaskInstanceHistory
299300

300301
ti = session.get(TI, ti_id_str)
301302
TaskInstanceHistory.record_ti(ti, session=session)
303+
ti.try_id = uuid7()
302304
updated_state = TaskInstanceState.UP_FOR_RETRY
303305
else:
304306
updated_state = TaskInstanceState.FAILED
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
"""
20+
Add try_id to TI and TIH.
21+
22+
Revision ID: 7645189f3479
23+
Revises: e00344393f31
24+
Create Date: 2025-02-24 18:18:12.063106
25+
26+
"""
27+
28+
from __future__ import annotations
29+
30+
import sqlalchemy as sa
31+
from alembic import op
32+
from sqlalchemy.dialects import postgresql
33+
from sqlalchemy_utils import UUIDType
34+
35+
from airflow.models.taskinstance import uuid7
36+
37+
# revision identifiers, used by Alembic.
38+
revision = "7645189f3479"
39+
down_revision = "e00344393f31"
40+
branch_labels = None
41+
depends_on = None
42+
airflow_version = "3.0.0"
43+
44+
45+
def _backfill_try_id(conn, table_name: str) -> None:
    """
    Give every row of ``table_name`` whose ``try_id`` is NULL its own freshly generated UUID.

    :param conn: connection on which the SELECT and the per-row UPDATEs are executed
    :param table_name: table to backfill; it must expose ``id`` and ``try_id`` columns.
        The name is interpolated into the SQL text, so only pass trusted constants.
    """
    pending_rows = conn.execute(sa.text(f"SELECT id FROM {table_name} WHERE try_id IS NULL")).fetchall()

    update_stmt = sa.text(f"""
        UPDATE {table_name}
        SET try_id = :uuid
        WHERE id = :row_id AND try_id IS NULL
    """)

    # One UPDATE per row: each row needs a distinct value generated on the Python side.
    for row in pending_rows:
        conn.execute(update_stmt.bindparams(uuid=uuid7(), row_id=row.id))


def upgrade():
    """
    Apply Add try_id to TI and TIH.

    Steps, in order:

    1. Add a nullable ``try_id`` column to ``task_instance``, backfill it with one
       UUID per row, then make it NOT NULL and unique.
    2. Add ``task_instance_id`` and a nullable ``try_id`` to ``task_instance_history``,
       backfill ``try_id`` the same way, and copy ``task_instance.id`` into
       ``task_instance_id`` by matching on (task_id, dag_id, run_id, map_index).
    3. Drop the integer ``id`` column of ``task_instance_history`` and make ``try_id``
       its primary key.
    """
    conn = op.get_bind()
    dialect_name = conn.dialect.name

    # The column is added as nullable first so that existing rows can be backfilled
    # before the NOT NULL and unique constraints are applied.
    with op.batch_alter_table("task_instance", schema=None) as batch_op:
        batch_op.add_column(sa.Column("try_id", UUIDType(binary=False), nullable=True))

    _backfill_try_id(conn, "task_instance")

    with op.batch_alter_table("task_instance", schema=None) as batch_op:
        batch_op.alter_column("try_id", nullable=False, existing_type=UUIDType(binary=False))
        batch_op.create_unique_constraint(batch_op.f("task_instance_try_id_uq"), ["try_id"])

    with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "task_instance_id",
                sa.String(length=36).with_variant(postgresql.UUID(), "postgresql"),
            )
        )
        batch_op.add_column(sa.Column("try_id", UUIDType(binary=False), nullable=True))

    # Update try_id column
    _backfill_try_id(conn, "task_instance_history")

    # Update task_instance_id. Each dialect needs its own UPDATE-with-join syntax.
    # NOTE(review): a history row with no matching task_instance row keeps a NULL
    # task_instance_id, which the NOT NULL alter further down would presumably
    # reject -- confirm whether such orphaned history rows can exist.
    if dialect_name == "postgresql":
        op.execute("""
            UPDATE task_instance_history SET task_instance_id = task_instance.id
            FROM task_instance
            WHERE task_instance_history.task_id = task_instance.task_id
            AND task_instance_history.dag_id = task_instance.dag_id
            AND task_instance_history.run_id = task_instance.run_id
            AND task_instance_history.map_index = task_instance.map_index
        """)
    elif dialect_name == "mysql":
        op.execute("""
            UPDATE task_instance_history tih
            JOIN task_instance ti ON
                tih.task_id = ti.task_id
                AND tih.dag_id = ti.dag_id
                AND tih.run_id = ti.run_id
                AND tih.map_index = ti.map_index
            SET tih.task_instance_id = ti.id
        """)
    else:
        op.execute("""
            UPDATE task_instance_history
            SET task_instance_id = (SELECT id FROM task_instance WHERE task_instance_history.task_id = task_instance.task_id
            AND task_instance_history.dag_id = task_instance.dag_id
            AND task_instance_history.run_id = task_instance.run_id
            AND task_instance_history.map_index = task_instance.map_index)
        """)

    with op.batch_alter_table("task_instance_history") as batch_op:
        batch_op.alter_column("try_id", existing_type=UUIDType(binary=False), nullable=False)
        # The integer surrogate key is replaced by try_id as the primary key.
        batch_op.drop_column("id")
        batch_op.alter_column(
            "task_instance_id",
            nullable=False,
            existing_type=sa.String(length=36).with_variant(postgresql.UUID(), "postgresql"),
        )

        batch_op.create_primary_key(
            "task_instance_history_pkey",
            [
                "try_id",
            ],
        )
143+
144+
145+
def downgrade():
    """
    Unapply Add try_id to TI and TIH.

    Steps, in order:

    1. On ``task_instance_history``: drop the ``try_id`` primary key, re-add an
       integer ``id`` column (nullable for now) and drop ``task_instance_id``.
    2. Number the existing history rows 1..N into ``id`` using dialect-specific SQL.
    3. Make ``id`` NOT NULL, drop ``try_id`` and recreate the primary key on ``id``.
    4. On ``task_instance``: drop the unique constraint on ``try_id`` and the column.

    NOTE(review): ``id`` is re-created as a plain INTEGER column; whether it gets
    auto-increment behaviour back depends on the backend -- confirm if downgraded
    databases must keep accepting new history rows.
    """
    dialect_name = op.get_bind().dialect.name
    with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
        batch_op.drop_constraint(batch_op.f("task_instance_history_pkey"), type_="primary")
        batch_op.add_column(sa.Column("id", sa.INTEGER, nullable=True))
        batch_op.drop_column("task_instance_id")

    # The freshly added ``id`` column is NULL in every row at this point, so the
    # numbering below must never join or compare on ``id`` itself: NULL = NULL and
    # NULL <= NULL are not true in SQL. ``try_id`` still exists, is non-NULL and
    # unique, and is therefore used as the row key.
    if dialect_name == "postgresql":
        op.execute(
            """
            ALTER TABLE task_instance_history ADD COLUMN row_num SERIAL;
            UPDATE task_instance_history SET id = row_num;
            ALTER TABLE task_instance_history DROP COLUMN row_num;
            """
        )
    elif dialect_name == "mysql":
        op.execute(
            """
            UPDATE task_instance_history tih
            JOIN (
                SELECT try_id, ROW_NUMBER() OVER (ORDER BY try_id) AS row_num
                FROM task_instance_history
            ) AS temp ON tih.try_id = temp.try_id
            SET tih.id = temp.row_num;
            """
        )
    else:
        # Rank each row by how many try_id values are less than or equal to its own;
        # since try_id is unique this yields the distinct values 1..N.
        op.execute("""
            UPDATE task_instance_history
            SET id = (
                SELECT COUNT(*) FROM task_instance_history t2
                WHERE t2.try_id <= task_instance_history.try_id
            );
        """)

    with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
        batch_op.alter_column("id", nullable=False, existing_type=sa.INTEGER)
        batch_op.drop_column("try_id")
        batch_op.create_primary_key("task_instance_history_pkey", ["id"])

    with op.batch_alter_table("task_instance", schema=None) as batch_op:
        batch_op.drop_constraint(batch_op.f("task_instance_try_id_uq"), type_="unique")
        batch_op.drop_column("try_id")

airflow/models/taskinstance.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ def clear_task_instances(
472472

473473
for ti in tis:
474474
TaskInstanceHistory.record_ti(ti, session)
475+
ti.try_id = uuid7()
475476
if ti.state == TaskInstanceState.RUNNING:
476477
# If a task is cleared when running, set its state to RESTARTING so that
477478
# the task is terminated and becomes eligible for retry.
@@ -764,6 +765,7 @@ def _set_ti_attrs(target, source, include_dag_run=False):
764765
target.end_date = source.end_date
765766
target.duration = source.duration
766767
target.state = source.state
768+
target.try_id = source.try_id
767769
target.try_number = source.try_number
768770
target.max_tries = source.max_tries
769771
target.hostname = source.hostname
@@ -1655,6 +1657,7 @@ class TaskInstance(Base, LoggingMixin):
16551657
end_date = Column(UtcDateTime)
16561658
duration = Column(Float)
16571659
state = Column(String(20))
1660+
try_id = Column(UUIDType(binary=False), default=uuid7, unique=True, nullable=False)
16581661
try_number = Column(Integer, default=0)
16591662
max_tries = Column(Integer, server_default=text("-1"))
16601663
hostname = Column(String(1000))
@@ -3145,6 +3148,7 @@ def fetch_handle_failure_context(
31453148
from airflow.models.taskinstancehistory import TaskInstanceHistory
31463149

31473150
TaskInstanceHistory.record_ti(ti, session=session)
3151+
ti.try_id = uuid7()
31483152

31493153
ti.state = State.UP_FOR_RETRY
31503154
email_for_state = operator.attrgetter("email_on_retry")

airflow/models/taskinstancehistory.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
select,
3333
text,
3434
)
35+
from sqlalchemy.dialects import postgresql
3536
from sqlalchemy.ext.mutable import MutableDict
3637
from sqlalchemy.orm import relationship
3738
from sqlalchemy_utils import UUIDType
@@ -60,7 +61,11 @@ class TaskInstanceHistory(Base):
6061
"""
6162

6263
__tablename__ = "task_instance_history"
63-
id = Column(Integer(), primary_key=True, autoincrement=True)
64+
try_id = Column(UUIDType(binary=False), nullable=False, primary_key=True)
65+
task_instance_id = Column(
66+
String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
67+
nullable=False,
68+
)
6469
task_id = Column(StringID(), nullable=False)
6570
dag_id = Column(StringID(), nullable=False)
6671
run_id = Column(StringID(), nullable=False)
@@ -113,6 +118,9 @@ def __init__(
113118
for column in self.__table__.columns:
114119
if column.name == "id":
115120
continue
121+
if column.name == "task_instance_id":
122+
setattr(self, column.name, ti.id)
123+
continue
116124
setattr(self, column.name, getattr(ti, column.name))
117125

118126
if state:

airflow/utils/db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol):
9494
"2.9.2": "686269002441",
9595
"2.10.0": "22ed7efa9da2",
9696
"2.10.3": "5f2621c13b39",
97-
"3.0.0": "e00344393f31",
97+
"3.0.0": "7645189f3479",
9898
}
9999

100100

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
7926c50f5d7526588adbd8017d50097392511315ec78f5a64301dbbf4b166de3
1+
c650e89558505c90a8b793794fbab0516ef66aa01289effbdd548c4772d29bc3

0 commit comments

Comments
 (0)