Skip to content

Commit af10644

Browse files
amoghrajesh, kaxil, ashb
authored
Improve detection and handling of timed out DAG processor processes (apache#49868)
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
1 parent 75d56f1 commit af10644

File tree

4 files changed

+9
-9
lines changed

4 files changed

+9
-9
lines changed

airflow-core/src/airflow/dag_processing/manager.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -992,7 +992,7 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
992992

993993
def _kill_timed_out_processors(self):
994994
"""Kill any file processors that timeout to defend against process hangs."""
995-
now = time.time()
995+
now = time.monotonic()
996996
processors_to_remove = []
997997
for file, processor in self._processors.items():
998998
duration = now - processor.start_time

airflow-core/src/airflow/dag_processing/processor.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,5 @@ def is_ready(self) -> bool:
312312

313313
return self._num_open_sockets == 0
314314

315-
@property
316-
def start_time(self) -> float:
317-
return self._process.create_time()
318-
319315
def wait(self) -> int:
320316
raise NotImplementedError(f"Don't call wait on {type(self).__name__} objects")

airflow-core/tests/unit/dag_processing/test_manager.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -133,7 +133,7 @@ def teardown_class(self):
133133
clear_db_import_errors()
134134
clear_db_dag_bundles()
135135

136-
def mock_processor(self) -> tuple[DagFileProcessorProcess, socket]:
136+
def mock_processor(self, start_time: float | None = None) -> tuple[DagFileProcessorProcess, socket]:
137137
proc = MagicMock()
138138
logger_filehandle = MagicMock()
139139
proc.create_time.return_value = time.time()
@@ -148,6 +148,8 @@ def mock_processor(self) -> tuple[DagFileProcessorProcess, socket]:
148148
requests_fd=123,
149149
logger_filehandle=logger_filehandle,
150150
)
151+
if start_time:
152+
ret.start_time = start_time
151153
ret._num_open_sockets = 0
152154
return ret, read_end
153155

@@ -518,9 +520,7 @@ def test_scan_stale_dags(self, testing_dag_bundle):
518520

519521
def test_kill_timed_out_processors_kill(self):
520522
manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)
521-
522-
processor, _ = self.mock_processor()
523-
processor._process.create_time.return_value = timezone.make_aware(datetime.min).timestamp()
523+
processor, _ = self.mock_processor(start_time=16000)
524524
manager._processors = {
525525
DagFileInfo(
526526
bundle_name="testing", rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER

task-sdk/src/airflow/sdk/execution_time/supervisor.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -428,6 +428,9 @@ class WatchedSubprocess:
428428
subprocess_logs_to_stdout: bool = False
429429
"""Duplicate log messages to stdout, or only send them to ``self.process_log``."""
430430

431+
start_time: float = attrs.field(factory=time.monotonic)
432+
"""The start time of the child process."""
433+
431434
@classmethod
432435
def start(
433436
cls,
@@ -481,6 +484,7 @@ def start(
481484
process=psutil.Process(pid),
482485
requests_fd=requests_fd,
483486
process_log=logger,
487+
start_time=time.monotonic(),
484488
**constructor_kwargs,
485489
)
486490

0 commit comments

Comments
 (0)