Skip to content

Commit 6901fe0

Browse files
authored
Gate multi team executor loader changes with feature flag (apache#56740)
Do not attempt to load executors from other teams unless core.multi_team=True
1 parent 6afab7b commit 6901fe0

File tree

2 files changed

+20
-7
lines changed

2 files changed

+20
-7
lines changed

airflow-core/src/airflow/executors/executor_loader.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,15 @@ def _get_team_executor_configs(cls) -> list[tuple[str | None, list[str]]]:
211211
executor_names = team_executor_config.strip("=")
212212
else:
213213
cls.block_use_of_multi_team()
214-
team_name, executor_names = team_executor_config.split("=")
214+
if conf.getboolean("core", "multi_team", fallback=False):
215+
team_name, executor_names = team_executor_config.split("=")
216+
else:
217+
log.warning(
218+
"The 'multi_team' config is not enabled, but team executors were configured. "
219+
"The following team executor config will be ignored: %s",
220+
team_executor_config,
221+
)
222+
continue
215223

216224
# Check for duplicate team names
217225
if team_name in seen_teams:

airflow-core/tests/unit/executors/test_executor_loader.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ def test_get_hybrid_executors_from_configs(self, executor_config, expected_execu
239239
mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"),
240240
mock.patch.object(executor_loader.ExecutorLoader, "_validate_teams_exist_in_database"),
241241
):
242-
with conf_vars({("core", "executor"): executor_config}):
242+
with conf_vars({("core", "executor"): executor_config, ("core", "multi_team"): "True"}):
243243
executors = executor_loader.ExecutorLoader._get_executor_names()
244244
assert executors == expected_executors_list
245245

@@ -391,10 +391,11 @@ def test_load_custom_executor_with_classname(self):
391391
def test_get_executor_names_set_module_variables(self):
392392
with conf_vars(
393393
{
394+
("core", "multi_team"): "True",
394395
(
395396
"core",
396397
"executor",
397-
): "=CeleryExecutor,LocalExecutor,fake_exec:unit.executors.test_executor_loader.FakeExecutor;team_a=CeleryExecutor,unit.executors.test_executor_loader.FakeExecutor;team_b=fake_exec:unit.executors.test_executor_loader.FakeExecutor"
398+
): "=CeleryExecutor,LocalExecutor,fake_exec:unit.executors.test_executor_loader.FakeExecutor;team_a=CeleryExecutor,unit.executors.test_executor_loader.FakeExecutor;team_b=fake_exec:unit.executors.test_executor_loader.FakeExecutor",
398399
}
399400
):
400401
celery_path = "airflow.providers.celery.executors.celery_executor.CeleryExecutor"
@@ -488,7 +489,7 @@ def test_get_executor_names_set_module_variables(self):
488489
def test_duplicate_team_names_should_fail(self, executor_config):
489490
"""Test that duplicate team names in executor configuration raise an exception."""
490491
with mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"):
491-
with conf_vars({("core", "executor"): executor_config}):
492+
with conf_vars({("core", "executor"): executor_config, ("core", "multi_team"): "True"}):
492493
with pytest.raises(
493494
AirflowConfigException,
494495
match=r"Team '.+' appears more than once in executor configuration",
@@ -529,7 +530,7 @@ def test_valid_team_configurations_order_preservation(self):
529530
mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"),
530531
mock.patch.object(executor_loader.ExecutorLoader, "_validate_teams_exist_in_database"),
531532
):
532-
with conf_vars({("core", "executor"): executor_config}):
533+
with conf_vars({("core", "executor"): executor_config, ("core", "multi_team"): "True"}):
533534
configs = executor_loader.ExecutorLoader._get_team_executor_configs()
534535
assert configs == expected_configs
535536

@@ -586,7 +587,10 @@ def test_team_validation_with_valid_teams_in_config(self):
586587
mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"),
587588
):
588589
with conf_vars(
589-
{("core", "executor"): "=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor"}
590+
{
591+
("core", "executor"): "=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor",
592+
("core", "multi_team"): "True",
593+
}
590594
):
591595
configs = executor_loader.ExecutorLoader._get_team_executor_configs()
592596

@@ -608,7 +612,8 @@ def test_team_validation_with_invalid_teams_in_config(self):
608612
(
609613
"core",
610614
"executor",
611-
): "=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor;team_c=KubernetesExecutor"
615+
): "=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor;team_c=KubernetesExecutor",
616+
("core", "multi_team"): "True",
612617
}
613618
):
614619
with pytest.raises(AirflowConfigException):

0 commit comments

Comments
 (0)