Skip to content

Commit 33ed4d1

Browse files
jason810496Lee-W
authored andcommitted
AIP-84 | Add Auth for Dags (apache#47433)
* AIP-84 | Add Auth for Dag Refactor conftest for api_fastapi and test_dags Add unauthorized 403 test cases Remove PATCH in requires_access Fix unauthorized_test_client, requires_access_dag Add EditableDagsFilterDep, ReadableDagsFilterDep Add permitted_dag_filter for dags API Fix test_security Add OrmFilterClause Fix mypy error * fix(api_fastapi): rename methods argument to method * Fix kubernetes_tests * Fix api_fastapi/test_dags * Add dags_reserialize for k8s tests Refactor _get_jwt_token * Increase threshold of test_integration_run_dag_with_scheduler_failure * test: raise if we cannot get jwt_token not due to connection error * Fix _get_jwt_token after dynamic patching k8s configMap * Remove dags_reserialize setup in BaseK8STest * Fix test_docker_compose_quick_start * Ensure scheduler health in test_integration_run_dag_with_scheduler_failure * Increase timeout threshold * Add HTTP retry for _get_jwt_token * Add JWTRefreshAdapter and restart api-server if needed --------- Co-authored-by: Wei Lee <[email protected]>
1 parent f78194f commit 33ed4d1

File tree

12 files changed

+355
-69
lines changed

12 files changed

+355
-69
lines changed

airflow/api_fastapi/common/db/common.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
if TYPE_CHECKING:
3636
from sqlalchemy.sql import Select
3737

38-
from airflow.api_fastapi.common.parameters import BaseParam
38+
from airflow.api_fastapi.core_api.base import OrmClause
3939

4040

4141
def _get_session() -> Session:
@@ -47,7 +47,7 @@ def _get_session() -> Session:
4747

4848

4949
def apply_filters_to_select(
50-
*, statement: Select, filters: Sequence[BaseParam | None] | None = None
50+
*, statement: Select, filters: Sequence[OrmClause | None] | None = None
5151
) -> Select:
5252
if filters is None:
5353
return statement
@@ -71,10 +71,10 @@ async def _get_async_session() -> AsyncSession:
7171
async def paginated_select_async(
7272
*,
7373
statement: Select,
74-
filters: Sequence[BaseParam] | None = None,
75-
order_by: BaseParam | None = None,
76-
offset: BaseParam | None = None,
77-
limit: BaseParam | None = None,
74+
filters: Sequence[OrmClause] | None = None,
75+
order_by: OrmClause | None = None,
76+
offset: OrmClause | None = None,
77+
limit: OrmClause | None = None,
7878
session: AsyncSession,
7979
return_total_entries: Literal[True] = True,
8080
) -> tuple[Select, int]: ...
@@ -84,10 +84,10 @@ async def paginated_select_async(
8484
async def paginated_select_async(
8585
*,
8686
statement: Select,
87-
filters: Sequence[BaseParam] | None = None,
88-
order_by: BaseParam | None = None,
89-
offset: BaseParam | None = None,
90-
limit: BaseParam | None = None,
87+
filters: Sequence[OrmClause] | None = None,
88+
order_by: OrmClause | None = None,
89+
offset: OrmClause | None = None,
90+
limit: OrmClause | None = None,
9191
session: AsyncSession,
9292
return_total_entries: Literal[False],
9393
) -> tuple[Select, None]: ...
@@ -96,10 +96,10 @@ async def paginated_select_async(
9696
async def paginated_select_async(
9797
*,
9898
statement: Select,
99-
filters: Sequence[BaseParam | None] | None = None,
100-
order_by: BaseParam | None = None,
101-
offset: BaseParam | None = None,
102-
limit: BaseParam | None = None,
99+
filters: Sequence[OrmClause | None] | None = None,
100+
order_by: OrmClause | None = None,
101+
offset: OrmClause | None = None,
102+
limit: OrmClause | None = None,
103103
session: AsyncSession,
104104
return_total_entries: bool = True,
105105
) -> tuple[Select, int | None]:
@@ -129,10 +129,10 @@ async def paginated_select_async(
129129
def paginated_select(
130130
*,
131131
statement: Select,
132-
filters: Sequence[BaseParam] | None = None,
133-
order_by: BaseParam | None = None,
134-
offset: BaseParam | None = None,
135-
limit: BaseParam | None = None,
132+
filters: Sequence[OrmClause] | None = None,
133+
order_by: OrmClause | None = None,
134+
offset: OrmClause | None = None,
135+
limit: OrmClause | None = None,
136136
session: Session = NEW_SESSION,
137137
return_total_entries: Literal[True] = True,
138138
) -> tuple[Select, int]: ...
@@ -142,10 +142,10 @@ def paginated_select(
142142
def paginated_select(
143143
*,
144144
statement: Select,
145-
filters: Sequence[BaseParam] | None = None,
146-
order_by: BaseParam | None = None,
147-
offset: BaseParam | None = None,
148-
limit: BaseParam | None = None,
145+
filters: Sequence[OrmClause] | None = None,
146+
order_by: OrmClause | None = None,
147+
offset: OrmClause | None = None,
148+
limit: OrmClause | None = None,
149149
session: Session = NEW_SESSION,
150150
return_total_entries: Literal[False],
151151
) -> tuple[Select, None]: ...
@@ -155,10 +155,10 @@ def paginated_select(
155155
def paginated_select(
156156
*,
157157
statement: Select,
158-
filters: Sequence[BaseParam] | None = None,
159-
order_by: BaseParam | None = None,
160-
offset: BaseParam | None = None,
161-
limit: BaseParam | None = None,
158+
filters: Sequence[OrmClause] | None = None,
159+
order_by: OrmClause | None = None,
160+
offset: OrmClause | None = None,
161+
limit: OrmClause | None = None,
162162
session: Session = NEW_SESSION,
163163
return_total_entries: bool = True,
164164
) -> tuple[Select, int | None]:

airflow/api_fastapi/common/parameters.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from sqlalchemy import Column, and_, case, or_
4141
from sqlalchemy.inspection import inspect
4242

43+
from airflow.api_fastapi.core_api.base import OrmClause
4344
from airflow.models import Base
4445
from airflow.models.asset import (
4546
AssetAliasModel,
@@ -65,18 +66,14 @@
6566
T = TypeVar("T")
6667

6768

68-
class BaseParam(Generic[T], ABC):
69-
"""Base class for filters."""
69+
class BaseParam(OrmClause[T], ABC):
70+
"""Base class for path or query parameters with ORM transformation."""
7071

7172
def __init__(self, value: T | None = None, skip_none: bool = True) -> None:
72-
self.value = value
73+
super().__init__(value)
7374
self.attribute: ColumnElement | None = None
7475
self.skip_none = skip_none
7576

76-
@abstractmethod
77-
def to_orm(self, select: Select) -> Select:
78-
pass
79-
8077
def set_value(self, value: T | None) -> Self:
8178
self.value = value
8279
return self

airflow/api_fastapi/core_api/base.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,16 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
from abc import ABC, abstractmethod
20+
from typing import TYPE_CHECKING, Generic, TypeVar
21+
1922
from pydantic import BaseModel as PydanticBaseModel, ConfigDict
2023

24+
if TYPE_CHECKING:
25+
from sqlalchemy.sql import Select
26+
27+
T = TypeVar("T")
28+
2129

2230
class BaseModel(PydanticBaseModel):
2331
"""
@@ -39,3 +47,18 @@ class StrictBaseModel(BaseModel):
3947
"""
4048

4149
model_config = ConfigDict(from_attributes=True, populate_by_name=True, extra="forbid")
50+
51+
52+
class OrmClause(Generic[T], ABC):
53+
"""
54+
Base class for filtering clauses with paginated_select.
55+
56+
The subclasses should implement the `to_orm` method and set the `value` attribute.
57+
"""
58+
59+
def __init__(self, value: T | None = None):
60+
self.value = value
61+
62+
@abstractmethod
63+
def to_orm(self, select: Select) -> Select:
64+
pass

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3058,6 +3058,8 @@ paths:
30583058
summary: Get Dags
30593059
description: Get all DAGs.
30603060
operationId: get_dags
3061+
security:
3062+
- OAuth2PasswordBearer: []
30613063
parameters:
30623064
- name: limit
30633065
in: query
@@ -3223,6 +3225,8 @@ paths:
32233225
summary: Patch Dags
32243226
description: Patch multiple DAGs.
32253227
operationId: patch_dags
3228+
security:
3229+
- OAuth2PasswordBearer: []
32263230
parameters:
32273231
- name: update_mask
32283232
in: query
@@ -3358,6 +3362,8 @@ paths:
33583362
summary: Get Dag
33593363
description: Get basic information about a DAG.
33603364
operationId: get_dag
3365+
security:
3366+
- OAuth2PasswordBearer: []
33613367
parameters:
33623368
- name: dag_id
33633369
in: path
@@ -3408,6 +3414,8 @@ paths:
34083414
summary: Patch Dag
34093415
description: Patch the specific DAG.
34103416
operationId: patch_dag
3417+
security:
3418+
- OAuth2PasswordBearer: []
34113419
parameters:
34123420
- name: dag_id
34133421
in: path
@@ -3474,6 +3482,8 @@ paths:
34743482
summary: Delete Dag
34753483
description: Delete the specific DAG.
34763484
operationId: delete_dag
3485+
security:
3486+
- OAuth2PasswordBearer: []
34773487
parameters:
34783488
- name: dag_id
34793489
in: path
@@ -3524,6 +3534,8 @@ paths:
35243534
summary: Get Dag Details
35253535
description: Get details of DAG.
35263536
operationId: get_dag_details
3537+
security:
3538+
- OAuth2PasswordBearer: []
35273539
parameters:
35283540
- name: dag_id
35293541
in: path

airflow/api_fastapi/core_api/routes/public/dags.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@
5757
DAGResponse,
5858
)
5959
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
60+
from airflow.api_fastapi.core_api.security import (
61+
EditableDagsFilterDep,
62+
ReadableDagsFilterDep,
63+
requires_access_dag,
64+
)
6065
from airflow.api_fastapi.logging.decorators import action_logging
6166
from airflow.exceptions import AirflowException, DagNotFound
6267
from airflow.models import DAG, DagModel
@@ -65,7 +70,7 @@
6570
dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")
6671

6772

68-
@dags_router.get("")
73+
@dags_router.get("", dependencies=[Depends(requires_access_dag(method="GET"))])
6974
def get_dags(
7075
limit: QueryLimit,
7176
offset: QueryOffset,
@@ -105,6 +110,7 @@ def get_dags(
105110
).dynamic_depends()
106111
),
107112
],
113+
readable_dags_filter: ReadableDagsFilterDep,
108114
session: SessionDep,
109115
) -> DAGCollectionResponse:
110116
"""Get all DAGs."""
@@ -132,6 +138,7 @@ def get_dags(
132138
tags,
133139
owners,
134140
last_dag_run_state,
141+
readable_dags_filter,
135142
],
136143
order_by=order_by,
137144
offset=offset,
@@ -156,6 +163,7 @@ def get_dags(
156163
status.HTTP_422_UNPROCESSABLE_ENTITY,
157164
]
158165
),
166+
dependencies=[Depends(requires_access_dag(method="GET"))],
159167
)
160168
def get_dag(dag_id: str, session: SessionDep, request: Request) -> DAGResponse:
161169
"""Get basic information about a DAG."""
@@ -182,6 +190,7 @@ def get_dag(dag_id: str, session: SessionDep, request: Request) -> DAGResponse:
182190
status.HTTP_404_NOT_FOUND,
183191
]
184192
),
193+
dependencies=[Depends(requires_access_dag(method="GET"))],
185194
)
186195
def get_dag_details(dag_id: str, session: SessionDep, request: Request) -> DAGDetailsResponse:
187196
"""Get details of DAG."""
@@ -208,7 +217,7 @@ def get_dag_details(dag_id: str, session: SessionDep, request: Request) -> DAGDe
208217
status.HTTP_404_NOT_FOUND,
209218
]
210219
),
211-
dependencies=[Depends(action_logging())],
220+
dependencies=[Depends(requires_access_dag(method="PUT")), Depends(action_logging())],
212221
)
213222
def patch_dag(
214223
dag_id: str,
@@ -251,7 +260,7 @@ def patch_dag(
251260
status.HTTP_404_NOT_FOUND,
252261
]
253262
),
254-
dependencies=[Depends(action_logging())],
263+
dependencies=[Depends(requires_access_dag(method="PUT")), Depends(action_logging())],
255264
)
256265
def patch_dags(
257266
patch_body: DAGPatchBody,
@@ -263,6 +272,7 @@ def patch_dags(
263272
only_active: QueryOnlyActiveFilter,
264273
paused: QueryPausedFilter,
265274
last_dag_run_state: QueryLastDagRunStateFilter,
275+
editable_dags_filter: EditableDagsFilterDep,
266276
session: SessionDep,
267277
update_mask: list[str] | None = Query(None),
268278
) -> DAGCollectionResponse:
@@ -283,7 +293,7 @@ def patch_dags(
283293

284294
dags_select, total_entries = paginated_select(
285295
statement=generate_dag_with_latest_run_query(),
286-
filters=[only_active, paused, dag_id_pattern, tags, owners, last_dag_run_state],
296+
filters=[only_active, paused, dag_id_pattern, tags, owners, last_dag_run_state, editable_dags_filter],
287297
order_by=None,
288298
offset=offset,
289299
limit=limit,
@@ -313,7 +323,7 @@ def patch_dags(
313323
status.HTTP_422_UNPROCESSABLE_ENTITY,
314324
]
315325
),
316-
dependencies=[Depends(action_logging())],
326+
dependencies=[Depends(requires_access_dag(method="DELETE")), Depends(action_logging())],
317327
)
318328
def delete_dag(
319329
dag_id: str,

0 commit comments

Comments
 (0)