Skip to content

Commit 83643e5

Browse files
authored
Remove core Airflow support for static hybrid executors (apache#47322)
Remove all the handholding and custom logic we have in core airflow which allows the use of static hybrid executors like LocalKubernetesExecutor and CeleryKubernetesExecutor. These executors will still work on 2.X versions of Airflow, but moving forward they will not be supported on Airflow 3
1 parent c505411 commit 83643e5

File tree

11 files changed

+10
-81
lines changed

11 files changed

+10
-81
lines changed

airflow/config_templates/config.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@ core:
7171
description: |
7272
The executor class that airflow should use. Choices include
7373
``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``,
74-
``KubernetesExecutor``, ``CeleryKubernetesExecutor``, ``LocalKubernetesExecutor`` or the
75-
full import path to the class when using a custom executor.
74+
``KubernetesExecutor`` or the full import path to the class when using a custom executor.
7675
version_added: ~
7776
type: string
7877
example: ~

airflow/executors/executor_constants.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,15 @@ class ConnectorSource(Enum):
2828

2929

3030
LOCAL_EXECUTOR = "LocalExecutor"
31-
LOCAL_KUBERNETES_EXECUTOR = "LocalKubernetesExecutor"
3231
SEQUENTIAL_EXECUTOR = "SequentialExecutor"
3332
CELERY_EXECUTOR = "CeleryExecutor"
34-
CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor"
3533
KUBERNETES_EXECUTOR = "KubernetesExecutor"
3634
DEBUG_EXECUTOR = "DebugExecutor"
3735
MOCK_EXECUTOR = "MockExecutor"
3836
CORE_EXECUTOR_NAMES = {
3937
LOCAL_EXECUTOR,
40-
LOCAL_KUBERNETES_EXECUTOR,
4138
SEQUENTIAL_EXECUTOR,
4239
CELERY_EXECUTOR,
43-
CELERY_KUBERNETES_EXECUTOR,
4440
KUBERNETES_EXECUTOR,
4541
DEBUG_EXECUTOR,
4642
MOCK_EXECUTOR,

airflow/executors/executor_loader.py

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,10 @@
2525
from airflow.exceptions import AirflowConfigException, UnknownExecutorException
2626
from airflow.executors.executor_constants import (
2727
CELERY_EXECUTOR,
28-
CELERY_KUBERNETES_EXECUTOR,
2928
CORE_EXECUTOR_NAMES,
3029
DEBUG_EXECUTOR,
3130
KUBERNETES_EXECUTOR,
3231
LOCAL_EXECUTOR,
33-
LOCAL_KUBERNETES_EXECUTOR,
3432
SEQUENTIAL_EXECUTOR,
3533
ConnectorSource,
3634
)
@@ -59,12 +57,8 @@ class ExecutorLoader:
5957

6058
executors = {
6159
LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
62-
LOCAL_KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
63-
"executors.local_kubernetes_executor.LocalKubernetesExecutor",
6460
SEQUENTIAL_EXECUTOR: "airflow.executors.sequential_executor.SequentialExecutor",
6561
CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
66-
CELERY_KUBERNETES_EXECUTOR: "airflow.providers.celery."
67-
"executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
6862
KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
6963
"executors.kubernetes_executor.KubernetesExecutor",
7064
DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
@@ -265,17 +259,12 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor
265259
_executor_name = executor_name
266260

267261
try:
268-
if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR:
269-
executor = cls.__load_celery_kubernetes_executor()
270-
elif _executor_name.alias == LOCAL_KUBERNETES_EXECUTOR:
271-
executor = cls.__load_local_kubernetes_executor()
262+
executor_cls, import_source = cls.import_executor_cls(_executor_name)
263+
log.debug("Loading executor %s from %s", _executor_name, import_source.value)
264+
if _executor_name.team_id:
265+
executor = executor_cls(team_id=_executor_name.team_id)
272266
else:
273-
executor_cls, import_source = cls.import_executor_cls(_executor_name)
274-
log.debug("Loading executor %s from %s", _executor_name, import_source.value)
275-
if _executor_name.team_id:
276-
executor = executor_cls(team_id=_executor_name.team_id)
277-
else:
278-
executor = executor_cls()
267+
executor = executor_cls()
279268

280269
except ImportError as e:
281270
log.error(e)
@@ -315,19 +304,3 @@ def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSourc
315304
executor_name = cls.get_default_executor_name()
316305
executor, source = cls.import_executor_cls(executor_name)
317306
return executor, source
318-
319-
@classmethod
320-
def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
321-
celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
322-
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
323-
324-
celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
325-
return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor)
326-
327-
@classmethod
328-
def __load_local_kubernetes_executor(cls) -> BaseExecutor:
329-
local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
330-
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
331-
332-
local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
333-
return local_kubernetes_executor_cls(local_executor, kubernetes_executor)

airflow/settings.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -687,11 +687,7 @@ def initialize():
687687
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)
688688

689689
# Determines if the executor utilizes Kubernetes
690-
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
691-
executor_constants.KUBERNETES_EXECUTOR,
692-
executor_constants.CELERY_KUBERNETES_EXECUTOR,
693-
executor_constants.LOCAL_KUBERNETES_EXECUTOR,
694-
}
690+
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") == executor_constants.KUBERNETES_EXECUTOR
695691

696692
# Executors can set this to true to configure logging correctly for
697693
# containerized executors.

tests/cli/commands/local_commands/test_scheduler_command.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ def setup_class(cls):
4545
("LocalExecutor", True),
4646
("SequentialExecutor", True),
4747
("KubernetesExecutor", False),
48-
("LocalKubernetesExecutor", True),
4948
],
5049
)
5150
@mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner")

tests/cli/commands/local_commands/test_standalone_command.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,9 @@
2626
from airflow.executors import executor_loader
2727
from airflow.executors.executor_constants import (
2828
CELERY_EXECUTOR,
29-
CELERY_KUBERNETES_EXECUTOR,
3029
DEBUG_EXECUTOR,
3130
KUBERNETES_EXECUTOR,
3231
LOCAL_EXECUTOR,
33-
LOCAL_KUBERNETES_EXECUTOR,
3432
SEQUENTIAL_EXECUTOR,
3533
)
3634

@@ -40,17 +38,13 @@ class TestStandaloneCommand:
4038
"conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor",
4139
[
4240
(LOCAL_EXECUTOR, "sqlite_conn_string", LOCAL_EXECUTOR),
43-
(LOCAL_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
4441
(SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
4542
(CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
46-
(CELERY_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
4743
(KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
4844
(DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
4945
(LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
50-
(LOCAL_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
5146
(SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR),
5247
(CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
53-
(CELERY_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
5448
(KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
5549
(DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
5650
],

tests/cli/conftest.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323

2424
from airflow.executors import local_executor
2525
from airflow.models.dagbag import DagBag
26-
from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor
27-
from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor
26+
from airflow.providers.celery.executors import celery_executor
27+
from airflow.providers.cncf.kubernetes.executors import kubernetes_executor
2828

2929
from tests_common.test_utils.config import conf_vars
3030

@@ -33,15 +33,9 @@
3333
custom_executor_module.CustomCeleryExecutor = type( # type: ignore
3434
"CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
3535
)
36-
custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore
37-
"CustomCeleryKubernetesExecutor", (celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
38-
)
3936
custom_executor_module.CustomLocalExecutor = type( # type: ignore
4037
"CustomLocalExecutor", (local_executor.LocalExecutor,), {}
4138
)
42-
custom_executor_module.CustomLocalKubernetesExecutor = type( # type: ignore
43-
"CustomLocalKubernetesExecutor", (local_kubernetes_executor.LocalKubernetesExecutor,), {}
44-
)
4539
custom_executor_module.CustomKubernetesExecutor = type( # type: ignore
4640
"CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {}
4741
)

tests/cli/test_cli_parser.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,16 +365,12 @@ def test_executor_specific_commands_not_accessible(self, command):
365365
"executor,expected_args",
366366
[
367367
("CeleryExecutor", ["celery"]),
368-
("CeleryKubernetesExecutor", ["celery", "kubernetes"]),
369368
("KubernetesExecutor", ["kubernetes"]),
370369
("LocalExecutor", []),
371-
("LocalKubernetesExecutor", ["kubernetes"]),
372370
("SequentialExecutor", []),
373371
# custom executors are mapped to the regular ones in `conftest.py`
374372
("custom_executor.CustomLocalExecutor", []),
375-
("custom_executor.CustomLocalKubernetesExecutor", ["kubernetes"]),
376373
("custom_executor.CustomCeleryExecutor", ["celery"]),
377-
("custom_executor.CustomCeleryKubernetesExecutor", ["celery", "kubernetes"]),
378374
("custom_executor.CustomKubernetesExecutor", ["kubernetes"]),
379375
],
380376
)

tests/executors/test_executor_loader.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ def test_no_executor_configured(self):
4545
"executor_name",
4646
[
4747
"CeleryExecutor",
48-
"CeleryKubernetesExecutor",
4948
"DebugExecutor",
5049
"KubernetesExecutor",
5150
"LocalExecutor",
@@ -287,7 +286,6 @@ def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self,
287286
("executor_config", "expected_value"),
288287
[
289288
("CeleryExecutor", "CeleryExecutor"),
290-
("CeleryKubernetesExecutor", "CeleryKubernetesExecutor"),
291289
("DebugExecutor", "DebugExecutor"),
292290
("KubernetesExecutor", "KubernetesExecutor"),
293291
("LocalExecutor", "LocalExecutor"),

tests/sensors/test_base.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,9 @@
3535
from airflow.executors.debug_executor import DebugExecutor
3636
from airflow.executors.executor_constants import (
3737
CELERY_EXECUTOR,
38-
CELERY_KUBERNETES_EXECUTOR,
3938
DEBUG_EXECUTOR,
4039
KUBERNETES_EXECUTOR,
4140
LOCAL_EXECUTOR,
42-
LOCAL_KUBERNETES_EXECUTOR,
4341
SEQUENTIAL_EXECUTOR,
4442
)
4543
from airflow.executors.local_executor import LocalExecutor
@@ -48,9 +46,7 @@
4846
from airflow.models.trigger import TriggerFailureReason
4947
from airflow.models.xcom import XCom
5048
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
51-
from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
5249
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
53-
from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import LocalKubernetesExecutor
5450
from airflow.providers.standard.operators.empty import EmptyOperator
5551
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, poke_mode_only
5652
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
@@ -1306,20 +1302,16 @@ def test_sensor_with_xcom_fails(self, make_sensor):
13061302
"executor_cls_mode",
13071303
[
13081304
(CeleryExecutor, "poke"),
1309-
(CeleryKubernetesExecutor, "poke"),
13101305
(DebugExecutor, "reschedule"),
13111306
(KubernetesExecutor, "poke"),
13121307
(LocalExecutor, "poke"),
1313-
(LocalKubernetesExecutor, "poke"),
13141308
(SequentialExecutor, "poke"),
13151309
],
13161310
ids=[
13171311
CELERY_EXECUTOR,
1318-
CELERY_KUBERNETES_EXECUTOR,
13191312
DEBUG_EXECUTOR,
13201313
KUBERNETES_EXECUTOR,
13211314
LOCAL_EXECUTOR,
1322-
LOCAL_KUBERNETES_EXECUTOR,
13231315
SEQUENTIAL_EXECUTOR,
13241316
],
13251317
)

0 commit comments

Comments
 (0)