Skip to content

Commit 69fdd3d

Browse files
feat(airflow): Add cache and configuration options for OPA and implement all is_authorized functions
1 parent a6aecd1 commit 69fdd3d

File tree

5 files changed

+1513
-955
lines changed

5 files changed

+1513
-955
lines changed

airflow/Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ FROM oci.stackable.tech/sdp/git-sync:${GIT_SYNC} AS gitsync-image
99

1010
FROM stackable/image/statsd_exporter AS statsd_exporter-builder
1111

12-
FROM python:3.12-alpine AS opa-auth-manager-builder
12+
FROM python:3.12-bookworm AS opa-auth-manager-builder
1313

1414
COPY airflow/opa-auth-manager/ /tmp/opa-auth-manager
1515

@@ -18,6 +18,8 @@ WORKDIR /tmp/opa-auth-manager
1818
RUN <<EOF
1919
pip install poetry
2020
poetry build
21+
poetry install
22+
poetry run pytest
2123
EOF
2224

2325
FROM stackable/image/vector AS airflow-build-image

airflow/opa-auth-manager/opa_auth_manager/opa_fab_auth_manager.py

Lines changed: 237 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Custom Auth manager for Airflow
33
"""
44

5+
from typing import cast, override
56
from airflow.auth.managers.base_auth_manager import ResourceMethod
67
from airflow.auth.managers.models.base_user import BaseUser
78
from airflow.auth.managers.models.resource_details import (
@@ -16,14 +17,70 @@
1617
)
1718
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
1819
from airflow.utils.log.logging_mixin import LoggingMixin
20+
from cachetools import TTLCache, cachedmethod
21+
import json
1922
import requests
2023

24+
class OpaInput:
25+
26+
def __init__(self, input: dict) -> None:
27+
self.input = input
28+
29+
def __eq__(self, other: object) -> bool:
30+
return isinstance(other, OpaInput) \
31+
and self.input == other.input
32+
33+
def __hash__(self) -> int:
34+
return hash(json.dumps(self.input, sort_keys=True))
35+
36+
def to_dict(self) -> dict:
37+
return self.input
38+
2139
class OpaFabAuthManager(FabAuthManager, LoggingMixin):
2240
"""
2341
Auth manager based on the FabAuthManager which delegates the authorization to an Open Policy
2442
Agent
2543
"""
2644

45+
def init(self) -> None:
46+
"""Run operations when Airflow is initializing."""
47+
super().init()
48+
self._init_config()
49+
50+
config = self.appbuilder.get_app.config
51+
self.opa_cache = TTLCache(
52+
maxsize=config.get("AUTH_OPA_CACHE_MAXSIZE"),
53+
ttl=config.get("AUTH_OPA_CACHE_TTL_IN_SEC"),
54+
)
55+
self.opa_session = requests.Session()
56+
57+
def _init_config(self):
58+
config = self.appbuilder.get_app.config
59+
config.setdefault('AUTH_OPA_CACHE_MAXSIZE', 1000)
60+
config.setdefault("AUTH_OPA_CACHE_TTL_IN_SEC", 30)
61+
config.setdefault("AUTH_OPA_REQUEST_URL", "http://opa:8081/v1/data/airflow")
62+
config.setdefault("AUTH_OPA_REQUEST_TIMEOUT", 10)
63+
64+
def call_opa(self, url: str, json: dict, timeout: int) -> requests.Response | None:
65+
self.opa_session.post(url=url, json=json, timeout=timeout)
66+
67+
@cachedmethod(lambda self: self.opa_cache)
68+
def _is_authorized_in_opa(self, endpoint: str, input: OpaInput) -> bool:
69+
config = self.appbuilder.get_app.config
70+
opa_url = config.get("AUTH_OPA_REQUEST_URL")
71+
try:
72+
response = cast(requests.Response, self.call_opa(
73+
f'{opa_url}/{endpoint}',
74+
json=input.to_dict(),
75+
timeout=config.get("AUTH_OPA_REQUEST_TIMEOUT")
76+
))
77+
result = response.json().get("result")
78+
return result == True
79+
except Exception as e:
80+
self.log.error(f"Request to OPA failed: {e}")
81+
return False
82+
83+
@override
2784
def is_authorized_configuration(
2885
self,
2986
*,
@@ -46,22 +103,28 @@ def is_authorized_configuration(
46103
if not user:
47104
user = self.get_user()
48105

49-
input= {
50-
'method': method,
51-
'details': details,
52-
'user': {
53-
'id': user.get_id(),
54-
'name': user.get_name(),
55-
},
56-
}
57-
response = requests.post(
58-
'http://opa:8081/v1/data/airflow/is_authorized_configuration',
59-
json=input,
60-
timeout=10
61-
).json()
62-
63-
return response.get("result") == "True"
64-
106+
if not details:
107+
section = None
108+
else:
109+
section = details.section
110+
111+
return self._is_authorized_in_opa(
112+
'is_authorized_configuration',
113+
OpaInput({
114+
'input': {
115+
'method': method,
116+
'details': {
117+
'section': section,
118+
},
119+
'user': {
120+
'id': user.get_id(),
121+
'name': user.get_name(),
122+
},
123+
}
124+
})
125+
)
126+
127+
@override
65128
def is_authorized_connection(
66129
self,
67130
*,
@@ -80,8 +143,31 @@ def is_authorized_connection(
80143

81144
self.log.info("Forward is_authorized_connection to OPA")
82145

83-
return True
146+
if not user:
147+
user = self.get_user()
84148

149+
if not details:
150+
conn_id = None
151+
else:
152+
conn_id = details.conn_id
153+
154+
return self._is_authorized_in_opa(
155+
'is_authorized_connection',
156+
OpaInput({
157+
'input': {
158+
'method': method,
159+
'details': {
160+
'conn_id': conn_id,
161+
},
162+
'user': {
163+
'id': user.get_id(),
164+
'name': user.get_name(),
165+
},
166+
}
167+
})
168+
)
169+
170+
@override
85171
def is_authorized_dag(
86172
self,
87173
*,
@@ -103,8 +189,37 @@ def is_authorized_dag(
103189

104190
self.log.info("Forward is_authorized_dag to OPA")
105191

106-
return True
192+
if not user:
193+
user = self.get_user()
107194

195+
if not access_entity:
196+
entity = None
197+
else:
198+
entity = access_entity.name
199+
200+
if not details:
201+
dag_id = None
202+
else:
203+
dag_id = details.id
204+
205+
return self._is_authorized_in_opa(
206+
'is_authorized_dag',
207+
OpaInput({
208+
'input': {
209+
'method': method,
210+
'access_entity': entity,
211+
'details': {
212+
'id': dag_id,
213+
},
214+
'user': {
215+
'id': user.get_id(),
216+
'name': user.get_name(),
217+
},
218+
}
219+
})
220+
)
221+
222+
@override
108223
def is_authorized_dataset(
109224
self,
110225
*,
@@ -123,8 +238,31 @@ def is_authorized_dataset(
123238

124239
self.log.info("Forward is_authorized_dataset to OPA")
125240

126-
return True
241+
if not user:
242+
user = self.get_user()
127243

244+
if not details:
245+
uri = None
246+
else:
247+
uri = details.uri
248+
249+
return self._is_authorized_in_opa(
250+
'is_authorized_dataset',
251+
OpaInput({
252+
'input': {
253+
'method': method,
254+
'details': {
255+
'uri': uri,
256+
},
257+
'user': {
258+
'id': user.get_id(),
259+
'name': user.get_name(),
260+
},
261+
}
262+
})
263+
)
264+
265+
@override
128266
def is_authorized_pool(
129267
self,
130268
*,
@@ -143,8 +281,31 @@ def is_authorized_pool(
143281

144282
self.log.info("Forward is_authorized_pool to OPA")
145283

146-
return True
284+
if not user:
285+
user = self.get_user()
147286

287+
if not details:
288+
name = None
289+
else:
290+
name = details.name
291+
292+
return self._is_authorized_in_opa(
293+
'is_authorized_pool',
294+
OpaInput({
295+
'input': {
296+
'method': method,
297+
'details': {
298+
'name': name,
299+
},
300+
'user': {
301+
'id': user.get_id(),
302+
'name': user.get_name(),
303+
},
304+
}
305+
})
306+
)
307+
308+
@override
148309
def is_authorized_variable(
149310
self,
150311
*,
@@ -163,8 +324,31 @@ def is_authorized_variable(
163324

164325
self.log.info("Forward is_authorized_variable to OPA")
165326

166-
return True
327+
if not user:
328+
user = self.get_user()
167329

330+
if not details:
331+
key = None
332+
else:
333+
key = details.key
334+
335+
return self._is_authorized_in_opa(
336+
'is_authorized_variable',
337+
OpaInput({
338+
'input': {
339+
'method': method,
340+
'details': {
341+
'key': key,
342+
},
343+
'user': {
344+
'id': user.get_id(),
345+
'name': user.get_name(),
346+
},
347+
}
348+
})
349+
)
350+
351+
@override
168352
def is_authorized_view(
169353
self,
170354
*,
@@ -181,8 +365,23 @@ def is_authorized_view(
181365

182366
self.log.info("Forward is_authorized_view to OPA")
183367

184-
return True
368+
if not user:
369+
user = self.get_user()
185370

371+
return self._is_authorized_in_opa(
372+
'is_authorized_view',
373+
OpaInput({
374+
'input': {
375+
'access_view': access_view.name,
376+
'user': {
377+
'id': user.get_id(),
378+
'name': user.get_name(),
379+
},
380+
}
381+
})
382+
)
383+
384+
@override
186385
def is_authorized_custom_view(
187386
self,
188387
*,
@@ -208,4 +407,19 @@ def is_authorized_custom_view(
208407

209408
self.log.info("Forward is_authorized_custom_view to OPA")
210409

211-
return True
410+
if not user:
411+
user = self.get_user()
412+
413+
return self._is_authorized_in_opa(
414+
'is_authorized_custom_view',
415+
OpaInput({
416+
'input': {
417+
'method': method,
418+
'resource_name': resource_name,
419+
'user': {
420+
'id': user.get_id(),
421+
'name': user.get_name(),
422+
},
423+
}
424+
})
425+
)

0 commit comments

Comments
 (0)