Skip to content

Commit 802cb87

Browse files
authored
Replace TaskInstanceNote composite primary key with TI.id (apache#47376)
This commit removes the composite primary key on TaskInstanceNote and replaces it with a single-column primary key on the TaskInstance ID (`ti_id`), which is a UUID. Related: apache#44147
1 parent 7661659 commit 802cb87

File tree

7 files changed

+1545
-1433
lines changed

7 files changed

+1545
-1433
lines changed
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
"""
20+
Use TI.id as primary key to TaskInstanceNote.
21+
22+
Revision ID: cf87489a35df
23+
Revises: 7645189f3479
24+
Create Date: 2025-03-04 20:29:19.935428
25+
26+
"""
27+
28+
from __future__ import annotations
29+
30+
import sqlalchemy as sa
31+
from alembic import op
32+
from sqlalchemy.dialects import postgresql
33+
34+
from airflow.migrations.db_types import StringID
35+
36+
# revision identifiers, used by Alembic.
37+
revision = "cf87489a35df"
38+
down_revision = "7645189f3479"
39+
branch_labels = None
40+
depends_on = None
41+
airflow_version = "3.0.0"
42+
43+
44+
def upgrade():
    """
    Apply Use TI.id as primary key to TaskInstanceNote.

    Steps, in order:

    1. Add a ``ti_id`` column to ``task_instance_note`` (added without
       ``nullable=False`` so existing rows can be backfilled first).
    2. Backfill ``ti_id`` from ``task_instance.id`` by matching rows on
       ``(task_id, dag_id, run_id, map_index)``, using an UPDATE statement
       chosen by database dialect.
    3. Mark ``ti_id`` as NOT NULL, drop the old foreign key, the old primary key
       and the four composite-key columns, then create the primary key and the
       ``ON DELETE CASCADE`` foreign key on ``ti_id`` alone.
    """

    def ti_id_type():
        # Build a fresh type object per use: String(36), with a UUID variant on PostgreSQL.
        return sa.String(length=36).with_variant(postgresql.UUID(), "postgresql")

    dialect_name = op.get_bind().dialect.name

    with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
        batch_op.add_column(sa.Column("ti_id", ti_id_type()))

    # UPDATE-with-join syntax differs between backends; any dialect not listed
    # here uses the correlated-subquery form below.
    backfill_by_dialect = {
        "postgresql": """
            UPDATE task_instance_note SET ti_id = task_instance.id
            FROM task_instance
            WHERE task_instance_note.task_id = task_instance.task_id
            AND task_instance_note.dag_id = task_instance.dag_id
            AND task_instance_note.run_id = task_instance.run_id
            AND task_instance_note.map_index = task_instance.map_index
            """,
        "mysql": """
            UPDATE task_instance_note tin
            JOIN task_instance ti ON
            tin.task_id = ti.task_id
            AND tin.dag_id = ti.dag_id
            AND tin.run_id = ti.run_id
            AND tin.map_index = ti.map_index
            SET tin.ti_id = ti.id
            """,
    }
    generic_backfill = """
            UPDATE task_instance_note
            SET ti_id = (SELECT id FROM task_instance WHERE task_instance_note.task_id = task_instance.task_id
            AND task_instance_note.dag_id = task_instance.dag_id
            AND task_instance_note.run_id = task_instance.run_id
            AND task_instance_note.map_index = task_instance.map_index)
            """
    op.execute(backfill_by_dialect.get(dialect_name, generic_backfill))

    with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
        # Every row has been backfilled by now, so the column can become mandatory.
        batch_op.alter_column("ti_id", existing_type=ti_id_type(), nullable=False)

        # Remove the constraints that reference the composite key before dropping its columns.
        batch_op.drop_constraint("task_instance_note_ti_fkey", type_="foreignkey")
        batch_op.drop_constraint("task_instance_note_pkey", type_="primary")
        for obsolete_column in ("map_index", "dag_id", "task_id", "run_id"):
            batch_op.drop_column(obsolete_column)

        # Recreate both constraints under their previous names, now keyed on ti_id only.
        batch_op.create_primary_key("task_instance_note_pkey", ["ti_id"])
        batch_op.create_foreign_key(
            "task_instance_note_ti_fkey",
            "task_instance",
            ["ti_id"],
            ["id"],
            ondelete="CASCADE",
        )
94+
95+
96+
def downgrade():
    """
    Unapply Use TI.id as primary key to TaskInstanceNote.

    Steps, in order:

    1. Re-add ``dag_id``, ``task_id``, ``run_id`` and ``map_index`` to
       ``task_instance_note`` as nullable columns.
    2. Backfill those columns from ``task_instance`` by joining on
       ``task_instance_note.ti_id = task_instance.id``, using an UPDATE
       statement chosen by database dialect.
    3. Drop the ``ti_id`` foreign key, primary key and column, recreate the
       ``ON DELETE CASCADE`` foreign key on the four composite columns, mark
       them NOT NULL, and recreate the composite primary key.
    """
    composite_key = ["dag_id", "task_id", "run_id", "map_index"]
    dialect_name = op.get_bind().dialect.name

    with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
        for string_column in ("dag_id", "task_id", "run_id"):
            batch_op.add_column(sa.Column(string_column, StringID(), nullable=True))
        # NOTE(review): map_index is added with a server default of -1 and that default
        # is not removed later in this function -- confirm it matches the pre-upgrade schema.
        batch_op.add_column(sa.Column("map_index", sa.Integer(), nullable=True, server_default=sa.text("-1")))

    # UPDATE-with-join syntax differs between backends; any dialect not listed
    # here uses the correlated-subquery form below.
    restore_by_dialect = {
        "postgresql": """
            UPDATE task_instance_note
            SET dag_id = task_instance.dag_id,
            task_id = task_instance.task_id,
            run_id = task_instance.run_id,
            map_index = task_instance.map_index
            FROM task_instance
            WHERE task_instance_note.ti_id = task_instance.id
            """,
        "mysql": """
            UPDATE task_instance_note tin
            JOIN task_instance ti ON
            tin.ti_id = ti.id
            SET tin.dag_id = ti.dag_id,
            tin.task_id = ti.task_id,
            tin.run_id = ti.run_id,
            tin.map_index = ti.map_index
            """,
    }
    generic_restore = """
            UPDATE task_instance_note
            SET dag_id = (SELECT dag_id FROM task_instance WHERE task_instance_note.ti_id = task_instance.id),
            task_id = (SELECT task_id FROM task_instance WHERE task_instance_note.ti_id = task_instance.id),
            run_id = (SELECT run_id FROM task_instance WHERE task_instance_note.ti_id = task_instance.id),
            map_index = (SELECT map_index FROM task_instance WHERE task_instance_note.ti_id = task_instance.id)
            """
    op.execute(restore_by_dialect.get(dialect_name, generic_restore))

    with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
        # Detach everything that depends on ti_id, then remove the column itself.
        batch_op.drop_constraint("task_instance_note_ti_fkey", type_="foreignkey")
        batch_op.drop_constraint("task_instance_note_pkey", type_="primary")
        batch_op.drop_column("ti_id")

        # Point the foreign key back at task_instance's composite key columns.
        batch_op.create_foreign_key(
            "task_instance_note_ti_fkey",
            "task_instance",
            composite_key,
            composite_key,
            ondelete="CASCADE",
        )

        # The backfill above has populated these columns, so they can be made mandatory.
        batch_op.alter_column(
            "map_index",
            existing_type=sa.INTEGER(),
            existing_server_default=sa.text("-1"),
            nullable=False,
        )
        for string_column in ("run_id", "task_id", "dag_id"):
            batch_op.alter_column(string_column, existing_type=StringID(), nullable=False)

        batch_op.create_primary_key("task_instance_note_pkey", composite_key)

airflow/models/taskinstance.py

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3619,12 +3619,12 @@ def clear_db_references(self, session: Session):
36193619
from airflow.models.renderedtifields import RenderedTaskInstanceFields
36203620

36213621
tables: list[type[TaskInstanceDependencies]] = [
3622-
TaskInstanceNote,
36233622
TaskReschedule,
36243623
XCom,
36253624
RenderedTaskInstanceFields,
36263625
TaskMap,
36273626
]
3627+
tables_by_id: list[type[Base]] = [TaskInstanceNote]
36283628
for table in tables:
36293629
session.execute(
36303630
delete(table).where(
@@ -3634,6 +3634,8 @@ def clear_db_references(self, session: Session):
36343634
table.map_index == self.map_index,
36353635
)
36363636
)
3637+
for table in tables_by_id:
3638+
session.execute(delete(table).where(table.ti_id == self.id))
36373639

36383640
@classmethod
36393641
def duration_expression_update(
@@ -3804,31 +3806,27 @@ def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
38043806
)
38053807

38063808

3807-
class TaskInstanceNote(TaskInstanceDependencies):
3809+
class TaskInstanceNote(Base):
38083810
"""For storage of arbitrary notes concerning the task instance."""
38093811

38103812
__tablename__ = "task_instance_note"
3811-
3813+
ti_id = Column(
3814+
String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
3815+
primary_key=True,
3816+
nullable=False,
3817+
)
38123818
user_id = Column(String(128), nullable=True)
3813-
task_id = Column(StringID(), primary_key=True, nullable=False)
3814-
dag_id = Column(StringID(), primary_key=True, nullable=False)
3815-
run_id = Column(StringID(), primary_key=True, nullable=False)
3816-
map_index = Column(Integer, primary_key=True, nullable=False)
38173819
content = Column(String(1000).with_variant(Text(1000), "mysql"))
38183820
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
38193821
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
38203822

3821-
task_instance = relationship("TaskInstance", back_populates="task_instance_note")
3823+
task_instance = relationship("TaskInstance", back_populates="task_instance_note", uselist=False)
38223824

38233825
__table_args__ = (
3824-
PrimaryKeyConstraint("task_id", "dag_id", "run_id", "map_index", name="task_instance_note_pkey"),
38253826
ForeignKeyConstraint(
3826-
(dag_id, task_id, run_id, map_index),
3827+
(ti_id,),
38273828
[
3828-
"task_instance.dag_id",
3829-
"task_instance.task_id",
3830-
"task_instance.run_id",
3831-
"task_instance.map_index",
3829+
"task_instance.id",
38323830
],
38333831
name="task_instance_note_ti_fkey",
38343832
ondelete="CASCADE",
@@ -3840,10 +3838,10 @@ def __init__(self, content, user_id=None):
38403838
self.user_id = user_id
38413839

38423840
def __repr__(self):
3843-
prefix = f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.run_id}"
3844-
if self.map_index != -1:
3845-
prefix += f" map_index={self.map_index}"
3846-
return prefix + ">"
3841+
prefix = f"<{self.__class__.__name__}: {self.task_instance.dag_id}.{self.task_instance.task_id} {self.task_instance.run_id}"
3842+
if self.task_instance.map_index != -1:
3843+
prefix += f" map_index={self.task_instance.map_index}"
3844+
return prefix + f" TI ID: {self.ti_id}>"
38473845

38483846

38493847
STATICA_HACK = True

airflow/utils/db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol):
9494
"2.9.2": "686269002441",
9595
"2.10.0": "22ed7efa9da2",
9696
"2.10.3": "5f2621c13b39",
97-
"3.0.0": "7645189f3479",
97+
"3.0.0": "cf87489a35df",
9898
}
9999

100100

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
777af678bc68670c54c2f0b2f3bc5af95ec918fa658169e99fc4e24fab0c616b
1+
47df58a2f21947ec4b722bff6437cc0b0ec21d0570c53b96d74b67a11f19e36b

0 commit comments

Comments
 (0)