Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ ENV PDP_HORIZON_HOST=0.0.0.0
ENV PDP_HORIZON_PORT=7001
ENV PDP_PORT=7000
ENV PDP_PYTHON_PATH=python3
ENV NO_PROXY=localhost,127.0.0.1,::1

# 7000 pdp port
# 7001 horizon port
Expand Down
2 changes: 1 addition & 1 deletion horizon/enforcer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ async def post_to_opa(request: Request, path: str, data: dict | None):
_set_use_debugger(data)
try:
logger.debug(f"calling OPA at '{url}' with input: {data}")
async with aiohttp.ClientSession() as session: # noqa: SIM117
async with aiohttp.ClientSession(trust_env=True) as session: # noqa: SIM117
async with session.post(
url,
data=json.dumps(data) if data is not None else None,
Expand Down
1 change: 1 addition & 0 deletions horizon/facts/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def client(self) -> AsyncClient:
self._client = AsyncClient(
base_url=sidecar_config.CONTROL_PLANE,
headers={"Authorization": f"Bearer {env_api_key}"},
trust_env=True,
)
return self._client

Expand Down
6 changes: 4 additions & 2 deletions horizon/opal_relay_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def _apply_context(self, context: dict[str, str]):
def api_session(self) -> ClientSession:
if self._api_session is None:
env_api_key = get_env_api_key()
self._api_session = ClientSession(headers={"Authorization": f"Bearer {env_api_key}"})
self._api_session = ClientSession(headers={"Authorization": f"Bearer {env_api_key}"}, trust_env=True)
return self._api_session

async def relay_session(self) -> ClientSession:
Expand Down Expand Up @@ -133,7 +133,9 @@ async def relay_session(self) -> ClientSession:
f"Server responded to token request with an invalid result: {text}",
) from e
self._relay_token = obj.token
self._relay_session = ClientSession(headers={"Authorization": f"Bearer {self._relay_token}"})
self._relay_session = ClientSession(
headers={"Authorization": f"Bearer {self._relay_token}"}, trust_env=True
)
return self._relay_session

async def send_ping(self):
Expand Down
4 changes: 3 additions & 1 deletion horizon/proxy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ async def proxy_request_to_cloud_service(

logger.info(f"Proxying request: {request.method} {path}")

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(
trust_env=True,
) as session:
if request.method == HTTP_GET:
async with session.get(path, headers=headers, params=params) as backend_response:
return await proxy_response(backend_response)
Expand Down
4 changes: 3 additions & 1 deletion horizon/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ async def _report(self, state: PersistentState | None = None):
if state is not None:
self._state = state.copy()
config_url = f"{sidecar_config.CONTROL_PLANE}{sidecar_config.REMOTE_STATE_ENDPOINT}"
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(
trust_env=True,
) as session:
logger.info("Reporting status update to server...")
response = await session.post(
url=config_url,
Expand Down
40 changes: 39 additions & 1 deletion pdp-server/src/opa_client/allowed.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
use crate::opa_client::{send_request_to_opa, ForwardingError};
use crate::state::AppState;
use log::{debug, info};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Display;
use utoipa::ToSchema;

/// Send an allowed query to OPA and get the result
pub async fn query_allowed(
state: &AppState,
query: &AllowedQuery,
) -> Result<AllowedResult, ForwardingError> {
send_request_to_opa::<AllowedResult, _>(state, "/v1/data/permit/root", query).await
let result =
send_request_to_opa::<AllowedResult, _>(state, "/v1/data/permit/root", query).await;
if let Ok(response) = &result {
if state.config.debug.unwrap_or(false) {
info!(
"permit.check(\"{user}\", \"{action}\", \"{resource}\") -> {result}",
user = query.user,
action = query.action,
resource = query.resource,
result = response.allow,
);
debug!(
"Query: {}\nResult: {}",
serde_json::to_string_pretty(query).unwrap_or("Serialization error".to_string()),
serde_json::to_string_pretty(response).unwrap_or("Serialization error".to_string()),
);
}
}
result
}

#[derive(Debug, Serialize, Deserialize, ToSchema, Clone, PartialEq)]
Expand All @@ -30,6 +50,12 @@ pub struct User {
pub attributes: HashMap<String, serde_json::Value>,
}

impl Display for User {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.key)
}
}

#[derive(Debug, Serialize, Deserialize, ToSchema, Clone, PartialEq)]
pub struct Resource {
/// Type of the resource
Expand All @@ -48,6 +74,18 @@ pub struct Resource {
pub context: HashMap<String, serde_json::Value>,
}

impl Display for Resource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(key) = &self.key {
write!(f, "{}:{}", self.r#type, key)
} else if let Some(tenant) = &self.tenant {
write!(f, "{}@{}", self.r#type, tenant)
} else {
write!(f, "{}", self.r#type)
}
}
}

/// Authorization query parameters for the allowed endpoint
#[derive(Debug, Serialize, Deserialize, ToSchema, Clone, PartialEq)]
pub struct AllowedQuery {
Expand Down
29 changes: 27 additions & 2 deletions pdp-server/src/opa_client/allowed_bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,31 @@ pub async fn query_allowed_bulk(
let bulk_query = BulkAuthorizationQuery {
checks: queries.to_vec(),
};
send_request_to_opa::<BulkAuthorizationResult, _>(state, "/v1/data/permit/bulk", &bulk_query)
.await
let result = send_request_to_opa::<BulkAuthorizationResult, _>(
state,
"/v1/data/permit/bulk",
&bulk_query,
)
.await;

// Add debug logging if enabled
if let Ok(response) = &result {
if state.config.debug.unwrap_or(false) {
let allowed_count = response.allow.iter().filter(|r| r.allow).count();
log::info!(
"permit.bulk_check({} queries) -> {} allowed, {} denied",
queries.len(),
allowed_count,
response.allow.len() - allowed_count
);
log::debug!(
"Query: {}\nResult: {}",
serde_json::to_string_pretty(&bulk_query)
.unwrap_or("Serialization error".to_string()),
serde_json::to_string_pretty(response).unwrap_or("Serialization error".to_string()),
);
}
}

result
}
41 changes: 38 additions & 3 deletions pdp-server/src/opa_client/authorized_users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,27 @@ pub async fn query_authorized_users(
// Process the result to extract the nested 'result' field if it exists
if let serde_json::Value::Object(map) = &result {
if let Some(inner_result) = map.get("result") {
return Ok(serde_json::from_value(inner_result.clone())?);
let authorized_result: AuthorizedUsersResult =
serde_json::from_value(inner_result.clone())?;

// Add debug logging if enabled
if state.config.debug.unwrap_or(false) {
log::info!(
"permit.authorized_users(\"{}\", \"{}\") -> {} users",
query.action,
query.resource,
authorized_result.users.len()
);
log::debug!(
"Query: {}\nResult: {}",
serde_json::to_string_pretty(query)
.unwrap_or("Serialization error".to_string()),
serde_json::to_string_pretty(&authorized_result)
.unwrap_or("Serialization error".to_string()),
);
}

return Ok(authorized_result);
}
}

Expand All @@ -40,11 +60,26 @@ pub async fn query_authorized_users(
.clone()
.unwrap_or_else(|| "default".to_string());

Ok(AuthorizedUsersResult {
let empty_result = AuthorizedUsersResult {
resource: format!("{}:{}", query.resource.r#type, resource_key),
tenant,
users: HashMap::new(),
})
};

// Add debug logging if enabled
if state.config.debug.unwrap_or(false) {
log::info!(
"permit.authorized_users(\"{}\", \"{}\") -> 0 users",
query.action,
query.resource
);
log::debug!(
"Query: {}\nResult: empty",
serde_json::to_string_pretty(query)?
);
}

Ok(empty_result)
}

/// Query parameters for the authorized users endpoint
Expand Down
43 changes: 41 additions & 2 deletions pdp-server/src/opa_client/user_permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,29 @@ pub async fn query_user_permissions(

if let serde_json::Value::Object(result_map) = response {
if let Some(permissions) = result_map.get("permissions") {
return Ok(serde_json::from_value(permissions.clone())?);
let result: HashMap<String, UserPermissionsResult> =
serde_json::from_value(permissions.clone())?;

// Add debug logging if enabled
if state.config.debug.unwrap_or(false) {
log::info!(
"permit.user_permissions(\"{}\", tenants={:?}, resources={:?}, resource_types={:?}) -> {} permissions",
query.user,
query.tenants,
query.resources,
query.resource_types,
result.len()
);
log::debug!(
"Query: {}\nResult: {}",
serde_json::to_string_pretty(query)
.unwrap_or("Serialization error".to_string()),
serde_json::to_string_pretty(&result)
.unwrap_or("Serialization error".to_string()),
);
}

return Ok(result);
} else {
debug!("No 'permissions' field found in result");
}
Expand All @@ -31,7 +53,24 @@ pub async fn query_user_permissions(
}

// If we couldn't find the permissions, return an empty map
Ok(HashMap::new())
let empty_result = HashMap::new();

// Add debug logging if enabled
if state.config.debug.unwrap_or(false) {
log::info!(
"permit.user_permissions(\"{}\", tenants={:?}, resources={:?}, resource_types={:?}) -> 0 permissions",
query.user,
query.tenants,
query.resources,
query.resource_types
);
log::debug!(
"Query: {}\nResult: empty",
serde_json::to_string_pretty(query)?
);
}

Ok(empty_result)
}

#[derive(Debug, Serialize, Deserialize, ToSchema, Clone, PartialEq)]
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ httpx>=0.27.0,<1
# TODO: change to use re2 in the future, currently not supported in alpine due to c++ library issues
# google-re2 # use re2 instead of re for regex matching because it's simiplier and safer for user inputted regexes
protobuf>=3.20.2 # not directly required, pinned by Snyk to avoid a vulnerability
opal-common==0.8.1
opal-client==0.8.1
opal-common==0.8.2
opal-client==0.8.2
16 changes: 13 additions & 3 deletions watchdog/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::health::HealthCheck;
use crate::stats::ServiceWatchdogStats;
use crate::{CommandWatchdog, CommandWatchdogOptions};
use log::{debug, error, info, warn};
use log::{debug, error, info, log, warn};
use std::sync::Arc;
use std::time::Duration;
use tokio::process::Command;
Expand Down Expand Up @@ -130,7 +130,12 @@ impl ServiceWatchdog {
}

if consecutive_failures > 0 {
info!(
log!(
if consecutive_failures < opt.health_check_failure_threshold / 2 {
log::Level::Debug
} else {
log::Level::Info
},
"Service '{}' health restored after {} failures",
command_watchdog.program_name, consecutive_failures
);
Expand All @@ -146,7 +151,12 @@ impl ServiceWatchdog {
stats.increment_failed_health_checks();
consecutive_failures += 1;

warn!(
log!(
if consecutive_failures < opt.health_check_failure_threshold / 2 {
log::Level::Debug
} else {
log::Level::Warn
},
"Service '{}' health check failed: {} (consecutive failures: {}/{})",
command_watchdog.program_name,
e,
Expand Down
Loading