22Custom Auth manager for Airflow
33"""
44
5- from airflow .auth .managers .base_auth_manager import ResourceMethod
6- from airflow .auth .managers .models .base_user import BaseUser
7- from airflow .auth .managers .models .resource_details import (
5+ from airflow .providers .fab .auth_manager .models import User
6+ from airflow .providers .fab .auth_manager .fab_auth_manager import FabAuthManager
7+
8+ from airflow .api_fastapi .auth .managers .base_auth_manager import ResourceMethod
9+ from airflow .configuration import conf
10+ from airflow .api_fastapi .auth .managers .models .resource_details import (
811 AccessView ,
12+ AssetDetails ,
13+ AssetAliasDetails ,
14+ BackfillDetails ,
915 ConfigurationDetails ,
1016 ConnectionDetails ,
1117 DagAccessEntity ,
1218 DagDetails ,
13- DatasetDetails ,
1419 PoolDetails ,
1520 VariableDetails ,
1621)
17- from airflow .providers .fab .auth_manager .fab_auth_manager import FabAuthManager
1822from airflow .stats import Stats
1923from airflow .utils .log .logging_mixin import LoggingMixin
2024from cachetools import TTLCache , cachedmethod
@@ -70,22 +74,26 @@ class OpaFabAuthManager(FabAuthManager, LoggingMixin):
7074 AUTH_OPA_REQUEST_URL_DEFAULT = "http://opa:8081/v1/data/airflow"
7175 AUTH_OPA_REQUEST_TIMEOUT_DEFAULT = 10
7276
73- def init (self ) -> None :
77+ @override
78+ def init_flask_resources (self ) -> None :
7479 """
7580 Run operations when Airflow is initializing.
7681 """
7782
78- super ().init ()
83+ super ().init_flask_resources ()
7984
8085 Stats .incr (METRIC_NAME_OPA_CACHE_LIMIT_REACHED , count = 0 )
8186
82- config = self .appbuilder .get_app .config
8387 self .opa_cache = Cache (
84- maxsize = config .get (
85- "AUTH_OPA_CACHE_MAXSIZE" , self .AUTH_OPA_CACHE_MAXSIZE_DEFAULT
88+ maxsize = conf .getint (
89+ "core" ,
90+ "AUTH_OPA_CACHE_MAXSIZE" ,
91+ fallback = self .AUTH_OPA_CACHE_MAXSIZE_DEFAULT ,
8692 ),
87- ttl = config .get (
88- "AUTH_OPA_CACHE_TTL_IN_SEC" , self .AUTH_OPA_CACHE_TTL_IN_SEC_DEFAULT
93+ ttl = conf .getint (
94+ "core" ,
95+ "AUTH_OPA_CACHE_TTL_IN_SEC" ,
96+ fallback = self .AUTH_OPA_CACHE_TTL_IN_SEC_DEFAULT ,
8997 ),
9098 )
9199 self .opa_session = requests .Session ()
@@ -113,14 +121,17 @@ def _is_authorized_in_opa(self, endpoint: str, input: OpaInput) -> bool:
113121
114122 self .log .debug ("Forward authorization request to OPA" )
115123
116- config = self .appbuilder .get_app .config
117- opa_url = config .get ("AUTH_OPA_REQUEST_URL" , self .AUTH_OPA_REQUEST_URL_DEFAULT )
124+ opa_url = conf .get (
125+ "core" , "AUTH_OPA_REQUEST_URL" , fallback = self .AUTH_OPA_REQUEST_URL_DEFAULT
126+ )
118127 try :
119128 response = self .call_opa (
120129 f"{ opa_url } /{ endpoint } " ,
121130 json = input .to_dict (),
122- timeout = config .get (
123- "AUTH_OPA_REQUEST_TIMEOUT" , self .AUTH_OPA_REQUEST_TIMEOUT_DEFAULT
131+ timeout = conf .getint (
132+ "core" ,
133+ "AUTH_OPA_REQUEST_TIMEOUT" ,
134+ fallback = self .AUTH_OPA_REQUEST_TIMEOUT_DEFAULT ,
124135 ),
125136 )
126137 return response .json ().get ("result" )
@@ -134,7 +145,7 @@ def is_authorized_configuration(
134145 * ,
135146 method : ResourceMethod ,
136147 details : Optional [ConfigurationDetails ] = None ,
137- user : Optional [ BaseUser ] = None ,
148+ user : User ,
138149 ) -> bool :
139150 """
140151 Return whether the user is authorized to perform a given action on
@@ -148,9 +159,6 @@ def is_authorized_configuration(
148159
149160 self .log .debug ("Check is_authorized_configuration" )
150161
151- if not user :
152- user = self .get_user ()
153-
154162 if not details :
155163 section = None
156164 else :
@@ -180,7 +188,7 @@ def is_authorized_connection(
180188 * ,
181189 method : ResourceMethod ,
182190 details : Optional [ConnectionDetails ] = None ,
183- user : Optional [ BaseUser ] = None ,
191+ user : User ,
184192 ) -> bool :
185193 """
186194 Return whether the user is authorized to perform a given action on a connection.
@@ -193,9 +201,6 @@ def is_authorized_connection(
193201
194202 self .log .debug ("Check is_authorized_connection" )
195203
196- if not user :
197- user = self .get_user ()
198-
199204 if not details :
200205 conn_id = None
201206 else :
@@ -226,7 +231,7 @@ def is_authorized_dag(
226231 method : ResourceMethod ,
227232 access_entity : Optional [DagAccessEntity ] = None ,
228233 details : Optional [DagDetails ] = None ,
229- user : Optional [ BaseUser ] = None ,
234+ user : User ,
230235 ) -> bool :
231236 """
232237 Return whether the user is authorized to perform a given action on a DAG.
@@ -241,9 +246,6 @@ def is_authorized_dag(
241246
242247 self .log .debug ("Check is_authorized_dag" )
243248
244- if not user :
245- user = self .get_user ()
246-
247249 if not access_entity :
248250 entity = None
249251 else :
@@ -274,48 +276,39 @@ def is_authorized_dag(
274276 )
275277
276278 @override
277- def is_authorized_dataset (
279+ def is_authorized_backfill (
278280 self ,
279281 * ,
280282 method : ResourceMethod ,
281- details : Optional [DatasetDetails ] = None ,
282- user : Optional [ BaseUser ] = None ,
283+ details : Optional [BackfillDetails ] = None ,
284+ user : User ,
283285 ) -> bool :
284- """
285- Return whether the user is authorized to perform a given action on a dataset.
286-
287- :param method: the method to perform
288- :param details: optional details about the dataset
289- :param user: the user to perform the action on. If not provided (or None), it uses the
290- current user
291- """
292-
293- self .log .debug ("Check is_authorized_dataset" )
294-
295- if not user :
296- user = self .get_user ()
286+ raise NotImplementedError (
287+ "Backfill authorization is not implemented in OPA auth manager. "
288+ )
297289
298- if not details :
299- uri = None
300- else :
301- uri = details .uri
290+ @override
291+ def is_authorized_asset (
292+ self ,
293+ * ,
294+ method : ResourceMethod ,
295+ user : User ,
296+ details : Optional [AssetDetails ] = None ,
297+ ) -> bool :
298+ raise NotImplementedError (
299+ "Asset authorization is not implemented in OPA auth manager. "
300+ )
302301
303- return self ._is_authorized_in_opa (
304- "is_authorized_dataset" ,
305- OpaInput (
306- {
307- "input" : {
308- "method" : method ,
309- "details" : {
310- "uri" : uri ,
311- },
312- "user" : {
313- "id" : user .get_id (),
314- "name" : user .get_name (),
315- },
316- }
317- }
318- ),
302+ @override
303+ def is_authorized_asset_alias (
304+ self ,
305+ * ,
306+ method : ResourceMethod ,
307+ user : User ,
308+ details : Optional [AssetAliasDetails ] = None ,
309+ ) -> bool :
310+ raise NotImplementedError (
311+ "Asset alias authorization is not implemented in OPA auth manager. "
319312 )
320313
321314 @override
@@ -324,7 +317,7 @@ def is_authorized_pool(
324317 * ,
325318 method : ResourceMethod ,
326319 details : Optional [PoolDetails ] = None ,
327- user : Optional [ BaseUser ] = None ,
320+ user : User ,
328321 ) -> bool :
329322 """
330323 Return whether the user is authorized to perform a given action on a pool.
@@ -337,9 +330,6 @@ def is_authorized_pool(
337330
338331 self .log .debug ("Check is_authorized_pool" )
339332
340- if not user :
341- user = self .get_user ()
342-
343333 if not details :
344334 name = None
345335 else :
@@ -369,7 +359,7 @@ def is_authorized_variable(
369359 * ,
370360 method : ResourceMethod ,
371361 details : Optional [VariableDetails ] = None ,
372- user : Optional [ BaseUser ] = None ,
362+ user : User ,
373363 ) -> bool :
374364 """
375365 Return whether the user is authorized to perform a given action on a variable.
@@ -382,9 +372,6 @@ def is_authorized_variable(
382372
383373 self .log .debug ("Check is_authorized_variable" )
384374
385- if not user :
386- user = self .get_user ()
387-
388375 if not details :
389376 key = None
390377 else :
@@ -413,7 +400,7 @@ def is_authorized_view(
413400 self ,
414401 * ,
415402 access_view : AccessView ,
416- user : Optional [ BaseUser ] = None ,
403+ user : User ,
417404 ) -> bool :
418405 """
419406 Return whether the user is authorized to access a read-only state of the installation.
@@ -425,9 +412,6 @@ def is_authorized_view(
425412
426413 self .log .debug ("Check is_authorized_view" )
427414
428- if not user :
429- user = self .get_user ()
430-
431415 return self ._is_authorized_in_opa (
432416 "is_authorized_view" ,
433417 OpaInput (
@@ -449,7 +433,7 @@ def is_authorized_custom_view(
449433 * ,
450434 method : Union [ResourceMethod , str ],
451435 resource_name : str ,
452- user : Optional [ BaseUser ] = None ,
436+ user : User ,
453437 ) -> bool :
454438 """
455439 Return whether the user is authorized to perform a given action on a custom view.
0 commit comments