Skip to content

Commit 64bc285

Browse files
committed
Fix serialization tests
1 parent 5c2327e commit 64bc285

File tree

3 files changed

+35
-15
lines changed

3 files changed

+35
-15
lines changed

airflow-core/tests/unit/always/test_example_dags.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from tests_common.test_utils.asserts import assert_queries_count
3737
from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker
3838
from tests_common.test_utils.paths import AIRFLOW_PROVIDERS_ROOT_PATH, AIRFLOW_ROOT_PATH
39+
from tests_common.test_utils.providers import get_suspended_providers_folders
3940

4041
CURRENT_PYTHON_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}"
4142
PROVIDERS_PREFIXES = ["providers/"]
@@ -96,19 +97,6 @@ def match_optional_dependencies(distribution_name: str, specifier: str | None) -
9697
return True, ""
9798

9899

99-
def get_suspended_providers_folders() -> list[str]:
100-
"""
101-
Returns a list of suspended providers folders that should be
102-
skipped when running tests (without any prefix - for example apache/beam, yandex, google etc.).
103-
"""
104-
suspended_providers = []
105-
for provider_path in AIRFLOW_PROVIDERS_ROOT_PATH.rglob("provider.yaml"):
106-
provider_yaml = yaml.safe_load(provider_path.read_text())
107-
if provider_yaml["state"] == "suspended":
108-
suspended_providers.append(provider_path.parent.resolve().as_posix())
109-
return suspended_providers
110-
111-
112100
def get_python_excluded_providers_folders() -> list[str]:
113101
"""
114102
Returns a list of providers folders that should be excluded for current Python version and

airflow-core/tests/unit/serialization/test_dag_serialization.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@
104104
GithubLink,
105105
MockOperator,
106106
)
107+
from tests_common.test_utils.providers import (
108+
IGNORE_MODULE_IMPORT_ERRORS,
109+
get_suspended_providers_folders,
110+
)
107111
from tests_common.test_utils.timetables import (
108112
CustomSerializationTimetable,
109113
cron_timetable,
@@ -424,8 +428,8 @@ def get_excluded_patterns() -> Generator[str, None, None]:
424428
(AIRFLOW_REPO_ROOT_PATH / "generated" / "provider_dependencies.json").read_text()
425429
)
426430
for provider, provider_info in all_providers.items():
431+
provider_path = provider.replace(".", "/")
427432
if python_version in provider_info.get("excluded-python-versions"):
428-
provider_path = provider.replace(".", "/")
429433
yield f"providers/{provider_path}"
430434
current_python_version = sys.version_info[:2]
431435
if current_python_version >= (3, 13):
@@ -455,9 +459,12 @@ def collect_dags(dag_folder=None):
455459
patterns = dag_folder
456460
else:
457461
patterns = [dag_folder]
462+
suspended_providers_path = get_suspended_providers_folders()
463+
458464
excluded_patterns = [
459465
f"{AIRFLOW_REPO_ROOT_PATH}/{excluded_pattern}" for excluded_pattern in get_excluded_patterns()
460-
]
466+
] + suspended_providers_path
467+
461468
with mock.patch("airflow.dag_processing.dagbag.settings.get_dagbag_import_timeout", return_value=60):
462469
for pattern in patterns:
463470
for directory in glob(f"{AIRFLOW_REPO_ROOT_PATH}/{pattern}"):
@@ -551,6 +558,11 @@ def test_serialization(self):
551558
# This "looks" like a problem, but is just a quirk of the parse-all-dags-in-one-process we do
552559
# in this test
553560
if "AirflowDagDuplicatedIdException: Ignoring DAG example_sagemaker" not in error
561+
# Ignore module import errors for any suspended provider paths used in example dags
562+
if any(
563+
f"{ignore_module_import_error}" not in error
564+
for ignore_module_import_error in IGNORE_MODULE_IMPORT_ERRORS
565+
)
554566
}
555567

556568
# Let's not be exact about this, but if everything fails to parse we should fail this test too

devel-common/src/tests_common/test_utils/providers.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
from __future__ import annotations
1919

2020
import semver
21+
import yaml
22+
23+
from tests_common.test_utils.paths import AIRFLOW_PROVIDERS_ROOT_PATH
2124

2225

2326
def object_exists(path: str):
@@ -62,3 +65,20 @@ def get_provider_min_airflow_version(provider_name: str) -> tuple[int, ...]:
6265
f"The provider should have `apache-airflow>=` in their dependencies: {provider_name}"
6366
)
6467
return Version(airflow_dep.split(">=")[1]).release
68+
69+
70+
# Ignore module import errors for any suspended provider paths used in example dags
71+
IGNORE_MODULE_IMPORT_ERRORS: list[str] = ["airflow.providers.apache.beam"]
72+
73+
74+
def get_suspended_providers_folders() -> list[str]:
75+
"""
76+
Returns a list of suspended providers folders that should be
77+
skipped when running tests (without any prefix - for example apache/beam, yandex, google etc.).
78+
"""
79+
suspended_providers = []
80+
for provider_path in AIRFLOW_PROVIDERS_ROOT_PATH.rglob("provider.yaml"):
81+
provider_yaml = yaml.safe_load(provider_path.read_text())
82+
if provider_yaml["state"] == "suspended":
83+
suspended_providers.append(provider_path.parent.resolve().as_posix())
84+
return suspended_providers

0 commit comments

Comments
 (0)