Skip to content

Commit 08a5859

Browse files
[v3-1-test] Fix callback files losing priority during queue resort (#61232) (#61243)
When the DAG processor resorts its file queue by modification time (e.g., after a bundle refresh), files with pending callbacks could lose their position at the front of the queue. This could delay callback execution (like DAG failure callbacks) if those files happened to have older modification times. The fix partitions the queue during resort: callback files stay at the front in their original order, while only regular files are sorted by mtime. (cherry picked from commit f5e70fc) Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
1 parent 3b41866 commit 08a5859

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

airflow-core/src/airflow/dag_processing/manager.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,8 +1009,21 @@ def _add_new_files_to_queue(self, known_files: dict[str, set[DagFileInfo]]):
10091009

10101010
def _resort_file_queue(self) -> None:
    """
    Re-sort the pending file queue by modification time, keeping callback files first.

    Nothing happens unless the sort mode is ``"modified_time"`` and the queue is
    non-empty. Files that have an entry in ``self._callback_to_execute`` keep their
    current relative order at the front of the queue; only the remaining files are
    reordered, in whatever order ``self._sort_by_mtime`` returns them.
    """
    if self._file_parsing_sort_mode != "modified_time" or not self._file_queue:
        return

    # Partition the queue in one pass. Passing the whole queue to _sort_by_mtime
    # (the earlier behaviour) could push a file with pending callbacks behind
    # regular files and delay those callbacks.
    callback_files = []
    regular_files = []
    for file in self._file_queue:
        if file in self._callback_to_execute:
            callback_files.append(file)
        else:
            regular_files.append(file)

    # Only the ordered list is needed here; the second tuple element is ignored.
    sorted_regular_files, _ = self._sort_by_mtime(regular_files)

    # Callback files first (original order), then the mtime-sorted regular files.
    self._file_queue = deque(callback_files + sorted_regular_files)
10141027

10151028
def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
10161029
files_with_mtime: dict[DagFileInfo, float] = {}

airflow-core/tests/unit/dag_processing/test_manager.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,42 @@ def test_resort_file_queue_does_nothing_when_alphabetical(self):
429429
# Order should remain unchanged
430430
assert list(manager._file_queue) == [file_b, file_a]
431431

432+
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
def test_resort_file_queue_keeps_callbacks_at_front(self):
    """
    Check that files with pending callbacks stay at the front of the queue
    regardless of their modification time, and preserve their relative order.
    """
    files_with_mtime = [
        ("callback_1.py", 50.0),  # has callback, oldest mtime
        ("callback_2.py", 300.0),  # has callback, newest mtime
        ("regular_1.py", 100.0),  # no callback
        ("regular_2.py", 200.0),  # no callback
    ]
    filenames = encode_mtime_in_filename(files_with_mtime)
    dag_files = _get_file_infos(filenames)
    # dag_files[0] -> callback_1 (mtime 50)
    # dag_files[1] -> callback_2 (mtime 300)
    # dag_files[2] -> regular_1 (mtime 100)
    # dag_files[3] -> regular_2 (mtime 200)

    manager = DagFileProcessorManager(max_runs=1)

    # Queue order: callback_1, callback_2, regular_1, regular_2
    manager._file_queue = deque([dag_files[0], dag_files[1], dag_files[2], dag_files[3]])

    # Both callback files have pending callbacks
    manager._callback_to_execute[dag_files[0]] = [MagicMock()]
    manager._callback_to_execute[dag_files[1]] = [MagicMock()]

    manager._resort_file_queue()

    # Callback files should stay at front in original order (callback_1, callback_2)
    # despite callback_1 having the oldest mtime and callback_2 having the newest.
    # Regular files should be sorted by mtime (newest first): regular_2 (200), regular_1 (100)
    assert list(manager._file_queue) == [dag_files[0], dag_files[1], dag_files[3], dag_files[2]]
467+
432468
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
433469
@mock.patch("airflow.utils.file.os.path.getmtime")
434470
def test_recently_modified_file_is_parsed_with_mtime_mode(self, mock_getmtime):

0 commit comments

Comments
 (0)