Skip to content

Commit 4eb776a

Browse files
authored
Ensure Edge Plugin for API endpoint is only loaded on API-Server and AF2 Webserver (#52952)
* Ensure Edge Plugin for API endpoint is only loaded on API-Server and AF2 Webserver * Fix and extend pytests for edge3 plugin * Add a note to clean bad init code up
1 parent e311c6b commit 4eb776a

File tree

2 files changed

+39
-3
lines changed

2 files changed

+39
-3
lines changed

providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from __future__ import annotations
1919

20+
import sys
2021
from typing import TYPE_CHECKING, Any
2122

2223
from airflow.configuration import conf
@@ -213,12 +214,22 @@ def change_maintenance_comment(self, worker_name: str):
213214
except AirflowConfigException:
214215
EDGE_EXECUTOR_ACTIVE = False
215216

217+
# Load the API endpoint only on api-server (Airflow 3.x) or webserver (Airflow 2.x)
218+
# todo(jscheffl): Remove this check when the discussion in
219+
# https://lists.apache.org/thread/w170czq6r7bslkqp1tk6bjjjo0789wgl
220+
# resulted in a proper API to selective initialize. Maybe backcompat-shim
221+
# is also needed to support Airflow-versions prior the rework.
222+
if AIRFLOW_V_3_0_PLUS:
223+
RUNNING_ON_APISERVER = sys.argv[1] in ["api-server"] if len(sys.argv) > 1 else False
224+
else:
225+
RUNNING_ON_APISERVER = "gunicorn" in sys.argv[0] and "airflow-webserver" in sys.argv
226+
216227

217228
class EdgeExecutorPlugin(AirflowPlugin):
218229
"""EdgeExecutor Plugin - provides API endpoints for Edge Workers in Webserver."""
219230

220231
name = "edge_executor"
221-
if EDGE_EXECUTOR_ACTIVE:
232+
if EDGE_EXECUTOR_ACTIVE and RUNNING_ON_APISERVER:
222233
if AIRFLOW_V_3_0_PLUS:
223234
fastapi_apps = [_get_api_endpoint()]
224235
else:

providers/edge3/tests/unit/edge3/plugins/test_edge_executor_plugin.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from __future__ import annotations
1818

1919
import importlib
20+
from unittest.mock import patch
2021

2122
import pytest
2223
import time_machine
@@ -44,17 +45,20 @@ def test_plugin_inactive():
4445

4546

4647
@pytest.mark.db_test
47-
def test_plugin_active():
48-
with conf_vars({("edge", "api_enabled"): "true"}):
48+
def test_plugin_active_apiserver():
49+
mock_cli = ["airflow", "api-server"] if AIRFLOW_V_3_0_PLUS else ["gunicorn", "airflow-webserver"]
50+
with conf_vars({("edge", "api_enabled"): "true"}), patch("sys.argv", mock_cli):
4951
importlib.reload(edge_executor_plugin)
5052

5153
from airflow.providers.edge3.plugins.edge_executor_plugin import (
5254
EDGE_EXECUTOR_ACTIVE,
55+
RUNNING_ON_APISERVER,
5356
EdgeExecutorPlugin,
5457
)
5558

5659
rep = EdgeExecutorPlugin()
5760
assert EDGE_EXECUTOR_ACTIVE
61+
assert RUNNING_ON_APISERVER
5862
if AIRFLOW_V_3_0_PLUS:
5963
assert len(rep.appbuilder_views) == 0
6064
assert len(rep.flask_blueprints) == 0
@@ -64,6 +68,27 @@ def test_plugin_active():
6468
assert len(rep.flask_blueprints) == 2
6569

6670

71+
@patch("sys.argv", ["airflow", "some-other-command"])
72+
def test_plugin_active_non_apiserver():
73+
with conf_vars({("edge", "api_enabled"): "true"}):
74+
importlib.reload(edge_executor_plugin)
75+
76+
from airflow.providers.edge3.plugins.edge_executor_plugin import (
77+
EDGE_EXECUTOR_ACTIVE,
78+
RUNNING_ON_APISERVER,
79+
EdgeExecutorPlugin,
80+
)
81+
82+
rep = EdgeExecutorPlugin()
83+
assert EDGE_EXECUTOR_ACTIVE
84+
assert not RUNNING_ON_APISERVER
85+
assert len(rep.appbuilder_views) == 0
86+
assert len(rep.flask_blueprints) == 0
87+
assert len(rep.appbuilder_views) == 0
88+
if AIRFLOW_V_3_0_PLUS:
89+
assert len(rep.fastapi_apps) == 0
90+
91+
6792
@pytest.fixture
6893
def plugin():
6994
from airflow.providers.edge3.plugins.edge_executor_plugin import EdgeExecutorPlugin

0 commit comments

Comments
 (0)