Skip to content

Commit 7451150

Browse files
committed
Merge branch 'refs/heads/main' into release/0.9.0
2 parents 77189a0 + fe6ed06 commit 7451150

File tree

4 files changed

+84
-13
lines changed

4 files changed

+84
-13
lines changed

horizon/config.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,16 @@ def parse_plugins(value: Any) -> dict[str, dict[str, int | bool | str]]:
265265
description="The path to the file that contains the PDP version",
266266
)
267267

268+
HORIZON_NICENESS = confi.int(
269+
"HORIZON_NICENESS",
270+
10,
271+
description=(
272+
"The niceness value for the PDP Horizon process (Python process). "
273+
"Niceness values range from -20 (highest priority) to 19 (lowest priority) with 0 is neutral. "
274+
"Adjusting this can help manage CPU resource allocation. "
275+
),
276+
)
277+
268278
@staticmethod
269279
def parse_callbacks(value: Any) -> list[CallbackEntry]:
270280
if isinstance(value, str):

horizon/facts/opal_forwarder.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from functools import cache
22
from urllib.parse import urljoin
3-
from uuid import uuid4
3+
from uuid import UUID
44

55
from opal_common.fetcher.providers.http_fetch_provider import HttpFetcherConfig
66
from opal_common.schemas.data import DataSourceEntry, DataUpdate
@@ -34,6 +34,8 @@ def create_data_source_entry(
3434
obj_id: str,
3535
obj_key: str,
3636
authorization_header: str,
37+
*,
38+
update_id: UUID,
3739
) -> DataSourceEntry:
3840
obj_id = obj_id.replace("-", "") # convert UUID to Hex
3941
url = urljoin(
@@ -45,6 +47,7 @@ def create_data_source_entry(
4547

4648
headers = {
4749
"Authorization": authorization_header,
50+
"X-Permit-Update-Id": update_id.hex,
4851
}
4952
if sidecar_config.SHARD_ID:
5053
headers["X-Shard-Id"] = sidecar_config.SHARD_ID
@@ -59,10 +62,15 @@ def create_data_source_entry(
5962
)
6063

6164

62-
def create_data_update_entry(entries: list[DataSourceEntry]) -> DataUpdate:
65+
def create_data_update_entry(
66+
entries: list[DataSourceEntry],
67+
*,
68+
update_id: UUID,
69+
) -> DataUpdate:
6370
entries_text = ", ".join(entry.dst_path for entry in entries)
71+
6472
return DataUpdate(
65-
id=uuid4().hex,
73+
id=update_id.hex,
6674
entries=entries,
6775
reason=f"Local facts upload for {entries_text}",
6876
)

horizon/facts/router.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from collections.abc import Callable, Iterable
22
from typing import Any
3+
from uuid import UUID, uuid4
34

45
from fastapi import (
56
APIRouter,
@@ -45,12 +46,13 @@ async def create_user(
4546
update_subscriber,
4647
wait_timeout,
4748
path="/users",
48-
entries_callback=lambda r, body: [
49+
entries_callback=lambda r, body, update_id: [
4950
create_data_source_entry(
5051
obj_type="users",
5152
obj_id=body["id"],
5253
obj_key=body["key"],
5354
authorization_header=r.headers.get("Authorization"),
55+
update_id=update_id,
5456
)
5557
],
5658
timeout_policy=timeout_policy,
@@ -71,12 +73,13 @@ async def create_tenant(
7173
update_subscriber,
7274
wait_timeout,
7375
path="/tenants",
74-
entries_callback=lambda r, body: [
76+
entries_callback=lambda r, body, update_id: [
7577
create_data_source_entry(
7678
obj_type="tenants",
7779
obj_id=body["id"],
7880
obj_key=body["key"],
7981
authorization_header=r.headers.get("Authorization"),
82+
update_id=update_id,
8083
)
8184
],
8285
timeout_policy=timeout_policy,
@@ -98,12 +101,13 @@ async def sync_user(
98101
update_subscriber,
99102
wait_timeout,
100103
path=f"/users/{user_id}",
101-
entries_callback=lambda r, body: [
104+
entries_callback=lambda r, body, update_id: [
102105
create_data_source_entry(
103106
obj_type="users",
104107
obj_id=body["id"],
105108
obj_key=body["key"],
106109
authorization_header=r.headers.get("Authorization"),
110+
update_id=update_id,
107111
)
108112
],
109113
timeout_policy=timeout_policy,
@@ -125,31 +129,36 @@ async def update_user(
125129
update_subscriber,
126130
wait_timeout,
127131
path=f"/users/{user_id}",
128-
entries_callback=lambda r, body: [
132+
entries_callback=lambda r, body, update_id: [
129133
create_data_source_entry(
130134
obj_type="users",
131135
obj_id=body["id"],
132136
obj_key=body["key"],
133137
authorization_header=r.headers.get("Authorization"),
138+
update_id=update_id,
134139
)
135140
],
136141
timeout_policy=timeout_policy,
137142
)
138143

139144

140-
def create_role_assignment_data_entries(request: FastApiRequest, body: dict[str, Any]) -> Iterable[DataSourceEntry]:
145+
def create_role_assignment_data_entries(
146+
request: FastApiRequest, body: dict[str, Any], update_id: UUID | None
147+
) -> Iterable[DataSourceEntry]:
141148
if not body.get("resource_instance"):
142149
yield create_data_source_entry(
143150
obj_type="role_assignments",
144151
obj_id=body["user_id"],
145152
obj_key=f"user:{body['user']}",
146153
authorization_header=request.headers.get("Authorization"),
154+
update_id=update_id,
147155
)
148156
yield create_data_source_entry(
149157
obj_type="users",
150158
obj_id=body["user_id"],
151159
obj_key=body["user"],
152160
authorization_header=request.headers.get("Authorization"),
161+
update_id=update_id,
153162
)
154163
else:
155164
# note that user_id == subject_id,
@@ -159,6 +168,7 @@ def create_role_assignment_data_entries(request: FastApiRequest, body: dict[str,
159168
obj_id=body["user_id"],
160169
obj_key=f"user:{body['user']}",
161170
authorization_header=request.headers.get("Authorization"),
171+
update_id=update_id,
162172
)
163173

164174

@@ -215,12 +225,13 @@ async def create_resource_instance(
215225
update_subscriber,
216226
wait_timeout,
217227
path="/resource_instances",
218-
entries_callback=lambda r, body: [
228+
entries_callback=lambda r, body, update_id: [
219229
create_data_source_entry(
220230
obj_type="resource_instances",
221231
obj_id=body["id"],
222232
obj_key=f"{body['resource']}:{body['key']}",
223233
authorization_header=r.headers.get("Authorization"),
234+
update_id=update_id,
224235
),
225236
],
226237
timeout_policy=timeout_policy,
@@ -242,12 +253,13 @@ async def update_resource_instance(
242253
update_subscriber,
243254
wait_timeout,
244255
path=f"/resource_instances/{instance_id}",
245-
entries_callback=lambda r, body: [
256+
entries_callback=lambda r, body, update_id: [
246257
create_data_source_entry(
247258
obj_type="resource_instances",
248259
obj_id=body["id"],
249260
obj_key=f"{body['resource']}:{body['key']}",
250261
authorization_header=r.headers.get("Authorization"),
262+
update_id=update_id,
251263
),
252264
],
253265
timeout_policy=timeout_policy,
@@ -268,12 +280,13 @@ async def create_relationship_tuple(
268280
update_subscriber,
269281
wait_timeout,
270282
path="/relationship_tuples",
271-
entries_callback=lambda r, body: [
283+
entries_callback=lambda r, body, update_id: [
272284
create_data_source_entry(
273285
obj_type="relationships",
274286
obj_id=body["object_id"],
275287
obj_key=body["object"],
276288
authorization_header=r.headers.get("Authorization"),
289+
update_id=update_id,
277290
),
278291
],
279292
timeout_policy=timeout_policy,
@@ -287,16 +300,20 @@ async def forward_request_then_wait_for_update(
287300
wait_timeout: float | None,
288301
*,
289302
path: str,
290-
entries_callback: Callable[[FastApiRequest, dict[str, Any]], Iterable[DataSourceEntry]],
303+
update_id: UUID | None = None,
304+
entries_callback: Callable[[FastApiRequest, dict[str, Any], UUID | None], Iterable[DataSourceEntry]],
291305
timeout_policy: TimeoutPolicy = TimeoutPolicy.IGNORE,
292306
) -> Response:
307+
_update_id = update_id or uuid4()
293308
response = await client.send_forward_request(request, path)
294309
body = client.extract_body(response)
295310
if body is None:
296311
return client.convert_response(response)
297312

298313
try:
299-
data_update_entry = create_data_update_entry(list(entries_callback(request, body)))
314+
data_update_entry = create_data_update_entry(
315+
list(entries_callback(request, body, _update_id)), update_id=_update_id
316+
)
300317
except KeyError as e:
301318
logger.warning(f"Missing required field {e.args[0]} in the response body, skipping wait for update.")
302319
return client.convert_response(response)
@@ -315,6 +332,7 @@ async def forward_request_then_wait_for_update(
315332
)
316333
else:
317334
logger.warning("Timeout waiting for update and policy is set to ignore")
335+
return client.convert_response(response)
318336

319337

320338
@facts_router.api_route(

horizon/pdp.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import os
23
import sys
34
from pathlib import Path
45
from uuid import UUID, uuid4
@@ -45,6 +46,37 @@
4546
OPA_LOGGER_MODULE = "opal_client.opa.logger"
4647

4748

49+
def set_process_niceness(target_nice: int) -> None:
50+
"""
51+
Attempts to set the current process's niceness value to `target_nice`.
52+
This operation is performed only once during the call.
53+
54+
This function is idempotent if the current niceness already equals `target_nice`.
55+
Setting a lower niceness value (increasing priority) may require CAP_SYS_NICE
56+
capabilities and could fail if the process lacks sufficient privileges.
57+
"""
58+
if target_nice < -20 or target_nice > 19:
59+
raise ValueError(f"Target niceness must be between -20 and 19, got {target_nice}")
60+
61+
try:
62+
current_niceness = os.nice(0) # Read current niceness without changing it
63+
delta = target_nice - current_niceness
64+
if delta != 0:
65+
os.nice(delta) # Apply the change
66+
new_niceness = os.nice(0) # Read the new niceness to confirm
67+
logging.info(
68+
"Changed the process niceness by %d from %d to %d (target was %d).",
69+
delta,
70+
current_niceness,
71+
new_niceness,
72+
target_nice,
73+
)
74+
else:
75+
logging.debug("Process niceness is already %d, which matches the target; no change made.", current_niceness)
76+
except OSError as exc:
77+
logging.warning("Failed to change process niceness to %d: %s", target_nice, exc)
78+
79+
4880
def apply_config(overrides_dict: dict, config_object: Confi):
4981
"""
5082
apply config values from dict into a confi object
@@ -131,6 +163,9 @@ def __init__(self):
131163
if sidecar_config.ENABLE_MONITORING:
132164
self._configure_monitoring()
133165

166+
if sidecar_config.HORIZON_NICENESS:
167+
set_process_niceness(sidecar_config.HORIZON_NICENESS)
168+
134169
self._opal = OpalClient(shard_id=sidecar_config.SHARD_ID, data_topics=self._fix_data_topics())
135170
self._inject_extra_callbacks()
136171
# remove default data update callbacks that are not needed and might be managed by the control plane

0 commit comments

Comments
 (0)