diff --git a/src/audit.rs b/src/audit.rs new file mode 100644 index 000000000..0cddb602b --- /dev/null +++ b/src/audit.rs @@ -0,0 +1,303 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{ + collections::HashMap, + fmt::{Debug, Display}, +}; + +use crate::{about::current, storage::StorageMetadata}; + +use super::option::CONFIG; +use chrono::{DateTime, Utc}; +use once_cell::sync::Lazy; +use reqwest::Client; +use serde::Serialize; +use serde_json::{json, Value}; +use tracing::error; + +use ulid::Ulid; +use url::Url; + +static AUDIT_LOGGER: Lazy> = Lazy::new(AuditLogger::new); + +// AuditLogger handles sending audit logs to a remote logging system +pub struct AuditLogger { + client: Client, + log_endpoint: Url, +} + +impl AuditLogger { + /// Create an audit logger that can be used to capture and push + /// audit logs to the appropriate logging system over HTTP + pub fn new() -> Option { + // Try to construct the log endpoint URL by joining the base URL + // with the ingest path, This can fail if the URL is not valid, + // when the base URL is not set or the ingest path is not valid + let log_endpoint = match CONFIG + .parseable + .audit_logger + .as_ref()? + .join("/api/v1/ingest") + { + Ok(url) => url, + Err(err) => { + eprintln!("Couldn't setup audit logger: {err}"); + return None; + } + }; + + Some(AuditLogger { + client: reqwest::Client::new(), + log_endpoint, + }) + } + + // Sends the audit log to the configured endpoint with proper authentication + async fn send_log(&self, json: Value) { + let mut req = self + .client + .post(self.log_endpoint.as_str()) + .json(&json) + .header("x-p-stream", "audit_log"); + + // Use basic auth if credentials are configured + if let Some(username) = CONFIG.parseable.audit_username.as_ref() { + req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref()) + } + + match req.send().await { + Ok(r) => { + if let Err(e) = r.error_for_status() { + error!("{e}") + } + } + Err(e) => error!("Failed to send audit event: {}", e), + } + } +} + +// Represents the version of the audit log format +#[non_exhaustive] +#[repr(u8)] +#[derive(Debug, Clone, Copy, Serialize, Default)] +pub enum AuditLogVersion { + // NOTE: default should be latest version + #[default] + V1 = 1, +} + +#[derive(Serialize, Default)] +pub struct AuditDetails { + pub version: AuditLogVersion, + pub id: Ulid, + pub generated_at: DateTime, +} + +#[derive(Serialize, Default)] +pub struct ServerDetails { + pub version: String, + pub deployment_id: Ulid, +} + +// Contains information about the actor (user) who performed the action +#[derive(Serialize, Default)] +pub struct ActorDetails { + pub remote_host: String, + pub user_agent: String, + pub username: String, + pub authorization_method: String, +} + +// Contains details about the HTTP request that was made +#[derive(Serialize, Default)] +pub struct RequestDetails { + pub stream: String, + pub start_time: DateTime, + pub end_time: DateTime, + pub method: String, + pub path: String, + pub protocol: String, + pub headers: HashMap, +} + +/// Contains information about the response sent back to the client +#[derive(Default, Serialize)] +pub struct ResponseDetails { + pub status_code: u16, + pub error: Option, +} + +/// The main audit log structure that combines all audit information +#[derive(Serialize)] +pub struct AuditLog { + pub audit: AuditDetails, + pub parseable_server: ServerDetails, + pub actor: ActorDetails, + pub request: RequestDetails, + pub response: ResponseDetails, +} + +/// Builder pattern implementation for constructing audit logs +pub struct AuditLogBuilder { + // Used to ensure that log is only constructed if the logger is enabled + enabled: bool, + inner: AuditLog, +} + +impl Default for AuditLogBuilder { + fn default() -> Self { + AuditLogBuilder { + enabled: AUDIT_LOGGER.is_some(), + inner: AuditLog { + audit: AuditDetails { + version: AuditLogVersion::V1, + id: Ulid::new(), + ..Default::default() + }, + parseable_server: ServerDetails { + version: current().released_version.to_string(), + deployment_id: StorageMetadata::global().deployment_id, + }, + request: RequestDetails { + start_time: Utc::now(), + ..Default::default() + }, + actor: ActorDetails::default(), + response: ResponseDetails::default(), + }, + } + } +} + +impl AuditLogBuilder { + /// Sets the remote host for the audit log + pub fn with_host(mut self, host: impl Into) -> Self { + if self.enabled { + self.inner.actor.remote_host = host.into(); + } + self + } + + /// Sets the username for the audit log + pub fn with_username(mut self, username: impl Into) -> Self { + if self.enabled { + self.inner.actor.username = username.into(); + } + self + } + + /// Sets the user agent for the audit log + pub fn with_user_agent(mut self, user_agent: impl Into) -> Self { + if self.enabled { + self.inner.actor.user_agent = user_agent.into(); + } + self + } + + /// Sets the authorization method for the audit log + pub fn with_auth_method(mut self, auth_method: impl Into) -> Self { + if self.enabled { + self.inner.actor.authorization_method = auth_method.into(); + } + self + } + + /// Sets the stream for the request details + pub fn with_stream(mut self, stream: impl Into) -> Self { + if self.enabled { + self.inner.request.stream = stream.into(); + } + self + } + + /// Sets the request method details + pub fn with_method(mut self, method: impl Into) -> Self { + if self.enabled { + self.inner.request.method = method.into(); + } + self + } + + /// Sets the request path + pub fn with_path(mut self, path: impl Into) -> Self { + if self.enabled { + self.inner.request.path = path.into(); + } + self + } + + /// Sets the request protocol + pub fn with_protocol(mut self, protocol: impl Into) -> Self { + if self.enabled { + self.inner.request.protocol = protocol.into(); + } + self + } + + /// Sets the request headers + pub fn with_headers(mut self, headers: impl IntoIterator) -> Self { + if self.enabled { + self.inner.request.headers = headers.into_iter().collect(); + } + self + } + + /// Sets the response status code + pub fn with_status(mut self, status_code: u16) -> Self { + if self.enabled { + self.inner.response.status_code = status_code; + } + self + } + + /// Sets the response error if any + pub fn with_error(mut self, err: impl Display) -> Self { + if self.enabled { + let error = err.to_string(); + if !error.is_empty() { + self.inner.response.error = Some(error); + } + } + self + } + + /// Sends the audit log to the logging server if configured + pub async fn send(self) { + // ensures that we don't progress if logger is not enabled + if !self.enabled { + return; + } + + // build the audit log + let AuditLogBuilder { + inner: mut audit_log, + .. + } = self; + + let now = Utc::now(); + audit_log.audit.generated_at = now; + audit_log.request.end_time = now; + + AUDIT_LOGGER + .as_ref() + .unwrap() + .send_log(json!(audit_log)) + .await + } +} diff --git a/src/cli.rs b/src/cli.rs index 9021649b5..82cd27d5e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -116,6 +116,11 @@ pub struct Cli { pub kafka_client_id: Option, pub kafka_security_protocol: Option, pub kafka_partitions: Option, + + // Audit Logging env vars + pub audit_logger: Option, + pub audit_username: Option, + pub audit_password: Option, } impl Cli { @@ -165,6 +170,10 @@ impl Cli { pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol"; pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions"; + pub const AUDIT_LOGGER: &'static str = "audit-logger"; + pub const AUDIT_USERNAME: &'static str = "audit-username"; + pub const AUDIT_PASSWORD: &'static str = "audit-password"; + pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) } @@ -219,6 +228,29 @@ impl Cli { .env("P_KAFKA_PARTITIONS") .value_name("STRING") .help("Kafka partitions"), + ) + .arg( + Arg::new(Self::AUDIT_LOGGER) + .long(Self::AUDIT_LOGGER) + .env("P_AUDIT_LOGGER") + .value_name("URL") + .required(false) + .value_parser(validation::url) + .help("Audit logger endpoint"), + ) + .arg( + Arg::new(Self::AUDIT_USERNAME) + .long(Self::AUDIT_USERNAME) + .env("P_AUDIT_USERNAME") + .value_name("STRING") + .help("Audit logger username"), + ) + .arg( + Arg::new(Self::AUDIT_PASSWORD) + .long(Self::AUDIT_PASSWORD) + .env("P_AUDIT_PASSWORD") + .value_name("STRING") + .help("Audit logger password"), ) .arg( Arg::new(Self::TRINO_ENDPOINT) @@ -536,6 +568,10 @@ impl FromArgMatches for Cli { .cloned(); self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); + self.audit_logger = m.get_one::(Self::AUDIT_LOGGER).cloned(); + self.audit_username = m.get_one::(Self::AUDIT_USERNAME).cloned(); + self.audit_password = m.get_one::(Self::AUDIT_PASSWORD).cloned(); + self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); self.trusted_ca_certs_path = m.get_one::(Self::TRUSTED_CA_CERTS_PATH).cloned(); diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs new file mode 100644 index 000000000..79e180d71 --- /dev/null +++ b/src/handlers/http/audit.rs @@ -0,0 +1,132 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use super::middleware::Message; +use actix_web::{ + body::MessageBody, + dev::{ServiceRequest, ServiceResponse}, + middleware::Next, +}; +use actix_web_httpauth::extractors::basic::BasicAuth; +use ulid::Ulid; + +use crate::{ + audit::AuditLogBuilder, + handlers::{KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY}, + rbac::{map::SessionKey, Users}, +}; + +const DROP_HEADERS: [&str; 4] = ["authorization", "cookie", "user-agent", "x-p-stream"]; + +/// A middleware that logs incoming requests and outgoing responses from parseable's HTTP API to an audit service +pub async fn audit_log_middleware( + mut req: ServiceRequest, + next: Next, +) -> Result, actix_web::Error> { + // Start building the audit log entry + let mut log_builder = AuditLogBuilder::default(); + + // Get stream name from request headers, if available + if let Some(kinesis_common_attributes) = + req.request().headers().get(KINESIS_COMMON_ATTRIBUTES_KEY) + { + let attribute_value: &str = kinesis_common_attributes.to_str().unwrap(); + let message: Message = serde_json::from_str(attribute_value).unwrap(); + log_builder = log_builder.with_stream(message.common_attributes.x_p_stream); + } else if let Some(stream) = req.match_info().get("logstream") { + log_builder = log_builder.with_stream(stream); + } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) { + if let Ok(stream) = value.to_str() { + log_builder = log_builder.with_stream(stream); + } + } + + // Get username and authorization method + let mut username = "Unknown".to_owned(); + let mut authorization_method = "None".to_owned(); + + // Extract authorization details from request, either from basic auth + // header or cookie, else use default value. + if let Ok(creds) = req.extract::().into_inner() { + username = creds.user_id().trim().to_owned(); + authorization_method = "Basic Auth".to_owned(); + } else if let Some(cookie) = req.cookie("session") { + authorization_method = "Session Cookie".to_owned(); + if let Some(user_id) = Ulid::from_string(cookie.value()) + .ok() + .and_then(|ulid| Users.get_username_from_session(&SessionKey::SessionId(ulid))) + { + username = user_id; + } + } + + // Add details to the audit log, based on the incoming request + // NOTE: we save on the cost of cloning by doing so only if audit logger is configured + log_builder = log_builder + .with_host( + req.connection_info() + .realip_remote_addr() + .unwrap_or_default(), + ) + .with_user_agent( + req.headers() + .get("User-Agent") + .and_then(|a| a.to_str().ok()) + .unwrap_or_default(), + ) + .with_username(username) + .with_auth_method(authorization_method) + .with_method(req.method().as_str()) + .with_path(req.path()) + .with_protocol(req.connection_info().scheme().to_string()) + .with_headers( + req.headers() + .iter() + .filter_map(|(name, value)| match name.as_str() { + // NOTE: drop headers that are not required + name if DROP_HEADERS.contains(&name.to_lowercase().as_str()) => None, + name => { + // NOTE: Drop headers that can't be parsed as string + value + .to_str() + .map(|value| (name.to_owned(), value.to_string())) + .ok() + } + }), + ); + + // forward request to parseable + let res = next.call(req).await; + + // Capture status_code and error information from outgoing response + match &res { + Ok(res) => { + log_builder = log_builder.with_status(res.status().as_u16()); + // Use error information from reponse object if an error + if let Some(err) = res.response().error() { + log_builder = log_builder.with_error(err); + } + } + Err(err) => log_builder = log_builder.with_status(500).with_error(err), + } + + // Send the audit log to audit service, if configured + log_builder.send().await; + + res +} diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 79e7f6e15..d02513637 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -23,7 +23,7 @@ use actix_web::{ dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}, error::{ErrorBadRequest, ErrorForbidden, ErrorUnauthorized}, http::header::{self, HeaderName}, - Error, Route, + Error, HttpMessage, Route, }; use futures_util::future::LocalBoxFuture; @@ -45,16 +45,16 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -struct Message { - common_attributes: CommonAttributes, +pub struct Message { + pub common_attributes: CommonAttributes, } #[derive(Serialize, Deserialize, Debug)] -struct CommonAttributes { +pub struct CommonAttributes { #[serde(rename = "Authorization")] authorization: String, #[serde(rename = "X-P-Stream")] - x_p_stream: String, + pub x_p_stream: String, } pub trait RouteExt { @@ -137,23 +137,18 @@ where For requests made from other clients, no change. ## Section start */ - if let Some((_, kinesis_common_attributes)) = req - .request() - .headers() - .iter() - .find(|&(key, _)| key == KINESIS_COMMON_ATTRIBUTES_KEY) + if let Some(kinesis_common_attributes) = + req.request().headers().get(KINESIS_COMMON_ATTRIBUTES_KEY) { let attribute_value: &str = kinesis_common_attributes.to_str().unwrap(); let message: Message = serde_json::from_str(attribute_value).unwrap(); req.headers_mut().insert( HeaderName::from_static(AUTHORIZATION_KEY), - header::HeaderValue::from_str(&message.common_attributes.authorization.clone()) - .unwrap(), + header::HeaderValue::from_str(&message.common_attributes.authorization).unwrap(), ); req.headers_mut().insert( HeaderName::from_static(STREAM_NAME_HEADER_KEY), - header::HeaderValue::from_str(&message.common_attributes.x_p_stream.clone()) - .unwrap(), + header::HeaderValue::from_str(&message.common_attributes.x_p_stream).unwrap(), ); req.headers_mut().insert( HeaderName::from_static(LOG_SOURCE_KEY), @@ -164,6 +159,7 @@ where /* ## Section end */ let auth_result: Result<_, Error> = (self.auth_method)(&mut req, self.action); + let fut = self.service.call(req); Box::pin(async move { match auth_result? { @@ -175,6 +171,7 @@ where ), _ => {} } + fut.await }) } @@ -192,11 +189,7 @@ pub fn auth_stream_context( let creds = extract_session_key(req); let mut stream = req.match_info().get("logstream"); if stream.is_none() { - if let Some((_, stream_name)) = req - .headers() - .iter() - .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) - { + if let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) { stream = Some(stream_name.to_str().unwrap()); } } diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 8fabc8f1f..99ab350c1 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -28,6 +28,7 @@ use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY}; use self::{cluster::get_ingestor_info, query::Query}; pub mod about; +mod audit; pub mod cluster; pub mod correlation; pub mod health_check; diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 5080f2038..17d2cbb4b 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -41,6 +41,7 @@ use ssl_acceptor::get_ssl_acceptor; use tokio::sync::{oneshot, Mutex}; use tracing::{error, info, warn}; +use super::audit; use super::cross_origin_config; use super::API_BASE_PATH; use super::API_VERSION; @@ -101,6 +102,7 @@ pub trait ParseableServer { .wrap(prometheus.clone()) .configure(|config| Self::configure_routes(config, oidc_client.clone())) .wrap(from_fn(health_check::check_shutdown_middleware)) + .wrap(from_fn(audit::audit_log_middleware)) .wrap(actix_web::middleware::Logger::default()) .wrap(actix_web::middleware::Compress::default()) .wrap(cross_origin_config()) diff --git a/src/kafka.rs b/src/kafka.rs index b917eca83..1ad20fc73 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -33,6 +33,7 @@ use std::sync::Arc; use std::{collections::HashMap, fmt::Debug}; use tracing::{debug, error, info, warn}; +use crate::audit::AuditLogBuilder; use crate::event::format::LogSource; use crate::option::CONFIG; use crate::{ @@ -240,8 +241,19 @@ pub async fn setup_integration() { let mut stream = consumer.stream(); while let Ok(curr) = stream.next().await.unwrap() { - if let Err(err) = ingest_message(curr).await { - error!("Unable to ingest incoming kafka message- {err}") - } + // TODO: maybe we should not constructs an audit log for each kafka message, but do so at the batch level + let log_builder = AuditLogBuilder::default() + .with_host(CONFIG.parseable.kafka_host.as_deref().unwrap_or("")) + .with_user_agent("Kafka Client") + .with_protocol("Kafka") + .with_stream(curr.topic()); + + let Err(err) = ingest_message(curr).await else { + log_builder.with_status(200).send().await; + continue; + }; + error!("Unable to ingest incoming kafka message- {err}"); + + log_builder.with_status(500).with_error(err).send().await; } } diff --git a/src/lib.rs b/src/lib.rs index 951bb432a..cd703c4dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ mod about; mod alerts; pub mod analytics; +pub mod audit; pub mod banner; mod catalog; mod cli; diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 49db5af7f..d9c31b8c9 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -20,6 +20,7 @@ use crate::rbac::user::User; use crate::{option::CONFIG, storage::StorageMetadata}; use std::{collections::HashMap, sync::Mutex}; +use super::Response; use super::{ role::{model::DefaultPrivilege, Action, Permission, RoleBuilder}, user, @@ -193,16 +194,16 @@ impl Sessions { } // returns None if user is not in the map - // Otherwise returns Some(is_authenticated) + // Otherwise returns Some(Response) where response is authorized/unauthorized pub fn check_auth( &self, key: &SessionKey, required_action: Action, context_stream: Option<&str>, context_user: Option<&str>, - ) -> Option { + ) -> Option { self.active_sessions.get(key).map(|(username, perms)| { - perms.iter().any(|user_perm| { + if perms.iter().any(|user_perm| { match *user_perm { // if any action is ALL then we we authorize Permission::Unit(action) => action == required_action || action == Action::All, @@ -221,7 +222,11 @@ impl Sessions { } _ => false, } - }) + }) { + Response::Authorized + } else { + Response::UnAuthorized + } }) } diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index 1679d7ffa..adee85920 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -126,11 +126,7 @@ impl Users { ) -> Response { // try fetch from auth map for faster auth flow if let Some(res) = sessions().check_auth(&key, action, context_stream, context_user) { - return if res { - Response::Authorized - } else { - Response::UnAuthorized - }; + return res; } // attempt reloading permissions into new session for basic auth user @@ -155,14 +151,9 @@ impl Users { DateTime::::MAX_UTC, roles_to_permission(user.roles()), ); - return if sessions + return sessions .check_auth(&key, action, context_stream, context_user) - .expect("entry for this key just added") - { - Response::Authorized - } else { - Response::UnAuthorized - }; + .expect("entry for this key just added"); } }