Skip to content

Commit 43deb99

Browse files
authored
Temporary fix to allow remote logging upload for task sdk (apache#47668)
1 parent 950107f commit 43deb99

File tree

2 files changed

+30
-4
lines changed

2 files changed

+30
-4
lines changed

task-sdk/src/airflow/sdk/execution_time/supervisor.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,8 @@ class ActivitySubprocess(WatchedSubprocess):
645645
TASK_OVERTIME_THRESHOLD: ClassVar[float] = 20.0
646646
_task_end_time_monotonic: float | None = attrs.field(default=None, init=False)
647647

648+
_what: TaskInstance | None = attrs.field(default=None, init=False)
649+
648650
decoder: ClassVar[TypeAdapter[ToSupervisor]] = TypeAdapter(ToSupervisor)
649651

650652
@classmethod
@@ -667,6 +669,7 @@ def start( # type: ignore[override]
667669

668670
def _on_child_started(self, ti: TaskInstance, dag_rel_path: str | os.PathLike[str], bundle_info):
669671
"""Send startup message to the subprocess."""
672+
self._what = ti
670673
start_date = datetime.now(tz=timezone.utc)
671674
try:
672675
# We've forked, but the task won't start doing anything until we send it the StartupDetails
@@ -735,7 +738,17 @@ def _upload_logs(self):
735738
"""
736739
from airflow.sdk.log import upload_to_remote
737740

738-
upload_to_remote(self.log)
741+
log_meta_dict = (
742+
{
743+
"dag_id": self._what.dag_id,
744+
"task_id": self._what.task_id,
745+
"run_id": self._what.run_id,
746+
"try_number": str(self._what.try_number),
747+
}
748+
if self._what
749+
else {}
750+
)
751+
upload_to_remote(self.log, log_meta_dict)
739752

740753
def _monitor_subprocess(self):
741754
"""

task-sdk/src/airflow/sdk/log.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -458,13 +458,25 @@ def init_log_file(local_relative_path: str) -> Path:
458458
return full_path
459459

460460

461-
def upload_to_remote(logger: FilteringBoundLogger):
461+
def load_remote_log_handler() -> logging.Handler | None:
462+
from airflow.logging_config import configure_logging as airflow_configure_logging
463+
from airflow.utils.log.log_reader import TaskLogReader
464+
465+
try:
466+
airflow_configure_logging()
467+
468+
return TaskLogReader().log_handler
469+
finally:
470+
# This is a _monstrosity_ but put our logging back immediately...
471+
configure_logging()
472+
473+
474+
def upload_to_remote(logger: FilteringBoundLogger, log_meta_dict: dict[str, Any]):
462475
# We haven't yet switched the Remote log handlers over, they are still wired up in providers as
463476
# logging.Handlers (but we should re-write most of them to just be the upload and read instead of full
464477
# variants.) In the mean time, lets just create the right handler directly
465478
from airflow.configuration import conf
466479
from airflow.utils.log.file_task_handler import FileTaskHandler
467-
from airflow.utils.log.log_reader import TaskLogReader
468480

469481
raw_logger = getattr(logger, "_logger")
470482

@@ -481,7 +493,7 @@ def upload_to_remote(logger: FilteringBoundLogger):
481493
base_log_folder = conf.get("logging", "base_log_folder")
482494
relative_path = Path(fname).relative_to(base_log_folder)
483495

484-
handler = TaskLogReader().log_handler
496+
handler = load_remote_log_handler()
485497
if not isinstance(handler, FileTaskHandler):
486498
logger.warning(
487499
"Airflow core logging is not using a FileTaskHandler, can't upload logs to remote",
@@ -493,6 +505,7 @@ def upload_to_remote(logger: FilteringBoundLogger):
493505
# set_context() which opens a real FH again. (And worse, in some cases it _truncates_ the file too). This
494506
# is just for the first Airflow 3 betas, but we will re-write a better remote log interface that isn't
495507
# tied to being a logging Handler.
508+
handler.log_meta_dict = log_meta_dict # type: ignore[attr-defined]
496509
handler.log_relative_path = relative_path.as_posix() # type: ignore[attr-defined]
497510
handler.upload_on_close = True # type: ignore[attr-defined]
498511

0 commit comments

Comments
 (0)