Skip to content

Commit 2b8818a

Browse files
[v2-11-test] Stop streaming task logs if end of log mark is missing (#51904)
Sometimes, somehow, the end of log mark can be missing, and when that happens the streaming log reader enters an infinite loop. Instead, if the task is in a non-running state and we stop receiving log lines but never get the end of log mark, we assume we won't and stop trying. We do tell emit that we are stopping though.
1 parent 868efd2 commit 2b8818a

File tree

2 files changed

+33
-1
lines changed

2 files changed

+33
-1
lines changed

airflow/utils/log/log_reader.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ class TaskLogReader:
3939
STREAM_LOOP_SLEEP_SECONDS = 1
4040
"""Time to sleep between loops while waiting for more logs"""
4141

42+
STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 5
43+
"""Number of empty loop iterations before stopping the stream"""
44+
4245
def read_log_chunks(
4346
self, ti: TaskInstance, try_number: int | None, metadata
4447
) -> tuple[list[tuple[tuple[str, str]]], dict[str, str]]:
@@ -83,6 +86,7 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
8386
metadata.pop("max_offset", None)
8487
metadata.pop("offset", None)
8588
metadata.pop("log_pos", None)
89+
empty_iterations = 0
8690
while True:
8791
logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
8892
for host, log in logs[0]:
@@ -91,10 +95,17 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
9195
not metadata["end_of_log"]
9296
and ti.state not in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
9397
):
94-
if not logs[0]:
98+
if logs[0]:
99+
empty_iterations = 0
100+
else:
95101
# we did not receive any logs in this loop
96102
# sleeping to conserve resources / limit requests on external services
97103
time.sleep(self.STREAM_LOOP_SLEEP_SECONDS)
104+
empty_iterations += 1
105+
if empty_iterations >= self.STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS:
106+
# we have not received any logs for a while, so we stop the stream
107+
yield "\n(Log stream stopped - End of log marker not found; logs may be incomplete.)\n"
108+
break
98109
else:
99110
break
100111

tests/utils/log/test_log_reader.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,27 @@ def test_read_log_stream_should_read_each_try_in_turn(self, mock_read):
225225
any_order=False,
226226
)
227227

228+
@mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read")
229+
def test_read_log_stream_no_end_of_log_marker(self, mock_read):
230+
mock_read.side_effect = [
231+
([[("", "hello")]], [{"end_of_log": False}]),
232+
([[]], [{"end_of_log": False}]),
233+
([[]], [{"end_of_log": False}]),
234+
([[]], [{"end_of_log": False}]),
235+
([[]], [{"end_of_log": False}]),
236+
([[]], [{"end_of_log": False}]),
237+
]
238+
239+
self.ti.state = TaskInstanceState.SUCCESS
240+
task_log_reader = TaskLogReader()
241+
task_log_reader.STREAM_LOOP_SLEEP_SECONDS = 0.001 # to speed up the test
242+
log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={})
243+
assert list(log_stream) == [
244+
"\nhello\n",
245+
"\n(Log stream stopped - End of log marker not found; logs may be incomplete.)\n",
246+
]
247+
assert mock_read.call_count == 6
248+
228249
def test_supports_external_link(self):
229250
task_log_reader = TaskLogReader()
230251

0 commit comments

Comments
 (0)