1515 VariableDetails ,
1616)
1717from airflow .providers .fab .auth_manager .fab_auth_manager import FabAuthManager
18+ from airflow .stats import Stats
1819from airflow .utils .log .logging_mixin import LoggingMixin
1920from cachetools import TTLCache , cachedmethod
2021from typing import override
2122import json
2223import requests
2324
25+ METRIC_NAME_OPA_CACHE_LIMIT_REACHED = "opa_cache_limit_reached"
26+
2427class OpaInput :
2528 """
2629 Wrapper for the OPA input structure which is hashable so that it can be cached
@@ -39,6 +42,20 @@ def __hash__(self) -> int:
3942 def to_dict (self ) -> dict :
4043 return self .input
4144
45+ class Cache (TTLCache ):
46+ """
47+ LRU Cache implementation with per-item time-to-live (TTL) value.
48+ """
49+
50+ @override
51+ def popitem (self ):
52+ """
53+ Remove the least recently used item that has not already expired.
54+ """
55+
56+ Stats .incr (METRIC_NAME_OPA_CACHE_LIMIT_REACHED )
57+ return super ().popitem ()
58+
4259class OpaFabAuthManager (FabAuthManager , LoggingMixin ):
4360 """
4461 Auth manager based on the FabAuthManager which delegates the authorization to an Open Policy
@@ -57,8 +74,10 @@ def init(self) -> None:
5774
5875 super ().init ()
5976
77+ Stats .incr (METRIC_NAME_OPA_CACHE_LIMIT_REACHED , count = 0 )
78+
6079 config = self .appbuilder .get_app .config
61- self .opa_cache = TTLCache (
80+ self .opa_cache = Cache (
6281 maxsize = config .get (
6382 'AUTH_OPA_CACHE_MAXSIZE' ,
6483 self .AUTH_OPA_CACHE_MAXSIZE_DEFAULT
0 commit comments