|
2 | 2 | Custom Auth manager for Airflow |
3 | 3 | """ |
4 | 4 |
|
5 | | -from typing import cast, override |
| 5 | +from typing import override |
6 | 6 | from airflow.auth.managers.base_auth_manager import ResourceMethod |
7 | 7 | from airflow.auth.managers.models.base_user import BaseUser |
8 | 8 | from airflow.auth.managers.models.resource_details import ( |
@@ -61,23 +61,23 @@ def _init_config(self): |
61 | 61 | config.setdefault("AUTH_OPA_REQUEST_URL", "http://opa:8081/v1/data/airflow") |
62 | 62 | config.setdefault("AUTH_OPA_REQUEST_TIMEOUT", 10) |
63 | 63 |
|
64 | | - def call_opa(self, url: str, json: dict, timeout: int) -> requests.Response | None: |
65 | | - self.opa_session.post(url=url, json=json, timeout=timeout) |
| 64 | + def call_opa(self, url: str, json: dict, timeout: int) -> requests.Response: |
| 65 | + return self.opa_session.post(url=url, json=json, timeout=timeout) |
66 | 66 |
|
67 | 67 | @cachedmethod(lambda self: self.opa_cache) |
68 | 68 | def _is_authorized_in_opa(self, endpoint: str, input: OpaInput) -> bool: |
69 | 69 | config = self.appbuilder.get_app.config |
70 | 70 | opa_url = config.get("AUTH_OPA_REQUEST_URL") |
71 | 71 | try: |
72 | | - response = cast(requests.Response, self.call_opa( |
| 72 | + response = self.call_opa( |
73 | 73 | f'{opa_url}/{endpoint}', |
74 | 74 | json=input.to_dict(), |
75 | 75 | timeout=config.get("AUTH_OPA_REQUEST_TIMEOUT") |
76 | | - )) |
| 76 | + ) |
77 | 77 | result = response.json().get("result") |
78 | 78 | return result == True |
79 | 79 | except Exception as e: |
80 | | - self.log.error(f"Request to OPA failed: {e}") |
| 80 | + self.log.error(f"Request to OPA failed", exc_info=e) |
81 | 81 | return False |
82 | 82 |
|
83 | 83 | @override |
|
0 commit comments