Skip to content

Commit 8c3b26e

Browse files
authored
DagBag: use Path.relative_to for consistent cross-platform behavior (#59785)
1 parent 8d2e623 commit 8c3b26e

File tree

4 files changed

+85
-5
lines changed

4 files changed

+85
-5
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Modify the information returned by ``DagBag``
2+
3+
**New behavior:**
4+
- ``DagBag`` now uses ``Path.relative_to`` for consistent cross-platform behavior.
5+
- ``FileLoadStat`` now has two additional nullable fields: ``bundle_path`` and ``bundle_name``.
6+
7+
**Backward compatibility:**
8+
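For files located inside the dags folder, ``FileLoadStat.file`` is now a plain relative path in POSIX form (for example ``my_dag.py``); it no longer begins with a leading ``/`` to mean "relative to the dags folder" (previously ``/my_dag.py``). Files outside the dags folder are reported with their full path in POSIX form.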
9+
This is a breaking change for any custom code that performs string-based path manipulations relying on this behavior.
10+
Users are advised to update such code to use ``pathlib.Path``.

airflow-core/src/airflow/dag_processing/dagbag.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ class FileLoadStat(NamedTuple):
9797
:param task_num: Total number of Tasks loaded in this file.
9898
:param dags: DAGs names loaded in this file.
9999
:param warning_num: Total number of warnings captured from processing this file.
100+
:param bundle_path: The bundle path from DagBag, if any.
101+
:param bundle_name: The bundle name from DagBag, if any.
100102
"""
101103

102104
file: str
@@ -105,6 +107,8 @@ class FileLoadStat(NamedTuple):
105107
task_num: int
106108
dags: str
107109
warning_num: int
110+
bundle_path: Path | None
111+
bundle_name: str | None
108112

109113

110114
@contextlib.contextmanager
@@ -647,14 +651,21 @@ def collect_dags(
647651
found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)
648652

649653
file_parse_end_dttm = timezone.utcnow()
654+
try:
655+
relative_file = Path(filepath).relative_to(Path(self.dag_folder)).as_posix()
656+
except ValueError:
657+
# filepath is not under dag_folder (e.g., example DAGs from a different location)
658+
relative_file = Path(filepath).as_posix()
650659
stats.append(
651660
FileLoadStat(
652-
file=filepath.replace(settings.DAGS_FOLDER, ""),
661+
file=relative_file,
653662
duration=file_parse_end_dttm - file_parse_start_dttm,
654663
dag_num=len(found_dags),
655664
task_num=sum(len(dag.tasks) for dag in found_dags),
656665
dags=str([dag.dag_id for dag in found_dags]),
657666
warning_num=len(self.captured_warnings.get(filepath, [])),
667+
bundle_path=self.bundle_path,
668+
bundle_name=self.bundle_name,
658669
)
659670
)
660671
except Exception as e:

airflow-core/tests/unit/cli/commands/test_dag_command.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,9 @@ def test_cli_report(self, stdout_capture):
225225
dag_command.dag_report(args)
226226
out = temp_stdout.getvalue()
227227

228-
assert "airflow/example_dags/example_complex.py" in out
229-
assert "example_complex" in out
228+
data = json.loads(out)
229+
assert any(item["file"].endswith("example_complex.py") for item in data)
230+
assert any("example_complex" in item["dags"] for item in data)
230231

231232
@conf_vars({("core", "load_examples"): "true"})
232233
def test_cli_get_dag_details(self, stdout_capture):

airflow-core/tests/unit/dag_processing/test_dagbag.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ def test_safe_mode_heuristic_match(self, tmp_path):
368368
dagbag = DagBag(include_examples=False, safe_mode=True)
369369

370370
assert len(dagbag.dagbag_stats) == 1
371-
assert dagbag.dagbag_stats[0].file == f"/{path.name}"
371+
assert dagbag.dagbag_stats[0].file == path.name
372372

373373
def test_safe_mode_heuristic_mismatch(self, tmp_path):
374374
"""
@@ -388,7 +388,65 @@ def test_safe_mode_disabled(self, tmp_path):
388388
with conf_vars({("core", "dags_folder"): os.fspath(path.parent)}):
389389
dagbag = DagBag(include_examples=False, safe_mode=False)
390390
assert len(dagbag.dagbag_stats) == 1
391-
assert dagbag.dagbag_stats[0].file == f"/{path.name}"
391+
assert dagbag.dagbag_stats[0].file == path.name
392+
393+
def test_dagbag_stats_file_is_relative_path_with_mixed_separators(self, tmp_path):
394+
"""
395+
Test that dagbag_stats.file contains a relative path even when DAGS_FOLDER
396+
and filepath have different path separators (simulates Windows behavior).
397+
398+
On Windows, settings.DAGS_FOLDER may use forward slashes (e.g., 'C:/foo/dags')
399+
while filepath from os.path operations uses backslashes (e.g., 'C:\\foo\\dags\\my_dag.py').
400+
This test verifies that path normalization works correctly in such cases.
401+
402+
See: https://github.com/apache/airflow/pull/59785
403+
"""
404+
path = tmp_path / "testfile.py"
405+
path.write_text("# airflow\n# DAG")
406+
407+
# Simulate the Windows scenario where DAGS_FOLDER has forward slashes
408+
# but the filesystem returns paths with backslashes
409+
dags_folder_with_forward_slashes = path.parent.as_posix()
410+
with conf_vars({("core", "dags_folder"): dags_folder_with_forward_slashes}):
411+
dagbag = DagBag(include_examples=False, safe_mode=True)
412+
413+
assert len(dagbag.dagbag_stats) == 1
414+
assert dagbag.dagbag_stats[0].file == path.name
415+
416+
def test_dagbag_stats_includes_bundle_info(self, tmp_path):
417+
"""Test that FileLoadStat includes bundle_path and bundle_name from DagBag."""
418+
path = tmp_path / "testfile.py"
419+
path.write_text("# airflow\n# DAG")
420+
421+
bundle_path = tmp_path / "bundle"
422+
bundle_path.mkdir()
423+
bundle_name = "test_bundle"
424+
425+
with conf_vars({("core", "dags_folder"): os.fspath(path.parent)}):
426+
dagbag = DagBag(
427+
include_examples=False,
428+
safe_mode=True,
429+
bundle_path=bundle_path,
430+
bundle_name=bundle_name,
431+
)
432+
433+
assert len(dagbag.dagbag_stats) == 1
434+
stat = dagbag.dagbag_stats[0]
435+
assert stat.bundle_path == bundle_path
436+
assert stat.bundle_name == bundle_name
437+
438+
def test_dagbag_stats_bundle_info_none_when_not_provided(self, tmp_path):
439+
"""Test that FileLoadStat has None for bundle_path and bundle_name when not provided."""
440+
path = tmp_path / "testfile.py"
441+
path.write_text("# airflow\n# DAG")
442+
443+
with conf_vars({("core", "dags_folder"): os.fspath(path.parent)}):
444+
dagbag = DagBag(include_examples=False, safe_mode=True)
445+
446+
assert len(dagbag.dagbag_stats) == 1
447+
stat = dagbag.dagbag_stats[0]
448+
assert stat.bundle_path is None
449+
assert stat.bundle_name is None
392450

393451
def test_process_file_that_contains_multi_bytes_char(self, tmp_path):
394452
"""

0 commit comments

Comments
 (0)