55from airflow .providers .fab .auth_manager .models import User
66from airflow .providers .fab .auth_manager .fab_auth_manager import FabAuthManager
77
8+ from airflow .api_fastapi .auth .managers .base_auth_manager import ResourceMethod
9+ from airflow .configuration import conf
10+ from airflow .api_fastapi .auth .managers .models .resource_details import (
11+ from airflow .providers .fab .auth_manager .models import User
12+ from airflow .providers .fab .auth_manager .fab_auth_manager import FabAuthManager
13+
814from airflow .api_fastapi .auth .managers .base_auth_manager import ResourceMethod
915from airflow .configuration import conf
1016from airflow .api_fastapi .auth .managers .models .resource_details import (
1117 AccessView ,
1218 AssetDetails ,
1319 AssetAliasDetails ,
1420 BackfillDetails ,
21+ AssetDetails ,
22+ AssetAliasDetails ,
23+ BackfillDetails ,
1524 ConfigurationDetails ,
1625 ConnectionDetails ,
1726 DagAccessEntity ,
@@ -74,22 +83,33 @@ class OpaFabAuthManager(FabAuthManager, LoggingMixin):
7483 AUTH_OPA_REQUEST_URL_DEFAULT = "http://opa:8081/v1/data/airflow"
7584 AUTH_OPA_REQUEST_TIMEOUT_DEFAULT = 10
7685
86+ @override
87+ def init_flask_resources (self ) -> None :
7788 @override
7889 def init_flask_resources (self ) -> None :
7990 """
8091 Run operations when Airflow is initializing.
8192 """
8293
94+ super ().init_flask_resources ()
8395 super ().init_flask_resources ()
8496
8597 Stats .incr (METRIC_NAME_OPA_CACHE_LIMIT_REACHED , count = 0 )
8698
8799 self .opa_cache = Cache (
100+ maxsize = conf .getint (
101+ "core" ,
102+ "AUTH_OPA_CACHE_MAXSIZE" ,
103+ fallback = self .AUTH_OPA_CACHE_MAXSIZE_DEFAULT ,
88104 maxsize = conf .getint (
89105 "core" ,
90106 "AUTH_OPA_CACHE_MAXSIZE" ,
91107 fallback = self .AUTH_OPA_CACHE_MAXSIZE_DEFAULT ,
92108 ),
109+ ttl = conf .getint (
110+ "core" ,
111+ "AUTH_OPA_CACHE_TTL_IN_SEC" ,
112+ fallback = self .AUTH_OPA_CACHE_TTL_IN_SEC_DEFAULT ,
93113 ttl = conf .getint (
94114 "core" ,
95115 "AUTH_OPA_CACHE_TTL_IN_SEC" ,
@@ -121,13 +141,20 @@ def _is_authorized_in_opa(self, endpoint: str, input: OpaInput) -> bool:
121141
122142 self .log .debug ("Forward authorization request to OPA" )
123143
144+ opa_url = conf .get (
145+ "core" , "AUTH_OPA_REQUEST_URL" , fallback = self .AUTH_OPA_REQUEST_URL_DEFAULT
146+ )
124147 opa_url = conf .get (
125148 "core" , "AUTH_OPA_REQUEST_URL" , fallback = self .AUTH_OPA_REQUEST_URL_DEFAULT
126149 )
127150 try :
128151 response = self .call_opa (
129152 f"{ opa_url } /{ endpoint } " ,
130153 json = input .to_dict (),
154+ timeout = conf .getint (
155+ "core" ,
156+ "AUTH_OPA_REQUEST_TIMEOUT" ,
157+ fallback = self .AUTH_OPA_REQUEST_TIMEOUT_DEFAULT ,
131158 timeout = conf .getint (
132159 "core" ,
133160 "AUTH_OPA_REQUEST_TIMEOUT" ,
@@ -146,6 +173,7 @@ def is_authorized_configuration(
146173 method : ResourceMethod ,
147174 details : Optional [ConfigurationDetails ] = None ,
148175 user : User ,
176+ user : User ,
149177 ) - > bool :
150178 """
151179 Return whether the user is authorized to perform a given action on
@@ -189,6 +217,7 @@ def is_authorized_connection(
189217 method : ResourceMethod ,
190218 details : Optional [ConnectionDetails ] = None ,
191219 user : User ,
220+ user : User ,
192221 ) -> bool :
193222 """
194223 Return whether the user is authorized to perform a given action on a connection.
@@ -232,6 +261,7 @@ def is_authorized_dag(
232261 access_entity : Optional [DagAccessEntity ] = None ,
233262 details : Optional [DagDetails ] = None ,
234263 user : User ,
264+ user : User ,
235265 ) -> bool :
236266 """
237267 Return whether the user is authorized to perform a given action on a DAG.
@@ -283,8 +313,34 @@ def is_authorized_backfill(
283313 details : Optional [BackfillDetails ] = None ,
284314 user : User ,
285315 ) -> bool :
286- raise NotImplementedError (
287- "Backfill authorization is not implemented in OPA auth manager. "
316+ """
317+ Return whether the user is authorized to perform a given action on a backfill.
318+
319+ :param method: the method to perform
320+ :param details: optional details about the backfill
321+ :param user: the user to perform the action on. If not provided (or None), it uses the
322+ current user
323+ """
324+ self .log .debug ("Check is_authorized_backfill" )
325+
326+ backfill_id = details .id if details else None
327+
328+ return self ._is_authorized_in_opa (
329+ "is_authorized_backfill" ,
330+ OpaInput (
331+ {
332+ "input" : {
333+ "method" : method ,
334+ "details" : {
335+ "id" : backfill_id ,
336+ },
337+ "user" : {
338+ "id" : user .get_id (),
339+ "name" : user .get_name (),
340+ },
341+ }
342+ }
343+ ),
288344 )
289345
290346 @override
@@ -295,8 +351,34 @@ def is_authorized_asset(
295351 user : User ,
296352 details : Optional [AssetDetails ] = None ,
297353 ) -> bool :
298- raise NotImplementedError (
299- "Asset authorization is not implemented in OPA auth manager. "
354+ """
355+ Return whether the user is authorized to perform a given action on an asset.
356+
357+ :param method: the method to perform
358+ :param details: optional details about the asset
359+ :param user: the user to perform the action on. If not provided (or None), it uses the
360+ current user
361+ """
362+ self .log .debug ("Check is_authorized_asset" )
363+
364+ asset_id = details .id if details else None
365+
366+ return self ._is_authorized_in_opa (
367+ "is_authorized_asset" ,
368+ OpaInput (
369+ {
370+ "input" : {
371+ "method" : method ,
372+ "details" : {
373+ "id" : asset_id ,
374+ },
375+ "user" : {
376+ "id" : user .get_id (),
377+ "name" : user .get_name (),
378+ },
379+ }
380+ }
381+ ),
300382 )
301383
302384 @override
@@ -307,8 +389,34 @@ def is_authorized_asset_alias(
307389 user : User ,
308390 details : Optional [AssetAliasDetails ] = None ,
309391 ) -> bool :
310- raise NotImplementedError (
311- "Asset alias authorization is not implemented in OPA auth manager. "
392+ """
393+ Return whether the user is authorized to perform a given action on an asset alias.
394+
395+ :param method: the method to perform
396+ :param details: optional details about the asset alias
397+ :param user: the user to perform the action on. If not provided (or None), it uses the
398+ current user
399+ """
400+ self .log .debug ("Check is_authorized_asset_alias" )
401+
402+ alias_id = details .id if details else None
403+
404+ return self ._is_authorized_in_opa (
405+ "is_authorized_asset_alias" ,
406+ OpaInput (
407+ {
408+ "input" : {
409+ "method" : method ,
410+ "details" : {
411+ "id" : alias_id ,
412+ },
413+ "user" : {
414+ "id" : user .get_id (),
415+ "name" : user .get_name (),
416+ },
417+ }
418+ }
419+ ),
312420 )
313421
314422 @override
@@ -318,6 +426,7 @@ def is_authorized_pool(
318426 method : ResourceMethod ,
319427 details : Optional [PoolDetails ] = None ,
320428 user : User ,
429+ user : User ,
321430 ) -> bool :
322431 """
323432 Return whether the user is authorized to perform a given action on a pool.
@@ -360,6 +469,7 @@ def is_authorized_variable(
360469 method : ResourceMethod ,
361470 details : Optional [VariableDetails ] = None ,
362471 user : User ,
472+ user : User ,
363473 ) -> bool :
364474 """
365475 Return whether the user is authorized to perform a given action on a variable.
@@ -401,6 +511,7 @@ def is_authorized_view(
401511 * ,
402512 access_view : AccessView ,
403513 user : User ,
514+ user : User ,
404515 ) -> bool :
405516 """
406517 Return whether the user is authorized to access a read-only state of the installation.
@@ -434,6 +545,7 @@ def is_authorized_custom_view(
434545 method : Union [ResourceMethod , str ],
435546 resource_name : str ,
436547 user : User ,
548+ user : User ,
437549 ) -> bool :
438550 """
439551 Return whether the user is authorized to perform a given action on a custom view.
0 commit comments