Skip to content

Commit bca0a63

Browse files
authored
Fix missing TaskInstanceHistory on scheduler TI resets (#59639)
When a SchedulerJob is marked failed, orphaned task instances may be reset and re-scheduled, incrementing try_number without recording the abandoned attempt. This change records the current attempt into task_instance_history before resetting so users have a complete audit trail of the failure. related #57618
1 parent 5945bd5 commit bca0a63

File tree

2 files changed

+28
-0
lines changed

2 files changed

+28
-0
lines changed

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2604,8 +2604,16 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
26042604
reset_tis_message = []
26052605
for ti in to_reset:
26062606
reset_tis_message.append(repr(ti))
2607+
# If we reset a TI, it will be eligible to be scheduled again.
2608+
# This can cause the scheduler to increase the try_number on the TI.
2609+
# Record the current try to TaskInstanceHistory first so users have an audit trail for
2610+
# the attempt that was abandoned.
2611+
ti.prepare_db_for_next_try(session=session)
2612+
26072613
ti.state = None
26082614
ti.queued_by_job_id = None
2615+
ti.external_executor_id = None
2616+
ti.clear_next_method_args()
26092617

26102618
for ti in set(tis_to_adopt_or_reset) - set(to_reset):
26112619
ti.queued_by_job_id = self.job.id

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4138,6 +4138,8 @@ def test_adopt_or_reset_orphaned_tasks_nothing(self):
41384138
list(sorted(State.adoptable_states)),
41394139
)
41404140
def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, session):
4141+
from airflow.models.taskinstancehistory import TaskInstanceHistory
4142+
41414143
dag_id = "test_adopt_or_reset_adoptable_tasks_" + adoptable_state.name
41424144
with dag_maker(dag_id=dag_id, schedule="@daily"):
41434145
task_id = dag_id + "_task"
@@ -4152,13 +4154,31 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi
41524154
ti = dr1.get_task_instances(session=session)[0]
41534155
ti.state = adoptable_state
41544156
ti.queued_by_job_id = old_job.id
4157+
old_ti_id = ti.id
4158+
old_try_number = ti.try_number
41554159
session.merge(ti)
41564160
session.merge(dr1)
41574161
session.commit()
41584162

41594163
num_reset_tis = self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
41604164
assert num_reset_tis == 1
41614165

4166+
ti.refresh_from_db(session=session)
4167+
assert ti.id != old_ti_id
4168+
assert (
4169+
session.scalar(
4170+
select(TaskInstanceHistory).where(
4171+
TaskInstanceHistory.dag_id == ti.dag_id,
4172+
TaskInstanceHistory.task_id == ti.task_id,
4173+
TaskInstanceHistory.run_id == ti.run_id,
4174+
TaskInstanceHistory.map_index == ti.map_index,
4175+
TaskInstanceHistory.try_number == old_try_number,
4176+
TaskInstanceHistory.task_instance_id == old_ti_id,
4177+
)
4178+
)
4179+
is not None
4180+
)
4181+
41624182
def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self, dag_maker, session):
41634183
dag_id = "test_reset_orphaned_tasks_external_triggered_dag"
41644184
with dag_maker(dag_id=dag_id, schedule="@daily"):

0 commit comments

Comments
 (0)