Skip to content

Commit 0f8e388

Browse files
authored
Added data update callbacks config (#219)
* Added data update callbacks config * Fixed comment
1 parent f348266 commit 0f8e388

File tree

2 files changed

+53
-2
lines changed

2 files changed

+53
-2
lines changed

horizon/config.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
from typing import Any
2+
13
from opal_common.confi import Confi, confi
4+
from opal_common.schemas.data import CallbackEntry
5+
from pydantic import parse_obj_as, parse_raw_as
26

37
MOCK_API_KEY = "MUST BE DEFINED"
48

@@ -271,6 +275,21 @@ def __new__(cls, prefix=None, is_model=True):
271275
description="The path to the file that contains the PDP version",
272276
)
273277

278+
@staticmethod
279+
def parse_callbacks(value: Any) -> list[CallbackEntry]:
280+
if isinstance(value, str):
281+
return parse_raw_as(list[CallbackEntry], value)
282+
else:
283+
return parse_obj_as(list[CallbackEntry], value)
284+
285+
DATA_UPDATE_CALLBACKS: list[CallbackEntry] = confi.str(
286+
"DATA_UPDATE_CALLBACKS",
287+
[],
288+
description="List of callbacks to be triggered when data is updated",
289+
cast=parse_callbacks,
290+
cast_from_json=parse_callbacks,
291+
)
292+
274293
# non configurable values -------------------------------------------------
275294

276295
# redoc configuration (openapi schema)

horizon/factdb/client.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
from opal_client.policy_store.schemas import PolicyStoreTypes
1919
from opal_common.authentication.deps import JWTAuthenticator
2020
from opal_common.authentication.verifier import JWTVerifier
21+
from opal_common.fetcher.providers.http_fetch_provider import (
22+
HttpFetcherConfig,
23+
HttpMethods,
24+
)
2125
from starlette import status
2226
from starlette.responses import JSONResponse
2327

@@ -39,6 +43,11 @@ async def check_healthy(self) -> bool:
3943
async def check_ready(self) -> bool:
4044
return self._backup_loaded or await self.policy_store.is_ready()
4145

46+
def _init_fast_api_app(self) -> FastAPI:
47+
# Called at the end of OPALClient.__init__
48+
self._inject_extra_callbacks()
49+
return super()._init_fast_api_app()
50+
4251
def _configure_api_routes(self, app: FastAPI):
4352
"""mounts the api routes on the app object."""
4453

@@ -48,13 +57,14 @@ def _configure_api_routes(self, app: FastAPI):
4857
policy_router = init_policy_router(policy_updater=self.policy_updater)
4958
data_router = init_data_router(data_updater=self.data_updater)
5059
policy_store_router = init_policy_store_router(authenticator)
51-
callbacks_router = init_callbacks_api(authenticator, self._callbacks_register)
5260

5361
# mount the api routes on the app object
5462
app.include_router(policy_router, tags=["Policy Updater"])
5563
app.include_router(data_router, tags=["Data Updater"])
5664
app.include_router(policy_store_router, tags=["Policy Store"])
57-
app.include_router(callbacks_router, tags=["Callbacks"])
65+
66+
# excluded callbacks api from the main api, since we use it internally.
67+
# Use the DATA_UPDATE_CALLBACKS config to configure callbacks instead
5868

5969
# top level routes (i.e: healthchecks)
6070
@app.get("/healthcheck", include_in_schema=False)
@@ -90,6 +100,28 @@ async def ready():
90100

91101
return app
92102

103+
def _inject_extra_callbacks(self) -> None:
104+
register = self._callbacks_register
105+
default_config = HttpFetcherConfig(
106+
method=HttpMethods.POST,
107+
headers={"content-type": "application/json"},
108+
process_data=False,
109+
fetcher=None,
110+
)
111+
for entry in sidecar_config.DATA_UPDATE_CALLBACKS:
112+
entry.config = entry.config or default_config
113+
entry.key = entry.key or register.calc_hash(entry.url, entry.config)
114+
115+
if register.get(entry.key):
116+
raise RuntimeError(
117+
f"Callback with key '{entry.key}' already exists. Please specify a different key."
118+
)
119+
120+
logger.info(
121+
f"Registering data update callback to url '{entry.url}' with key '{entry.key}'"
122+
)
123+
register.put(entry.url, entry.config, entry.key)
124+
93125

94126
class FactDBClient(ExtendedOpalClient):
95127
def __init__(

0 commit comments

Comments
 (0)