Skip to content
This repository was archived by the owner on Apr 3, 2024. It is now read-only.

Commit 9636f32

Browse files
committed
backport deadlock and task details fix
1 parent 16bafda commit 9636f32

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

airflow/jobs/scheduler_job.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from contextlib import redirect_stderr, redirect_stdout, suppress
3232
from datetime import timedelta
3333
from multiprocessing.connection import Connection as MultiprocessingConnection
34-
from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
34+
from typing import Any, Callable, DefaultDict, Dict, Iterator, Iterable, List, Optional, Set, Tuple
3535

3636
from setproctitle import setproctitle
3737
from sqlalchemy import and_, func, not_, or_, tuple_
@@ -1218,7 +1218,15 @@ def _process_executor_events(self, session: Session = None) -> int:
12181218

12191219
# Check state of finished tasks
12201220
filter_for_tis = TI.filter_for_tis(tis_with_right_state)
1221-
tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
1221+
query = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model'))
1222+
# row lock this entire set of taskinstances to make sure the scheduler doesn't fail when we have
1223+
# multi-schedulers
1224+
tis: Iterator[TI] = with_row_locks(
1225+
query,
1226+
of=TI,
1227+
session=session,
1228+
**skip_locked(session=session),
1229+
)
12221230
for ti in tis:
12231231
try_number = ti_primary_key_to_try_number_map[ti.key.primary]
12241232
buffer_key = ti.key.with_try_number(try_number)

airflow/models/taskinstance.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,10 @@ def refresh_from_db(self, session=None, lock_for_update=False) -> None:
623623
self.priority_weight = ti.priority_weight
624624
self.operator = ti.operator
625625
self.queued_dttm = ti.queued_dttm
626+
self.queued_by_job_id = ti.queued_by_job_id
626627
self.pid = ti.pid
628+
self.executor_config = ti.executor_config
629+
self.external_executor_id = ti.external_executor_id
627630
else:
628631
self.state = None
629632

0 commit comments

Comments
 (0)