Skip to content

Commit 7fc8098

Browse files
authored
Re-enable uploading to remote blob storage from Trigger and executors for the beta (apache#47115)
The way this is achieved is the epitome of hackiness, but it works well enough for the S3, GCS and WASB task handlers. It won't work for the "non-blob" based logging backends such as CloudWatchLogs, and it only works with the way the 3 blob-based log backends are currently implemented. I want remote logs to be working for most people for the Beta, and I will follow up with a better fix (one that doesn't rely on internal implementation details and works for log "streams" too).
1 parent 133d2f8 commit 7fc8098

File tree

7 files changed

+92
-16
lines changed

7 files changed

+92
-16
lines changed

airflow/jobs/triggerer_job_runner.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,15 @@ def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedL
234234
self.bound_logger = logger
235235
return logger
236236

237+
def upload_to_remote(self):
238+
from airflow.sdk.log import upload_to_remote
239+
240+
if not hasattr(self, "bound_logger"):
241+
# Never actually called, nothing to do
242+
return
243+
244+
upload_to_remote(self.bound_logger)
245+
237246

238247
@attrs.define(kw_only=True)
239248
class TriggerRunnerSupervisor(WatchedSubprocess):
@@ -299,7 +308,8 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger) -
299308
self.cancelling_triggers.discard(id)
300309
# Remove logger from the cache, and since structlog doesn't have an explicit close method, we
301310
# only need to remove the last reference to it to close the open FH
302-
self.logger_cache.pop(id, None)
311+
if factory := self.logger_cache.pop(id, None):
312+
factory.upload_to_remote()
303313
return
304314

305315
raise ValueError(f"Unknown message type {type(msg)}")
@@ -460,10 +470,8 @@ def update_triggers(self, requested_trigger_ids: set[int]):
460470
self.cancelling_triggers.update(cancel_trigger_ids)
461471
self._send(messages.CancelTriggers(ids=cancel_trigger_ids))
462472

463-
def _register_pipe_readers(
464-
self, logger: FilteringBoundLogger, stdout: socket, stderr: socket, requests: socket, logs: socket
465-
):
466-
super()._register_pipe_readers(logger, stdout, stderr, requests, logs)
473+
def _register_pipe_readers(self, stdout: socket, stderr: socket, requests: socket, logs: socket):
474+
super()._register_pipe_readers(stdout, stderr, requests, logs)
467475

468476
# We want to handle logging differently here, so un-register the one our parent class created
469477
self.selector.unregister(logs)
@@ -488,7 +496,9 @@ def _process_log_messages_from_subprocess(self) -> Generator[None, bytes, None]:
488496

489497
def get_logger(trigger_id: int) -> WrappedLogger:
490498
# TODO: Is a separate dict worth it, or should we make `self.running_triggers` a dict?
491-
return self.logger_cache[trigger_id](processors)
499+
if factory := self.logger_cache.get(trigger_id):
500+
return factory(processors)
501+
return fallback_log
492502

493503
# We need to look at the json, pull out the
494504
while True:

airflow/models/trigger.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,10 @@ def submit_event(cls, trigger_id, event: events.TriggerEvent, session: Session =
246246
handle_event_submit(event, task_instance=task_instance, session=session)
247247

248248
# Send an event to assets
249-
trigger = session.scalars(select(cls).where(cls.id == trigger_id)).one()
249+
trigger = session.scalars(select(cls).where(cls.id == trigger_id)).one_or_none()
250+
if trigger is None:
251+
# Already deleted for some reason
252+
return
250253
for asset in trigger.assets:
251254
AssetManager.register_asset_change(
252255
asset=asset.to_public(), session=session, extra={"from_trigger": True}

task_sdk/src/airflow/sdk/execution_time/supervisor.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,8 @@ class WatchedSubprocess:
316316

317317
selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector)
318318

319+
log: FilteringBoundLogger
320+
319321
@classmethod
320322
def start(
321323
cls,
@@ -363,17 +365,17 @@ def start(
363365
# other end of the pair open
364366
cls._close_unused_sockets(child_stdin, child_stdout, child_stderr, child_comms, child_logs)
365367

368+
logger = logger or cast("FilteringBoundLogger", structlog.get_logger(logger_name="task").bind())
366369
proc = cls(
367370
pid=pid,
368371
stdin=feed_stdin,
369372
process=psutil.Process(pid),
370373
requests_fd=requests_fd,
374+
log=logger,
371375
**constructor_kwargs,
372376
)
373377

374-
logger = logger or cast("FilteringBoundLogger", structlog.get_logger(logger_name="task").bind())
375378
proc._register_pipe_readers(
376-
logger=logger,
377379
stdout=read_stdout,
378380
stderr=read_stderr,
379381
requests=read_msgs,
@@ -382,26 +384,24 @@ def start(
382384

383385
return proc
384386

385-
def _register_pipe_readers(
386-
self, logger: FilteringBoundLogger, stdout: socket, stderr: socket, requests: socket, logs: socket
387-
):
387+
def _register_pipe_readers(self, stdout: socket, stderr: socket, requests: socket, logs: socket):
388388
"""Register handlers for subprocess communication channels."""
389389
# self.selector is a way of registering a handler/callback to be called when the given IO channel has
390390
# activity to read on (https://www.man7.org/linux/man-pages/man2/select.2.html etc, but better
391391
# alternatives are used automatically) -- this is a way of having "event-based" code, but without
392392
# needing full async, to read and process output from each socket as it is received.
393393

394-
self.selector.register(stdout, selectors.EVENT_READ, self._create_socket_handler(logger, "stdout"))
394+
self.selector.register(stdout, selectors.EVENT_READ, self._create_socket_handler(self.log, "stdout"))
395395
self.selector.register(
396396
stderr,
397397
selectors.EVENT_READ,
398-
self._create_socket_handler(logger, "stderr", log_level=logging.ERROR),
398+
self._create_socket_handler(self.log, "stderr", log_level=logging.ERROR),
399399
)
400400
self.selector.register(
401401
logs,
402402
selectors.EVENT_READ,
403403
make_buffered_socket_reader(
404-
process_log_messages_from_subprocess(logger), on_close=self._on_socket_closed
404+
process_log_messages_from_subprocess(self.log), on_close=self._on_socket_closed
405405
),
406406
)
407407
self.selector.register(
@@ -658,8 +658,24 @@ def wait(self) -> int:
658658
self.client.task_instances.finish(
659659
id=self.id, state=self.final_state, when=datetime.now(tz=timezone.utc)
660660
)
661+
662+
# Now at the last possible moment, when all logs and comms with the subprocess has finished, lets
663+
# upload the remote logs
664+
self._upload_logs()
665+
661666
return self._exit_code
662667

668+
def _upload_logs(self):
669+
"""
670+
Upload all log files found to the remote storage.
671+
672+
We upload logs from here after the task has finished to give us the best possible chance of logs being
673+
uploaded in case the task fails.
674+
"""
675+
from airflow.sdk.log import upload_to_remote
676+
677+
upload_to_remote(self.log)
678+
663679
def _monitor_subprocess(self):
664680
"""
665681
Monitor the subprocess until it exits.

task_sdk/src/airflow/sdk/log.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import structlog
3232

3333
if TYPE_CHECKING:
34-
from structlog.typing import EventDict, ExcInfo, Processor
34+
from structlog.typing import EventDict, ExcInfo, FilteringBoundLogger, Processor
3535

3636

3737
__all__ = [
@@ -456,3 +456,44 @@ def init_log_file(local_relative_path: str) -> Path:
456456
log.warning("OSError while changing ownership of the log file. %s", e)
457457

458458
return full_path
459+
460+
461+
def upload_to_remote(logger: FilteringBoundLogger):
462+
# We haven't yet switched the Remote log handlers over, they are still wired up in providers as
463+
# logging.Handlers (but we should re-write most of them to just be the upload and read instead of full
464+
# variants.) In the mean time, lets just create the right handler directly
465+
from airflow.configuration import conf
466+
from airflow.utils.log.file_task_handler import FileTaskHandler
467+
from airflow.utils.log.log_reader import TaskLogReader
468+
469+
raw_logger = getattr(logger, "_logger")
470+
471+
if not raw_logger or not hasattr(raw_logger, "_file"):
472+
logger.warning("Unable to find log file, logger was of unexpected type", type=type(logger))
473+
return
474+
475+
fh = raw_logger._file
476+
fname = fh.name
477+
478+
if fh.fileno() == 1 or not isinstance(fname, str):
479+
# Logging to stdout, or something odd about this logger, don't try to upload!
480+
return
481+
base_log_folder = conf.get("logging", "base_log_folder")
482+
relative_path = Path(fname).relative_to(base_log_folder)
483+
484+
handler = TaskLogReader().log_handler
485+
if not isinstance(handler, FileTaskHandler):
486+
logger.warning(
487+
"Airflow core logging is not using a FileTaskHandler, can't upload logs to remote",
488+
handler=type(handler),
489+
)
490+
return
491+
492+
# This is a _monstrosity_, and super fragile, but we don't want to do the base FileTaskHandler
493+
# set_context() which opens a real FH again. (And worse, in some cases it _truncates_ the file too). This
494+
# is just for the first Airflow 3 betas, but we will re-write a better remote log interface that isn't
495+
# tied to being a logging Handler.
496+
handler.log_relative_path = relative_path.as_posix() # type: ignore[attr-defined]
497+
handler.upload_on_close = True # type: ignore[attr-defined]
498+
499+
handler.close()

task_sdk/tests/execution_time/test_supervisor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,7 @@ def test_heartbeat_failures_handling(self, monkeypatch, mocker, captured_logs, t
505505
mock_kill = mocker.patch("airflow.sdk.execution_time.supervisor.WatchedSubprocess.kill")
506506

507507
proc = ActivitySubprocess(
508+
log=mocker.MagicMock(),
508509
id=TI_ID,
509510
pid=mock_process.pid,
510511
stdin=mocker.MagicMock(),
@@ -594,6 +595,7 @@ def test_overtime_handling(
594595
monkeypatch.setattr(ActivitySubprocess, "TASK_OVERTIME_THRESHOLD", overtime_threshold)
595596

596597
mock_watched_subprocess = ActivitySubprocess(
598+
log=mocker.MagicMock(),
597599
id=TI_ID,
598600
pid=12345,
599601
stdin=mocker.Mock(),
@@ -735,6 +737,7 @@ def mock_process(self, mocker):
735737
@pytest.fixture
736738
def watched_subprocess(self, mocker, mock_process):
737739
proc = ActivitySubprocess(
740+
log=mocker.MagicMock(),
738741
id=TI_ID,
739742
pid=12345,
740743
stdin=mocker.Mock(),
@@ -918,6 +921,7 @@ class TestHandleRequest:
918921
def watched_subprocess(self, mocker):
919922
"""Fixture to provide a WatchedSubprocess instance."""
920923
return ActivitySubprocess(
924+
log=mocker.MagicMock(),
921925
id=TI_ID,
922926
pid=12345,
923927
stdin=BytesIO(),

tests/dag_processing/test_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def mock_processor(self) -> DagFileProcessorProcess:
137137
proc.create_time.return_value = time.time()
138138
proc.wait.return_value = 0
139139
ret = DagFileProcessorProcess(
140+
log=MagicMock(),
140141
id=uuid7(),
141142
pid=1234,
142143
process=proc,

tests/jobs/test_triggerer_job.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ def builder(job=None):
149149

150150
process = mocker.Mock(spec=psutil.Process, pid=10 * job.id + 1)
151151
proc = TriggerRunnerSupervisor(
152+
log=mocker.Mock(),
152153
id=job.id,
153154
job=job,
154155
pid=process.pid,

0 commit comments

Comments
 (0)