From 4fce69d01626763bb8e399a7e1630bbcaff33fd7 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 2 Jan 2025 12:51:51 +0530 Subject: [PATCH 01/30] feat: setup audit logging infrastructure --- src/audit.rs | 136 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/cli.rs | 36 ++++++++++++++ src/main.rs | 22 +++++++-- 3 files changed, 190 insertions(+), 4 deletions(-) create mode 100644 src/audit.rs diff --git a/src/audit.rs b/src/audit.rs new file mode 100644 index 000000000..8055d438b --- /dev/null +++ b/src/audit.rs @@ -0,0 +1,136 @@ +use std::{fmt::Debug, sync::Arc}; + +use parseable::option::CONFIG; +use reqwest::Client; +use serde_json::{json, Map, Value}; +use tokio::runtime::Handle; +use tracing::{ + error, + field::{Field, Visit}, + Event, Metadata, Subscriber, +}; +use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; +use url::Url; + +pub struct AuditLayer { + client: Arc, + log_endpoint: Url, + username: Option, + password: Option, + runtime_handle: Handle, +} + +impl AuditLayer { + /// Create an audit layer that works with the tracing system to capture + /// and push audit logs to the appropriate logger over HTTP + pub fn new(runtime_handle: Handle) -> Option { + let audit_logger = CONFIG.parseable.audit_logger.as_ref()?; + let client = Arc::new(reqwest::Client::new()); + let log_endpoint = match audit_logger.join("/api/v1/ingest") { + Ok(url) => url, + Err(err) => { + error!("Couldn't setup audit logger: {err}"); + return None; + } + }; + + let username = CONFIG.parseable.audit_username.clone(); + let password = CONFIG.parseable.audit_password.clone(); + + Some(Self { + client, + log_endpoint, + username, + password, + runtime_handle, + }) + } +} + +impl Layer for AuditLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn enabled(&self, _: &Metadata<'_>, _: Context<'_, S>) -> bool { + true // log everything if it is auditable + } + + fn on_event(&self, event: &Event<'_>, _: Context<'_, S>) { + let mut visitor = AuditVisitor::default(); + event.record(&mut visitor); + + // if the log line contains `audit = true`, construct an HTTP request and push to audit endpouint + // NOTE: We only support the ingest API of parseable for audit logging parseable + if visitor.audit { + visitor + .json + .insert("message".to_owned(), json!(visitor.message)); + + let mut req = self + .client + .post(self.log_endpoint.as_str()) + .json(&visitor.json) + .header("x-p-stream", "audit_log"); + if let Some(username) = self.username.as_ref() { + req = req.basic_auth(username, self.password.as_ref()) + } + + self.runtime_handle.spawn(async move { + match req.send().await { + Ok(r) => { + if let Err(e) = r.error_for_status() { + println!("{e}") + } + } + Err(e) => eprintln!("Failed to send audit event: {}", e), + } + }); + } + } +} + +#[derive(Debug, Default)] +struct AuditVisitor { + message: String, + json: Map, + audit: bool, +} + +impl Visit for AuditVisitor { + fn record_bool(&mut self, field: &Field, value: bool) { + if field.name() == "audit" { + self.audit = value; + } else { + self.json.insert(field.name().to_owned(), json!(value)); + } + } + + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "message" { + self.message = value.to_owned(); + } else { + self.json.insert(field.name().to_owned(), json!(value)); + } + } + + fn record_f64(&mut self, field: &Field, value: f64) { + self.json.insert(field.name().to_owned(), json!(value)); + } + + fn record_i64(&mut self, field: &Field, value: i64) { + self.json.insert(field.name().to_owned(), json!(value)); + } + + fn record_u64(&mut self, field: &Field, value: u64) { + self.json.insert(field.name().to_owned(), json!(value)); + } + + fn record_debug(&mut self, field: &Field, value: &dyn Debug) { + if field.name() == "message" { + self.message = format!("{value:?}"); + } else { + self.json + .insert(field.name().to_owned(), json!(format!("{value:?}"))); + } + } +} diff --git a/src/cli.rs b/src/cli.rs index 9021649b5..82cd27d5e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -116,6 +116,11 @@ pub struct Cli { pub kafka_client_id: Option, pub kafka_security_protocol: Option, pub kafka_partitions: Option, + + // Audit Logging env vars + pub audit_logger: Option, + pub audit_username: Option, + pub audit_password: Option, } impl Cli { @@ -165,6 +170,10 @@ impl Cli { pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol"; pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions"; + pub const AUDIT_LOGGER: &'static str = "audit-logger"; + pub const AUDIT_USERNAME: &'static str = "audit-username"; + pub const AUDIT_PASSWORD: &'static str = "audit-password"; + pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) } @@ -219,6 +228,29 @@ impl Cli { .env("P_KAFKA_PARTITIONS") .value_name("STRING") .help("Kafka partitions"), + ) + .arg( + Arg::new(Self::AUDIT_LOGGER) + .long(Self::AUDIT_LOGGER) + .env("P_AUDIT_LOGGER") + .value_name("URL") + .required(false) + .value_parser(validation::url) + .help("Audit logger endpoint"), + ) + .arg( + Arg::new(Self::AUDIT_USERNAME) + .long(Self::AUDIT_USERNAME) + .env("P_AUDIT_USERNAME") + .value_name("STRING") + .help("Audit logger username"), + ) + .arg( + Arg::new(Self::AUDIT_PASSWORD) + .long(Self::AUDIT_PASSWORD) + .env("P_AUDIT_PASSWORD") + .value_name("STRING") + .help("Audit logger password"), ) .arg( Arg::new(Self::TRINO_ENDPOINT) @@ -536,6 +568,10 @@ impl FromArgMatches for Cli { .cloned(); self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); + self.audit_logger = m.get_one::(Self::AUDIT_LOGGER).cloned(); + self.audit_username = m.get_one::(Self::AUDIT_USERNAME).cloned(); + self.audit_password = m.get_one::(Self::AUDIT_PASSWORD).cloned(); + self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); self.trusted_ca_certs_path = m.get_one::(Self::TRUSTED_CA_CERTS_PATH).cloned(); diff --git a/src/main.rs b/src/main.rs index d1663d539..6f6a06270 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,19 +16,33 @@ * */ +use audit::AuditLayer; use parseable::{ banner, kafka, option::{Mode, CONFIG}, rbac, storage, IngestServer, ParseableServer, QueryServer, Server, }; -use tracing_subscriber::EnvFilter; +use tokio::runtime::Handle; +use tracing_subscriber::{ + layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry, +}; + +mod audit; #[actix_web::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) + let stdout_layer = tracing_subscriber::fmt::layer() .compact() - .init(); + .with_filter(EnvFilter::from_default_env()); + let subscriber = Registry::default().with(stdout_layer); + + // Use audit logging endpoint if set, else only log to STDOUT + if let Some(audit_layer) = AuditLayer::new(Handle::current()) { + // All logs go through audit_layer before reaching the stdout_layer + subscriber.with(audit_layer).init(); + } else { + subscriber.init(); + }; // these are empty ptrs so mem footprint should be minimal let server: Box = match CONFIG.parseable.mode { From 1d5ff4582c23a9197e12b5cafe0d3a7839373c60 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 10:48:27 +0530 Subject: [PATCH 02/30] feat: audit log http requests --- src/audit.rs | 164 ++++++++++++++++++++++++++------ src/handlers/http/middleware.rs | 26 ++++- src/lib.rs | 1 + src/main.rs | 4 +- src/rbac/map.rs | 13 ++- src/rbac/mod.rs | 15 +-- 6 files changed, 169 insertions(+), 54 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 8055d438b..4cea67008 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -1,15 +1,23 @@ -use std::{fmt::Debug, sync::Arc}; +use std::{collections::HashMap, fmt::Debug, sync::Arc}; -use parseable::option::CONFIG; +use crate::handlers::http::modal::utils::rbac_utils::get_metadata; + +use super::option::CONFIG; +use actix_web::dev::ServiceRequest; +use chrono::Utc; use reqwest::Client; +use serde::Serialize; use serde_json::{json, Map, Value}; use tokio::runtime::Handle; +use tokio::time::Instant; +use tracing::info; use tracing::{ error, field::{Field, Visit}, Event, Metadata, Subscriber, }; use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; +use ulid::Ulid; use url::Url; pub struct AuditLayer { @@ -59,13 +67,9 @@ where let mut visitor = AuditVisitor::default(); event.record(&mut visitor); - // if the log line contains `audit = true`, construct an HTTP request and push to audit endpouint + // if the log line contains `audit` string with serialized json object, construct an HTTP request and push to configured audit endpoint // NOTE: We only support the ingest API of parseable for audit logging parseable if visitor.audit { - visitor - .json - .insert("message".to_owned(), json!(visitor.message)); - let mut req = self .client .post(self.log_endpoint.as_str()) @@ -91,46 +95,146 @@ where #[derive(Debug, Default)] struct AuditVisitor { - message: String, json: Map, audit: bool, } impl Visit for AuditVisitor { - fn record_bool(&mut self, field: &Field, value: bool) { + fn record_str(&mut self, field: &Field, value: &str) { if field.name() == "audit" { - self.audit = value; - } else { - self.json.insert(field.name().to_owned(), json!(value)); + if let Ok(Value::Object(json)) = serde_json::from_str(value) { + self.audit = true; + self.json = json; + } } } - fn record_str(&mut self, field: &Field, value: &str) { - if field.name() == "message" { - self.message = value.to_owned(); - } else { - self.json.insert(field.name().to_owned(), json!(value)); - } + fn record_debug(&mut self, _: &Field, _: &dyn Debug) {} +} + +#[non_exhaustive] +#[repr(u8)] +#[derive(Debug, Clone, Copy)] +pub enum AuditLogVersion { + V1 = 1, +} + +#[derive(Serialize, Default)] +pub struct ActorLog { + pub remote_host: String, + pub user_agent: String, + pub username: String, + pub authorization_method: String, +} + +#[derive(Serialize, Default)] +pub struct RequestLog { + pub method: String, + pub path: String, + pub host: String, + pub protocol: String, + pub headers: HashMap, +} + +#[derive(Serialize)] +pub struct ResponseLog { + pub status_code: u16, +} + +impl Default for ResponseLog { + fn default() -> Self { + // Server failed to respond + ResponseLog { status_code: 500 } } +} - fn record_f64(&mut self, field: &Field, value: f64) { - self.json.insert(field.name().to_owned(), json!(value)); +pub struct AuditLogBuilder { + version: AuditLogVersion, + pub deployment_id: Ulid, + audit_id: Ulid, + start_time: Instant, + pub stream: String, + pub actor: ActorLog, + pub request: RequestLog, + pub response: ResponseLog, +} + +impl Default for AuditLogBuilder { + fn default() -> Self { + AuditLogBuilder { + version: AuditLogVersion::V1, + deployment_id: Ulid::nil(), + audit_id: Ulid::new(), + start_time: Instant::now(), + stream: String::default(), + actor: ActorLog::default(), + request: RequestLog::default(), + response: ResponseLog::default(), + } } +} - fn record_i64(&mut self, field: &Field, value: i64) { - self.json.insert(field.name().to_owned(), json!(value)); +impl AuditLogBuilder { + pub async fn set_deployment_id(&mut self) { + self.deployment_id = get_metadata().await.unwrap().deployment_id; } - fn record_u64(&mut self, field: &Field, value: u64) { - self.json.insert(field.name().to_owned(), json!(value)); + pub fn set_stream_name(&mut self, stream: String) { + self.stream = stream; } - fn record_debug(&mut self, field: &Field, value: &dyn Debug) { - if field.name() == "message" { - self.message = format!("{value:?}"); - } else { - self.json - .insert(field.name().to_owned(), json!(format!("{value:?}"))); + pub fn update_from_http(&mut self, req: &ServiceRequest) { + let conn = req.connection_info(); + + self.request = RequestLog { + method: req.method().to_string(), + path: req.path().to_string(), + host: req.uri().host().unwrap_or("").to_owned(), + protocol: conn.scheme().to_owned(), + headers: req + .headers() + .iter() + .filter(|(name, _)| match name.as_str() { + "Authorization" => false, + "Cookie" => false, + // NOTE: drop any headers that are a risk to capture + _ => true, + }) + .filter_map(|(name, value)| { + if let Ok(value) = value.to_str() { + Some((name.to_string(), value.to_string())) + } else { + None + } + }) + .collect(), + }; + self.actor = ActorLog { + remote_host: conn.realip_remote_addr().unwrap_or_default().to_owned(), + user_agent: req + .headers() + .get("User-Agent") + .and_then(|a| a.to_str().ok()) + .unwrap_or_default() + .to_owned(), + ..Default::default() } } } + +impl Drop for AuditLogBuilder { + fn drop(&mut self) { + let audit_json = json!({ + "version": self.version as u8, + "deployment_id" : self.deployment_id, + "audit_id" : self.audit_id, + "timestamp" : Utc::now().to_rfc3339(), + "time_elapsed" : self.start_time.elapsed(), + "stream" : self.stream, + "actor" : self.actor, + "request" : self.request, + "response" : self.response, + }); + info!(audit = %audit_json) + } +} diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 79e7f6e15..2812727f5 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -23,11 +23,12 @@ use actix_web::{ dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}, error::{ErrorBadRequest, ErrorForbidden, ErrorUnauthorized}, http::header::{self, HeaderName}, - Error, Route, + Error, HttpMessage, Route, }; use futures_util::future::LocalBoxFuture; use crate::{ + audit::AuditLogBuilder, handlers::{ AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, STREAM_NAME_HEADER_KEY, @@ -137,6 +138,7 @@ where For requests made from other clients, no change. ## Section start */ + let mut stream_name = None; if let Some((_, kinesis_common_attributes)) = req .request() .headers() @@ -147,25 +149,39 @@ where let message: Message = serde_json::from_str(attribute_value).unwrap(); req.headers_mut().insert( HeaderName::from_static(AUTHORIZATION_KEY), - header::HeaderValue::from_str(&message.common_attributes.authorization.clone()) - .unwrap(), + header::HeaderValue::from_str(&message.common_attributes.authorization).unwrap(), ); req.headers_mut().insert( HeaderName::from_static(STREAM_NAME_HEADER_KEY), - header::HeaderValue::from_str(&message.common_attributes.x_p_stream.clone()) - .unwrap(), + header::HeaderValue::from_str(&message.common_attributes.x_p_stream).unwrap(), ); req.headers_mut().insert( HeaderName::from_static(LOG_SOURCE_KEY), header::HeaderValue::from_static(LOG_SOURCE_KINESIS), ); + stream_name = Some(message.common_attributes.x_p_stream); + } + + if let Some(stream) = req.match_info().get("logstream") { + stream_name = Some(stream.to_owned()); + } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) { + if let Ok(stream) = value.to_str() { + stream_name = Some(stream.to_owned()) + } } /* ## Section end */ let auth_result: Result<_, Error> = (self.auth_method)(&mut req, self.action); + + // Ensures that log will be pushed to subscriber on drop + let mut log_builder = AuditLogBuilder::default(); + log_builder.set_stream_name(stream_name.unwrap_or_default()); + log_builder.update_from_http(&req); let fut = self.service.call(req); Box::pin(async move { + log_builder.set_deployment_id().await; + match auth_result? { rbac::Response::UnAuthorized => return Err( ErrorForbidden("You don't have permission to access this resource. Please contact your administrator for assistance.") diff --git a/src/lib.rs b/src/lib.rs index 5c8e09274..a4f161f9f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ mod about; mod alerts; pub mod analytics; +pub mod audit; pub mod banner; mod catalog; mod cli; diff --git a/src/main.rs b/src/main.rs index 6f6a06270..a89377886 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,8 +16,8 @@ * */ -use audit::AuditLayer; use parseable::{ + audit::AuditLayer, banner, kafka, option::{Mode, CONFIG}, rbac, storage, IngestServer, ParseableServer, QueryServer, Server, @@ -27,8 +27,6 @@ use tracing_subscriber::{ layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry, }; -mod audit; - #[actix_web::main] async fn main() -> anyhow::Result<()> { let stdout_layer = tracing_subscriber::fmt::layer() diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 49db5af7f..d9c31b8c9 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -20,6 +20,7 @@ use crate::rbac::user::User; use crate::{option::CONFIG, storage::StorageMetadata}; use std::{collections::HashMap, sync::Mutex}; +use super::Response; use super::{ role::{model::DefaultPrivilege, Action, Permission, RoleBuilder}, user, @@ -193,16 +194,16 @@ impl Sessions { } // returns None if user is not in the map - // Otherwise returns Some(is_authenticated) + // Otherwise returns Some(Response) where response is authorized/unauthorized pub fn check_auth( &self, key: &SessionKey, required_action: Action, context_stream: Option<&str>, context_user: Option<&str>, - ) -> Option { + ) -> Option { self.active_sessions.get(key).map(|(username, perms)| { - perms.iter().any(|user_perm| { + if perms.iter().any(|user_perm| { match *user_perm { // if any action is ALL then we we authorize Permission::Unit(action) => action == required_action || action == Action::All, @@ -221,7 +222,11 @@ impl Sessions { } _ => false, } - }) + }) { + Response::Authorized + } else { + Response::UnAuthorized + } }) } diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index 1679d7ffa..adee85920 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -126,11 +126,7 @@ impl Users { ) -> Response { // try fetch from auth map for faster auth flow if let Some(res) = sessions().check_auth(&key, action, context_stream, context_user) { - return if res { - Response::Authorized - } else { - Response::UnAuthorized - }; + return res; } // attempt reloading permissions into new session for basic auth user @@ -155,14 +151,9 @@ impl Users { DateTime::::MAX_UTC, roles_to_permission(user.roles()), ); - return if sessions + return sessions .check_auth(&key, action, context_stream, context_user) - .expect("entry for this key just added") - { - Response::Authorized - } else { - Response::UnAuthorized - }; + .expect("entry for this key just added"); } } From 84b473fe31d09e8567152f405555aeff6b72fc5a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 13:46:44 +0530 Subject: [PATCH 03/30] feat: extract response status code --- src/handlers/http/middleware.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 2812727f5..336791f79 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -191,7 +191,14 @@ where ), _ => {} } - fut.await + let res = fut.await; + + // Capture status_code information from response + if let Ok(res) = &res { + log_builder.response.status_code = res.status().as_u16(); + } + + res }) } } From 50b5a613388c49031038dfff67b026d41cfb3784 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 14:11:16 +0530 Subject: [PATCH 04/30] feat: audit ingestion in kafka --- src/kafka.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/kafka.rs b/src/kafka.rs index f65b954c6..d0dece3ed 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -33,6 +33,7 @@ use std::sync::Arc; use std::{collections::HashMap, fmt::Debug}; use tracing::{debug, error, info, warn}; +use crate::audit::AuditLogBuilder; use crate::option::CONFIG; use crate::{ event::{ @@ -274,6 +275,18 @@ pub async fn setup_integration() { let mut stream = consumer.stream(); while let Ok(curr) = stream.next().await.unwrap() { + // Constructs a log for each kafka request + let mut log_builder = AuditLogBuilder::default(); + log_builder.set_deployment_id().await; + log_builder.actor.user_agent = "KAFKA_CLIENT".to_owned(); + log_builder.actor.remote_host = CONFIG + .parseable + .kafka_host + .as_ref() + .cloned() + .unwrap_or_default(); + log_builder.request.protocol = "Kafka".to_owned(); + log_builder.set_stream_name(curr.topic().to_owned()); if let Err(err) = ingest_message(curr).await { error!("Unable to ingest incoming kafka message- {err}") } From 31baab98f2ae4930a50382d94b224df4d81cf234 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 14:24:28 +0530 Subject: [PATCH 05/30] fix: ensure headers for auth are dropped --- src/audit.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 4cea67008..c2b568fc9 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -194,9 +194,9 @@ impl AuditLogBuilder { headers: req .headers() .iter() - .filter(|(name, _)| match name.as_str() { - "Authorization" => false, - "Cookie" => false, + .filter(|(name, _)| match name.as_str().to_lowercase().as_str() { + "authorization" => false, + "cookie" => false, // NOTE: drop any headers that are a risk to capture _ => true, }) From 48c32538bff46b444b6880dcef50d4903ecad2b8 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 14:35:46 +0530 Subject: [PATCH 06/30] fix: obfuscate auth headers --- src/audit.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index c2b568fc9..479a5f8a9 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -194,19 +194,21 @@ impl AuditLogBuilder { headers: req .headers() .iter() - .filter(|(name, _)| match name.as_str().to_lowercase().as_str() { - "authorization" => false, - "cookie" => false, - // NOTE: drop any headers that are a risk to capture - _ => true, - }) - .filter_map(|(name, value)| { - if let Ok(value) = value.to_str() { - Some((name.to_string(), value.to_string())) - } else { - None - } - }) + .filter_map( + |(name, value)| match name.as_str().to_lowercase().as_str() { + // NOTE: obfuscate any headers that are a risk to capture + name if name == "authorization" || name == "cookie" => { + Some((name.to_owned(), "*".to_string())) + } + name => { + // NOTE: Drop headers that can't be parsed as string + value + .to_str() + .map(|value| (name.to_owned(), value.to_string())) + .ok() + } + }, + ) .collect(), }; self.actor = ActorLog { From b456371fb1bcd14c91c44fdb19f2fca9a08a93ef Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 14:41:10 +0530 Subject: [PATCH 07/30] fix: camelCase json field names --- src/audit.rs | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 479a5f8a9..e8761c835 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -120,6 +120,7 @@ pub enum AuditLogVersion { } #[derive(Serialize, Default)] +#[serde(rename_all = "camelCase")] pub struct ActorLog { pub remote_host: String, pub user_agent: String, @@ -137,6 +138,7 @@ pub struct RequestLog { } #[derive(Serialize)] +#[serde(rename_all = "camelCase")] pub struct ResponseLog { pub status_code: u16, } @@ -148,6 +150,8 @@ impl Default for ResponseLog { } } +const OBFUSCATE_HEADERS: [&str; 2] = ["authorization", "cookie"]; + pub struct AuditLogBuilder { version: AuditLogVersion, pub deployment_id: Ulid, @@ -194,21 +198,19 @@ impl AuditLogBuilder { headers: req .headers() .iter() - .filter_map( - |(name, value)| match name.as_str().to_lowercase().as_str() { - // NOTE: obfuscate any headers that are a risk to capture - name if name == "authorization" || name == "cookie" => { - Some((name.to_owned(), "*".to_string())) - } - name => { - // NOTE: Drop headers that can't be parsed as string - value - .to_str() - .map(|value| (name.to_owned(), value.to_string())) - .ok() - } - }, - ) + .filter_map(|(name, value)| match name.as_str() { + // NOTE: obfuscate any headers that are a risk to capture + name if OBFUSCATE_HEADERS.contains(&name.to_lowercase().as_str()) => { + Some((name.to_owned(), "*".to_string())) + } + name => { + // NOTE: Drop headers that can't be parsed as string + value + .to_str() + .map(|value| (name.to_owned(), value.to_string())) + .ok() + } + }) .collect(), }; self.actor = ActorLog { @@ -228,10 +230,10 @@ impl Drop for AuditLogBuilder { fn drop(&mut self) { let audit_json = json!({ "version": self.version as u8, - "deployment_id" : self.deployment_id, - "audit_id" : self.audit_id, + "deploymentId" : self.deployment_id, + "auditId" : self.audit_id, "timestamp" : Utc::now().to_rfc3339(), - "time_elapsed" : self.start_time.elapsed(), + "timeElapsed" : self.start_time.elapsed(), "stream" : self.stream, "actor" : self.actor, "request" : self.request, From 53ba72bd6483ac43e0dff5ad083a7c71f586dcf2 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 15:01:51 +0530 Subject: [PATCH 08/30] feat: get cause of error --- src/audit.rs | 10 +++++++++- src/handlers/http/middleware.rs | 7 ++++--- src/kafka.rs | 10 +++++++--- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index e8761c835..4c14fd915 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -141,12 +141,16 @@ pub struct RequestLog { #[serde(rename_all = "camelCase")] pub struct ResponseLog { pub status_code: u16, + pub error: Option, } impl Default for ResponseLog { fn default() -> Self { // Server failed to respond - ResponseLog { status_code: 500 } + ResponseLog { + status_code: 500, + error: None, + } } } @@ -183,6 +187,10 @@ impl AuditLogBuilder { self.deployment_id = get_metadata().await.unwrap().deployment_id; } + pub fn set_response_error(&mut self, err: String) { + self.response.error = Some(err); + } + pub fn set_stream_name(&mut self, stream: String) { self.stream = stream; } diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 336791f79..3109f58d1 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -193,9 +193,10 @@ where } let res = fut.await; - // Capture status_code information from response - if let Ok(res) = &res { - log_builder.response.status_code = res.status().as_u16(); + // Capture status_code and error information from response + match &res { + Ok(res) => log_builder.response.status_code = res.status().as_u16(), + Err(err) => log_builder.set_response_error(err.to_string()), } res diff --git a/src/kafka.rs b/src/kafka.rs index d0dece3ed..3bff16477 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -287,8 +287,12 @@ pub async fn setup_integration() { .unwrap_or_default(); log_builder.request.protocol = "Kafka".to_owned(); log_builder.set_stream_name(curr.topic().to_owned()); - if let Err(err) = ingest_message(curr).await { - error!("Unable to ingest incoming kafka message- {err}") - } + + let Err(err) = ingest_message(curr).await else { + log_builder.response.status_code = 200; + continue; + }; + log_builder.set_response_error(err.to_string()); + error!("Unable to ingest incoming kafka message- {err}") } } From d34cb76be17c9adfbcfb924fbabfc2877108cca1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 15:54:40 +0530 Subject: [PATCH 09/30] drop some headers, collect parseableVersion info, log as string --- src/audit.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 4c14fd915..faea87626 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use crate::about::current; use crate::handlers::http::modal::utils::rbac_utils::get_metadata; use super::option::CONFIG; @@ -154,7 +155,7 @@ impl Default for ResponseLog { } } -const OBFUSCATE_HEADERS: [&str; 2] = ["authorization", "cookie"]; +const DROP_HEADERS: [&str; 3] = ["authorization", "cookie", "user-agent"]; pub struct AuditLogBuilder { version: AuditLogVersion, @@ -207,10 +208,8 @@ impl AuditLogBuilder { .headers() .iter() .filter_map(|(name, value)| match name.as_str() { - // NOTE: obfuscate any headers that are a risk to capture - name if OBFUSCATE_HEADERS.contains(&name.to_lowercase().as_str()) => { - Some((name.to_owned(), "*".to_string())) - } + // NOTE: drop headers that are not required + name if DROP_HEADERS.contains(&name.to_lowercase().as_str()) => None, name => { // NOTE: Drop headers that can't be parsed as string value @@ -238,6 +237,7 @@ impl Drop for AuditLogBuilder { fn drop(&mut self) { let audit_json = json!({ "version": self.version as u8, + "parseableVersion": current().released_version.to_string(), "deploymentId" : self.deployment_id, "auditId" : self.audit_id, "timestamp" : Utc::now().to_rfc3339(), @@ -247,6 +247,6 @@ impl Drop for AuditLogBuilder { "request" : self.request, "response" : self.response, }); - info!(audit = %audit_json) + info!(audit = audit_json.to_string()) } } From 60843a6d027afd6b2e8fa39f7f7a510e1c1a9ab6 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 16:00:02 +0530 Subject: [PATCH 10/30] return start_time instead of elapsed --- src/audit.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index faea87626..d2b2a6752 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -5,12 +5,11 @@ use crate::handlers::http::modal::utils::rbac_utils::get_metadata; use super::option::CONFIG; use actix_web::dev::ServiceRequest; -use chrono::Utc; +use chrono::{DateTime, Utc}; use reqwest::Client; use serde::Serialize; use serde_json::{json, Map, Value}; use tokio::runtime::Handle; -use tokio::time::Instant; use tracing::info; use tracing::{ error, @@ -161,7 +160,7 @@ pub struct AuditLogBuilder { version: AuditLogVersion, pub deployment_id: Ulid, audit_id: Ulid, - start_time: Instant, + start_time: DateTime, pub stream: String, pub actor: ActorLog, pub request: RequestLog, @@ -174,7 +173,7 @@ impl Default for AuditLogBuilder { version: AuditLogVersion::V1, deployment_id: Ulid::nil(), audit_id: Ulid::new(), - start_time: Instant::now(), + start_time: Utc::now(), stream: String::default(), actor: ActorLog::default(), request: RequestLog::default(), @@ -240,8 +239,8 @@ impl Drop for AuditLogBuilder { "parseableVersion": current().released_version.to_string(), "deploymentId" : self.deployment_id, "auditId" : self.audit_id, - "timestamp" : Utc::now().to_rfc3339(), - "timeElapsed" : self.start_time.elapsed(), + "startTime" : self.start_time.to_rfc3339(), + "endTime" : Utc::now().to_rfc3339(), "stream" : self.stream, "actor" : self.actor, "request" : self.request, From 46a4f4ac532fda3217b0bcf8d6a80d134fc274ad Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 16:06:39 +0530 Subject: [PATCH 11/30] drop x-p-stream header --- src/audit.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/audit.rs b/src/audit.rs index d2b2a6752..efd0750da 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -154,7 +154,7 @@ impl Default for ResponseLog { } } -const DROP_HEADERS: [&str; 3] = ["authorization", "cookie", "user-agent"]; +const DROP_HEADERS: [&str; 4] = ["authorization", "cookie", "user-agent", "x-p-stream"]; pub struct AuditLogBuilder { version: AuditLogVersion, From 306b4be4de1cfab13b36842bcc1d0cee3b23699e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 16:09:10 +0530 Subject: [PATCH 12/30] rm request host --- src/audit.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index efd0750da..6266e6ee7 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -132,7 +132,6 @@ pub struct ActorLog { pub struct RequestLog { pub method: String, pub path: String, - pub host: String, pub protocol: String, pub headers: HashMap, } @@ -201,7 +200,6 @@ impl AuditLogBuilder { self.request = RequestLog { method: req.method().to_string(), path: req.path().to_string(), - host: req.uri().host().unwrap_or("").to_owned(), protocol: conn.scheme().to_owned(), headers: req .headers() From a8893326517cf387a0aef8d4baaa80a5b7c2cd05 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 17:07:58 +0530 Subject: [PATCH 13/30] fix: get auth details --- src/audit.rs | 32 +++++++++++++++++++++++++++++--- src/handlers/http/middleware.rs | 2 +- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 6266e6ee7..5e13bebcc 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -2,9 +2,12 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; use crate::about::current; use crate::handlers::http::modal::utils::rbac_utils::get_metadata; +use crate::rbac::map::SessionKey; +use crate::rbac::Users; use super::option::CONFIG; use actix_web::dev::ServiceRequest; +use actix_web_httpauth::extractors::basic::BasicAuth; use chrono::{DateTime, Utc}; use reqwest::Client; use serde::Serialize; @@ -194,9 +197,10 @@ impl AuditLogBuilder { self.stream = stream; } - pub fn update_from_http(&mut self, req: &ServiceRequest) { - let conn = req.connection_info(); + pub fn update_from_http(&mut self, req: &mut ServiceRequest) { + let (username, authorization_method) = get_auth_details(req); + let conn = req.connection_info(); self.request = RequestLog { method: req.method().to_string(), path: req.path().to_string(), @@ -225,11 +229,33 @@ impl AuditLogBuilder { .and_then(|a| a.to_str().ok()) .unwrap_or_default() .to_owned(), - ..Default::default() + username, + authorization_method, } } } +fn get_auth_details(req: &mut ServiceRequest) -> (String, String) { + let mut username = "Unknown".to_owned(); + let mut auth_method = "None".to_owned(); + + if let Ok(creds) = req.extract::().into_inner() { + return (creds.user_id().trim().to_owned(), "Basic Auth".to_owned()); + } + + if let Some(cookie) = req.cookie("session") { + auth_method = "Session Cookie".to_owned(); + if let Some(user_id) = Ulid::from_string(cookie.value()) + .ok() + .and_then(|ulid| Users.get_username_from_session(&SessionKey::SessionId(ulid))) + { + username = user_id; + } + } + + (username, auth_method) +} + impl Drop for AuditLogBuilder { fn drop(&mut self) { let audit_json = json!({ diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 3109f58d1..177e9ca19 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -177,7 +177,7 @@ where // Ensures that log will be pushed to subscriber on drop let mut log_builder = AuditLogBuilder::default(); log_builder.set_stream_name(stream_name.unwrap_or_default()); - log_builder.update_from_http(&req); + log_builder.update_from_http(&mut req); let fut = self.service.call(req); Box::pin(async move { log_builder.set_deployment_id().await; From 0819601d5c90590678085cea3888f566fbfb4466 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 17:15:00 +0530 Subject: [PATCH 14/30] add error info to log --- src/handlers/http/middleware.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 177e9ca19..3f417cfba 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -195,7 +195,14 @@ where // Capture status_code and error information from response match &res { - Ok(res) => log_builder.response.status_code = res.status().as_u16(), + Ok(res) => { + let status = res.status(); + log_builder.response.status_code = status.as_u16(); + // Use error information from reponse object if an error + if let Some(err) = res.response().error() { + log_builder.set_response_error(err.to_string()); + } + } Err(err) => log_builder.set_response_error(err.to_string()), } From 61cbf25529346cf0fbc58819e4ee3b96382656ec Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 17:31:45 +0530 Subject: [PATCH 15/30] refactor --- src/audit.rs | 43 +++++++++++++++------------------ src/handlers/http/middleware.rs | 23 ++++++------------ 2 files changed, 26 insertions(+), 40 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 5e13bebcc..35610d124 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -160,10 +160,10 @@ const DROP_HEADERS: [&str; 4] = ["authorization", "cookie", "user-agent", "x-p-s pub struct AuditLogBuilder { version: AuditLogVersion, - pub deployment_id: Ulid, + deployment_id: Ulid, audit_id: Ulid, start_time: DateTime, - pub stream: String, + stream: String, pub actor: ActorLog, pub request: RequestLog, pub response: ResponseLog, @@ -198,7 +198,23 @@ impl AuditLogBuilder { } pub fn update_from_http(&mut self, req: &mut ServiceRequest) { - let (username, authorization_method) = get_auth_details(req); + let mut username = "Unknown".to_owned(); + let mut authorization_method = "None".to_owned(); + + // Extract authorization details from request, either from basic auth + // header or cookie, else use default value. + if let Ok(creds) = req.extract::().into_inner() { + username = creds.user_id().trim().to_owned(); + authorization_method = "Basic Auth".to_owned(); + } else if let Some(cookie) = req.cookie("session") { + authorization_method = "Session Cookie".to_owned(); + if let Some(user_id) = Ulid::from_string(cookie.value()) + .ok() + .and_then(|ulid| Users.get_username_from_session(&SessionKey::SessionId(ulid))) + { + username = user_id; + } + } let conn = req.connection_info(); self.request = RequestLog { @@ -235,27 +251,6 @@ impl AuditLogBuilder { } } -fn get_auth_details(req: &mut ServiceRequest) -> (String, String) { - let mut username = "Unknown".to_owned(); - let mut auth_method = "None".to_owned(); - - if let Ok(creds) = req.extract::().into_inner() { - return (creds.user_id().trim().to_owned(), "Basic Auth".to_owned()); - } - - if let Some(cookie) = req.cookie("session") { - auth_method = "Session Cookie".to_owned(); - if let Some(user_id) = Ulid::from_string(cookie.value()) - .ok() - .and_then(|ulid| Users.get_username_from_session(&SessionKey::SessionId(ulid))) - { - username = user_id; - } - } - - (username, auth_method) -} - impl Drop for AuditLogBuilder { fn drop(&mut self) { let audit_json = json!({ diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 3f417cfba..3c0b92a87 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -139,11 +139,8 @@ where ## Section start */ let mut stream_name = None; - if let Some((_, kinesis_common_attributes)) = req - .request() - .headers() - .iter() - .find(|&(key, _)| key == KINESIS_COMMON_ATTRIBUTES_KEY) + if let Some(kinesis_common_attributes) = + req.request().headers().get(KINESIS_COMMON_ATTRIBUTES_KEY) { let attribute_value: &str = kinesis_common_attributes.to_str().unwrap(); let message: Message = serde_json::from_str(attribute_value).unwrap(); @@ -159,14 +156,12 @@ where HeaderName::from_static(LOG_SOURCE_KEY), header::HeaderValue::from_static(LOG_SOURCE_KINESIS), ); - stream_name = Some(message.common_attributes.x_p_stream); - } - - if let Some(stream) = req.match_info().get("logstream") { - stream_name = Some(stream.to_owned()); + stream_name.replace(message.common_attributes.x_p_stream); + } else if let Some(stream) = req.match_info().get("logstream") { + stream_name.replace(stream.to_owned()); } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) { if let Ok(stream) = value.to_str() { - stream_name = Some(stream.to_owned()) + stream_name.replace(stream.to_owned()); } } @@ -223,11 +218,7 @@ pub fn auth_stream_context( let creds = extract_session_key(req); let mut stream = req.match_info().get("logstream"); if stream.is_none() { - if let Some((_, stream_name)) = req - .headers() - .iter() - .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) - { + if let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) { stream = Some(stream_name.to_str().unwrap()); } } From ea75bfae741bb6fa6eaf1d9c79b467f9d2d35a12 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 19:18:39 +0530 Subject: [PATCH 16/30] refactor as own middleware --- src/audit.rs | 59 ----------------- src/handlers/http/audit.rs | 109 ++++++++++++++++++++++++++++++++ src/handlers/http/middleware.rs | 39 ++---------- src/handlers/http/mod.rs | 1 + src/handlers/http/modal/mod.rs | 2 + 5 files changed, 117 insertions(+), 93 deletions(-) create mode 100644 src/handlers/http/audit.rs diff --git a/src/audit.rs b/src/audit.rs index 35610d124..cd5ff9a37 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -2,12 +2,8 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; use crate::about::current; use crate::handlers::http::modal::utils::rbac_utils::get_metadata; -use crate::rbac::map::SessionKey; -use crate::rbac::Users; use super::option::CONFIG; -use actix_web::dev::ServiceRequest; -use actix_web_httpauth::extractors::basic::BasicAuth; use chrono::{DateTime, Utc}; use reqwest::Client; use serde::Serialize; @@ -156,8 +152,6 @@ impl Default for ResponseLog { } } -const DROP_HEADERS: [&str; 4] = ["authorization", "cookie", "user-agent", "x-p-stream"]; - pub struct AuditLogBuilder { version: AuditLogVersion, deployment_id: Ulid, @@ -196,59 +190,6 @@ impl AuditLogBuilder { pub fn set_stream_name(&mut self, stream: String) { self.stream = stream; } - - pub fn update_from_http(&mut self, req: &mut ServiceRequest) { - let mut username = "Unknown".to_owned(); - let mut authorization_method = "None".to_owned(); - - // Extract authorization details from request, either from basic auth - // header or cookie, else use default value. - if let Ok(creds) = req.extract::().into_inner() { - username = creds.user_id().trim().to_owned(); - authorization_method = "Basic Auth".to_owned(); - } else if let Some(cookie) = req.cookie("session") { - authorization_method = "Session Cookie".to_owned(); - if let Some(user_id) = Ulid::from_string(cookie.value()) - .ok() - .and_then(|ulid| Users.get_username_from_session(&SessionKey::SessionId(ulid))) - { - username = user_id; - } - } - - let conn = req.connection_info(); - self.request = RequestLog { - method: req.method().to_string(), - path: req.path().to_string(), - protocol: conn.scheme().to_owned(), - headers: req - .headers() - .iter() - .filter_map(|(name, value)| match name.as_str() { - // NOTE: drop headers that are not required - name if DROP_HEADERS.contains(&name.to_lowercase().as_str()) => None, - name => { - // NOTE: Drop headers that can't be parsed as string - value - .to_str() - .map(|value| (name.to_owned(), value.to_string())) - .ok() - } - }) - .collect(), - }; - self.actor = ActorLog { - remote_host: conn.realip_remote_addr().unwrap_or_default().to_owned(), - user_agent: req - .headers() - .get("User-Agent") - .and_then(|a| a.to_str().ok()) - .unwrap_or_default() - .to_owned(), - username, - authorization_method, - } - } } impl Drop for AuditLogBuilder { diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs new file mode 100644 index 000000000..792e5f968 --- /dev/null +++ b/src/handlers/http/audit.rs @@ -0,0 +1,109 @@ +use super::middleware::Message; +use actix_web::{ + body::MessageBody, + dev::{ServiceRequest, ServiceResponse}, + middleware::Next, +}; +use actix_web_httpauth::extractors::basic::BasicAuth; +use ulid::Ulid; + +use crate::{ + audit::{ActorLog, AuditLogBuilder, RequestLog}, + handlers::{KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY}, + rbac::{map::SessionKey, Users}, +}; + +const DROP_HEADERS: [&str; 4] = ["authorization", "cookie", "user-agent", "x-p-stream"]; + +pub async fn audit_log_middleware( + mut req: ServiceRequest, + next: Next, +) -> Result, actix_web::Error> { + // Ensures that log will be pushed to subscriber on drop + let mut log_builder = AuditLogBuilder::default(); + + if let Some(kinesis_common_attributes) = + req.request().headers().get(KINESIS_COMMON_ATTRIBUTES_KEY) + { + let attribute_value: &str = kinesis_common_attributes.to_str().unwrap(); + let message: Message = serde_json::from_str(attribute_value).unwrap(); + log_builder.set_stream_name(message.common_attributes.x_p_stream); + } else if let Some(stream) = req.match_info().get("logstream") { + log_builder.set_stream_name(stream.to_owned()); + } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) { + if let Ok(stream) = value.to_str() { + log_builder.set_stream_name(stream.to_owned()); + } + } + let mut username = "Unknown".to_owned(); + let mut authorization_method = "None".to_owned(); + + // Extract authorization details from request, either from basic auth + // header or cookie, else use default value. + if let Ok(creds) = req.extract::().into_inner() { + username = creds.user_id().trim().to_owned(); + authorization_method = "Basic Auth".to_owned(); + } else if let Some(cookie) = req.cookie("session") { + authorization_method = "Session Cookie".to_owned(); + if let Some(user_id) = Ulid::from_string(cookie.value()) + .ok() + .and_then(|ulid| Users.get_username_from_session(&SessionKey::SessionId(ulid))) + { + username = user_id; + } + } + + log_builder.request = RequestLog { + method: req.method().to_string(), + path: req.path().to_string(), + protocol: req.connection_info().scheme().to_owned(), + headers: req + .headers() + .iter() + .filter_map(|(name, value)| match name.as_str() { + // NOTE: drop headers that are not required + name if DROP_HEADERS.contains(&name.to_lowercase().as_str()) => None, + name => { + // NOTE: Drop headers that can't be parsed as string + value + .to_str() + .map(|value| (name.to_owned(), value.to_string())) + .ok() + } + }) + .collect(), + }; + log_builder.actor = ActorLog { + remote_host: req + .connection_info() + .realip_remote_addr() + .unwrap_or_default() + .to_owned(), + user_agent: req + .headers() + .get("User-Agent") + .and_then(|a| a.to_str().ok()) + .unwrap_or_default() + .to_owned(), + username, + authorization_method, + }; + log_builder.set_deployment_id().await; + + let res = next.call(req).await; + + // Capture status_code and error information from response + match &res { + Ok(res) => { + let status = res.status(); + log_builder.response.status_code = status.as_u16(); + // Use error information from reponse object if an error + if let Some(err) = res.response().error() { + log_builder.set_response_error(err.to_string()); + } + } + Err(err) => log_builder.set_response_error(err.to_string()), + } + + res +} diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 3c0b92a87..d02513637 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -28,7 +28,6 @@ use actix_web::{ use futures_util::future::LocalBoxFuture; use crate::{ - audit::AuditLogBuilder, handlers::{ AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, STREAM_NAME_HEADER_KEY, @@ -46,16 +45,16 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -struct Message { - common_attributes: CommonAttributes, +pub struct Message { + pub common_attributes: CommonAttributes, } #[derive(Serialize, Deserialize, Debug)] -struct CommonAttributes { +pub struct CommonAttributes { #[serde(rename = "Authorization")] authorization: String, #[serde(rename = "X-P-Stream")] - x_p_stream: String, + pub x_p_stream: String, } pub trait RouteExt { @@ -138,7 +137,6 @@ where For requests made from other clients, no change. ## Section start */ - let mut stream_name = None; if let Some(kinesis_common_attributes) = req.request().headers().get(KINESIS_COMMON_ATTRIBUTES_KEY) { @@ -156,27 +154,14 @@ where HeaderName::from_static(LOG_SOURCE_KEY), header::HeaderValue::from_static(LOG_SOURCE_KINESIS), ); - stream_name.replace(message.common_attributes.x_p_stream); - } else if let Some(stream) = req.match_info().get("logstream") { - stream_name.replace(stream.to_owned()); - } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) { - if let Ok(stream) = value.to_str() { - stream_name.replace(stream.to_owned()); - } } /* ## Section end */ let auth_result: Result<_, Error> = (self.auth_method)(&mut req, self.action); - // Ensures that log will be pushed to subscriber on drop - let mut log_builder = AuditLogBuilder::default(); - log_builder.set_stream_name(stream_name.unwrap_or_default()); - log_builder.update_from_http(&mut req); let fut = self.service.call(req); Box::pin(async move { - log_builder.set_deployment_id().await; - match auth_result? { rbac::Response::UnAuthorized => return Err( ErrorForbidden("You don't have permission to access this resource. Please contact your administrator for assistance.") @@ -186,22 +171,8 @@ where ), _ => {} } - let res = fut.await; - - // Capture status_code and error information from response - match &res { - Ok(res) => { - let status = res.status(); - log_builder.response.status_code = status.as_u16(); - // Use error information from reponse object if an error - if let Some(err) = res.response().error() { - log_builder.set_response_error(err.to_string()); - } - } - Err(err) => log_builder.set_response_error(err.to_string()), - } - res + fut.await }) } } diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 8fabc8f1f..99ab350c1 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -28,6 +28,7 @@ use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY}; use self::{cluster::get_ingestor_info, query::Query}; pub mod about; +mod audit; pub mod cluster; pub mod correlation; pub mod health_check; diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 5080f2038..17d2cbb4b 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -41,6 +41,7 @@ use ssl_acceptor::get_ssl_acceptor; use tokio::sync::{oneshot, Mutex}; use tracing::{error, info, warn}; +use super::audit; use super::cross_origin_config; use super::API_BASE_PATH; use super::API_VERSION; @@ -101,6 +102,7 @@ pub trait ParseableServer { .wrap(prometheus.clone()) .configure(|config| Self::configure_routes(config, oidc_client.clone())) .wrap(from_fn(health_check::check_shutdown_middleware)) + .wrap(from_fn(audit::audit_log_middleware)) .wrap(actix_web::middleware::Logger::default()) .wrap(actix_web::middleware::Compress::default()) .wrap(cross_origin_config()) From 200624a9a9f343a4a1ae5f9e571765d5910bb2a2 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 21:56:49 +0530 Subject: [PATCH 17/30] refactor: audit logging without tracing complexity --- src/audit.rs | 260 ++++++++++++++++++++++--------------- src/handlers/http/audit.rs | 83 ++++++------ src/kafka.rs | 31 +++-- src/main.rs | 11 +- 4 files changed, 217 insertions(+), 168 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index cd5ff9a37..47704be09 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -1,119 +1,104 @@ -use std::{collections::HashMap, fmt::Debug, sync::Arc}; +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{ + collections::HashMap, + fmt::{Debug, Display}, + sync::Arc, +}; use crate::about::current; use crate::handlers::http::modal::utils::rbac_utils::get_metadata; use super::option::CONFIG; use chrono::{DateTime, Utc}; +use once_cell::sync::Lazy; use reqwest::Client; use serde::Serialize; -use serde_json::{json, Map, Value}; -use tokio::runtime::Handle; -use tracing::info; -use tracing::{ - error, - field::{Field, Visit}, - Event, Metadata, Subscriber, -}; -use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; +use serde_json::{json, Value}; +use tracing::error; + use ulid::Ulid; use url::Url; -pub struct AuditLayer { +static AUDIT_LOGGER: Lazy> = Lazy::new(AuditLogger::new); + +pub struct AuditLogger { client: Arc, log_endpoint: Url, username: Option, password: Option, - runtime_handle: Handle, } -impl AuditLayer { - /// Create an audit layer that works with the tracing system to capture - /// and push audit logs to the appropriate logger over HTTP - pub fn new(runtime_handle: Handle) -> Option { - let audit_logger = CONFIG.parseable.audit_logger.as_ref()?; - let client = Arc::new(reqwest::Client::new()); - let log_endpoint = match audit_logger.join("/api/v1/ingest") { +impl AuditLogger { + /// Create an audit logger that can be used to capture + /// and push audit logs to the appropriate logging system over HTTP + pub fn new() -> Option { + let log_endpoint = match CONFIG + .parseable + .audit_logger + .as_ref()? + .join("/api/v1/ingest") + { Ok(url) => url, Err(err) => { - error!("Couldn't setup audit logger: {err}"); + eprintln!("Couldn't setup audit logger: {err}"); return None; } }; + let client = Arc::new(reqwest::Client::new()); + let username = CONFIG.parseable.audit_username.clone(); let password = CONFIG.parseable.audit_password.clone(); - Some(Self { + Some(AuditLogger { client, log_endpoint, username, password, - runtime_handle, }) } -} - -impl Layer for AuditLayer -where - S: Subscriber + for<'a> LookupSpan<'a>, -{ - fn enabled(&self, _: &Metadata<'_>, _: Context<'_, S>) -> bool { - true // log everything if it is auditable - } - - fn on_event(&self, event: &Event<'_>, _: Context<'_, S>) { - let mut visitor = AuditVisitor::default(); - event.record(&mut visitor); - - // if the log line contains `audit` string with serialized json object, construct an HTTP request and push to configured audit endpoint - // NOTE: We only support the ingest API of parseable for audit logging parseable - if visitor.audit { - let mut req = self - .client - .post(self.log_endpoint.as_str()) - .json(&visitor.json) - .header("x-p-stream", "audit_log"); - if let Some(username) = self.username.as_ref() { - req = req.basic_auth(username, self.password.as_ref()) - } - self.runtime_handle.spawn(async move { - match req.send().await { - Ok(r) => { - if let Err(e) = r.error_for_status() { - println!("{e}") - } - } - Err(e) => eprintln!("Failed to send audit event: {}", e), - } - }); + async fn send_log(&self, json: Value) { + let mut req = self + .client + .post(self.log_endpoint.as_str()) + .json(&json) + .header("x-p-stream", "audit_log"); + if let Some(username) = self.username.as_ref() { + req = req.basic_auth(username, self.password.as_ref()) } - } -} -#[derive(Debug, Default)] -struct AuditVisitor { - json: Map, - audit: bool, -} - -impl Visit for AuditVisitor { - fn record_str(&mut self, field: &Field, value: &str) { - if field.name() == "audit" { - if let Ok(Value::Object(json)) = serde_json::from_str(value) { - self.audit = true; - self.json = json; + match req.send().await { + Ok(r) => { + if let Err(e) = r.error_for_status() { + error!("{e}") + } } + Err(e) => error!("Failed to send audit event: {}", e), } } - - fn record_debug(&mut self, _: &Field, _: &dyn Debug) {} } #[non_exhaustive] #[repr(u8)] -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Serialize)] pub enum AuditLogVersion { V1 = 1, } @@ -152,60 +137,119 @@ impl Default for ResponseLog { } } -pub struct AuditLogBuilder { - version: AuditLogVersion, - deployment_id: Ulid, - audit_id: Ulid, - start_time: DateTime, - stream: String, +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AuditLog { + pub version: AuditLogVersion, + pub parseable_version: String, + pub deployment_id: Ulid, + pub audit_id: Ulid, + pub start_time: DateTime, + pub end_time: DateTime, + pub stream: String, pub actor: ActorLog, pub request: RequestLog, pub response: ResponseLog, } +pub struct AuditLogBuilder { + start_time: DateTime, + stream: String, + pub actor: Option, + pub request: Option, + pub response: Option, +} + impl Default for AuditLogBuilder { fn default() -> Self { AuditLogBuilder { - version: AuditLogVersion::V1, - deployment_id: Ulid::nil(), - audit_id: Ulid::new(), start_time: Utc::now(), stream: String::default(), - actor: ActorLog::default(), - request: RequestLog::default(), - response: ResponseLog::default(), + actor: None, + request: None, + response: None, } } } impl AuditLogBuilder { - pub async fn set_deployment_id(&mut self) { - self.deployment_id = get_metadata().await.unwrap().deployment_id; + pub fn set_stream_name(&mut self, stream: impl Into) { + if AUDIT_LOGGER.is_none() { + return; + } + self.stream = stream.into(); } - pub fn set_response_error(&mut self, err: String) { - self.response.error = Some(err); + pub fn set_actor( + &mut self, + host: impl Into, + username: impl Into, + user_agent: impl Into, + auth_method: impl Into, + ) { + if AUDIT_LOGGER.is_none() { + return; + } + self.actor = Some(ActorLog { + remote_host: host.into(), + user_agent: user_agent.into(), + username: username.into(), + authorization_method: auth_method.into(), + }); } - pub fn set_stream_name(&mut self, stream: String) { - self.stream = stream; + pub fn set_request( + &mut self, + method: impl Into, + path: impl Into, + protocol: impl Into, + headers: impl IntoIterator, + ) { + if AUDIT_LOGGER.is_none() { + return; + } + self.request = Some(RequestLog { + method: method.into(), + path: path.into(), + protocol: protocol.into(), + headers: headers.into_iter().collect(), + }); } -} -impl Drop for AuditLogBuilder { - fn drop(&mut self) { - let audit_json = json!({ - "version": self.version as u8, - "parseableVersion": current().released_version.to_string(), - "deploymentId" : self.deployment_id, - "auditId" : self.audit_id, - "startTime" : self.start_time.to_rfc3339(), - "endTime" : Utc::now().to_rfc3339(), - "stream" : self.stream, - "actor" : self.actor, - "request" : self.request, - "response" : self.response, - }); - info!(audit = audit_json.to_string()) + pub fn set_response(&mut self, status_code: u16, err: impl Display) { + if AUDIT_LOGGER.is_none() { + return; + } + let error = err.to_string(); + let error = error.is_empty().then(|| error); + self.response = Some(ResponseLog { status_code, error }); + } + + // NOTE: Ensure that the logger has been constructed by Default + pub async fn send(self) { + let AuditLogBuilder { + start_time, + stream, + actor, + request, + response, + } = self; + let Some(logger) = AUDIT_LOGGER.as_ref() else { + return; + }; + let audit_log = AuditLog { + version: AuditLogVersion::V1, + parseable_version: current().released_version.to_string(), + deployment_id: get_metadata().await.unwrap().deployment_id, + audit_id: Ulid::new(), + start_time, + end_time: Utc::now(), + stream, + actor: actor.unwrap_or_default(), + request: request.unwrap_or_default(), + response: response.unwrap_or_default(), + }; + + logger.send_log(json!(audit_log)).await } } diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs index 792e5f968..bb683e121 100644 --- a/src/handlers/http/audit.rs +++ b/src/handlers/http/audit.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use super::middleware::Message; use actix_web::{ body::MessageBody, @@ -8,7 +26,7 @@ use actix_web_httpauth::extractors::basic::BasicAuth; use ulid::Ulid; use crate::{ - audit::{ActorLog, AuditLogBuilder, RequestLog}, + audit::AuditLogBuilder, handlers::{KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY}, rbac::{map::SessionKey, Users}, }; @@ -19,7 +37,6 @@ pub async fn audit_log_middleware( mut req: ServiceRequest, next: Next, ) -> Result, actix_web::Error> { - // Ensures that log will be pushed to subscriber on drop let mut log_builder = AuditLogBuilder::default(); if let Some(kinesis_common_attributes) = @@ -53,42 +70,33 @@ pub async fn audit_log_middleware( } } - log_builder.request = RequestLog { - method: req.method().to_string(), - path: req.path().to_string(), - protocol: req.connection_info().scheme().to_owned(), - headers: req - .headers() - .iter() - .filter_map(|(name, value)| match name.as_str() { - // NOTE: drop headers that are not required - name if DROP_HEADERS.contains(&name.to_lowercase().as_str()) => None, - name => { - // NOTE: Drop headers that can't be parsed as string - value - .to_str() - .map(|value| (name.to_owned(), value.to_string())) - .ok() - } - }) - .collect(), - }; - log_builder.actor = ActorLog { - remote_host: req - .connection_info() - .realip_remote_addr() - .unwrap_or_default() - .to_owned(), - user_agent: req - .headers() + let conn = req.connection_info(); + let headers = req + .headers() + .iter() + .filter_map(|(name, value)| match name.as_str() { + // NOTE: drop headers that are not required + name if DROP_HEADERS.contains(&name.to_lowercase().as_str()) => None, + name => { + // NOTE: Drop headers that can't be parsed as string + value + .to_str() + .map(|value| (name.to_owned(), value.to_string())) + .ok() + } + }); + log_builder.set_request(req.method().as_str(), req.path(), conn.scheme(), headers); + + log_builder.set_actor( + conn.realip_remote_addr().unwrap_or_default(), + req.headers() .get("User-Agent") .and_then(|a| a.to_str().ok()) - .unwrap_or_default() - .to_owned(), + .unwrap_or_default(), username, authorization_method, - }; - log_builder.set_deployment_id().await; + ); + drop(conn); let res = next.call(req).await; @@ -96,14 +104,15 @@ pub async fn audit_log_middleware( match &res { Ok(res) => { let status = res.status(); - log_builder.response.status_code = status.as_u16(); // Use error information from reponse object if an error if let Some(err) = res.response().error() { - log_builder.set_response_error(err.to_string()); + log_builder.set_response(status.as_u16(), err); } } - Err(err) => log_builder.set_response_error(err.to_string()), + Err(err) => log_builder.set_response(500, err), } + log_builder.send().await; + res } diff --git a/src/kafka.rs b/src/kafka.rs index 3bff16477..32bf24bb0 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -277,22 +277,27 @@ pub async fn setup_integration() { while let Ok(curr) = stream.next().await.unwrap() { // Constructs a log for each kafka request let mut log_builder = AuditLogBuilder::default(); - log_builder.set_deployment_id().await; - log_builder.actor.user_agent = "KAFKA_CLIENT".to_owned(); - log_builder.actor.remote_host = CONFIG - .parseable - .kafka_host - .as_ref() - .cloned() - .unwrap_or_default(); - log_builder.request.protocol = "Kafka".to_owned(); - log_builder.set_stream_name(curr.topic().to_owned()); + log_builder.set_actor( + CONFIG + .parseable + .kafka_host + .as_ref() + .map(|s| s.as_str()) + .unwrap_or(""), + "Kafka Client", + "", + "", + ); + + log_builder.set_request("", "", "Kafka", []); + log_builder.set_stream_name(curr.topic()); let Err(err) = ingest_message(curr).await else { - log_builder.response.status_code = 200; + log_builder.set_response(200, ""); continue; }; - log_builder.set_response_error(err.to_string()); - error!("Unable to ingest incoming kafka message- {err}") + log_builder.set_response(500, &err); + error!("Unable to ingest incoming kafka message- {err}"); + log_builder.send().await; } } diff --git a/src/main.rs b/src/main.rs index a89377886..8cc1ca19a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,12 +17,10 @@ */ use parseable::{ - audit::AuditLayer, banner, kafka, option::{Mode, CONFIG}, rbac, storage, IngestServer, ParseableServer, QueryServer, Server, }; -use tokio::runtime::Handle; use tracing_subscriber::{ layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry, }; @@ -33,14 +31,7 @@ async fn main() -> anyhow::Result<()> { .compact() .with_filter(EnvFilter::from_default_env()); let subscriber = Registry::default().with(stdout_layer); - - // Use audit logging endpoint if set, else only log to STDOUT - if let Some(audit_layer) = AuditLayer::new(Handle::current()) { - // All logs go through audit_layer before reaching the stdout_layer - subscriber.with(audit_layer).init(); - } else { - subscriber.init(); - }; + subscriber.init(); // these are empty ptrs so mem footprint should be minimal let server: Box = match CONFIG.parseable.mode { From 8419b644db8f1ee5bffbbaaebc48a7866101141f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 3 Jan 2025 22:11:35 +0530 Subject: [PATCH 18/30] ci: clippy suggestions --- src/audit.rs | 2 +- src/handlers/http/audit.rs | 13 +++++++++---- src/kafka.rs | 7 +------ 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 47704be09..b9c32051a 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -221,7 +221,7 @@ impl AuditLogBuilder { return; } let error = err.to_string(); - let error = error.is_empty().then(|| error); + let error = error.is_empty().then_some(error); self.response = Some(ResponseLog { status_code, error }); } diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs index bb683e121..50056896e 100644 --- a/src/handlers/http/audit.rs +++ b/src/handlers/http/audit.rs @@ -70,7 +70,6 @@ pub async fn audit_log_middleware( } } - let conn = req.connection_info(); let headers = req .headers() .iter() @@ -85,10 +84,17 @@ pub async fn audit_log_middleware( .ok() } }); - log_builder.set_request(req.method().as_str(), req.path(), conn.scheme(), headers); + log_builder.set_request( + req.method().as_str(), + req.path(), + req.connection_info().scheme(), + headers, + ); log_builder.set_actor( - conn.realip_remote_addr().unwrap_or_default(), + req.connection_info() + .realip_remote_addr() + .unwrap_or_default(), req.headers() .get("User-Agent") .and_then(|a| a.to_str().ok()) @@ -96,7 +102,6 @@ pub async fn audit_log_middleware( username, authorization_method, ); - drop(conn); let res = next.call(req).await; diff --git a/src/kafka.rs b/src/kafka.rs index 32bf24bb0..cf84a27d4 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -278,12 +278,7 @@ pub async fn setup_integration() { // Constructs a log for each kafka request let mut log_builder = AuditLogBuilder::default(); log_builder.set_actor( - CONFIG - .parseable - .kafka_host - .as_ref() - .map(|s| s.as_str()) - .unwrap_or(""), + CONFIG.parseable.kafka_host.as_deref().unwrap_or(""), "Kafka Client", "", "", From 4b6806a062cb472c38d971028ddaf3b53f3437e9 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 4 Jan 2025 09:10:52 +0530 Subject: [PATCH 19/30] refactor: revert changes in main --- src/main.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index 8cc1ca19a..d1663d539 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,17 +21,14 @@ use parseable::{ option::{Mode, CONFIG}, rbac, storage, IngestServer, ParseableServer, QueryServer, Server, }; -use tracing_subscriber::{ - layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry, -}; +use tracing_subscriber::EnvFilter; #[actix_web::main] async fn main() -> anyhow::Result<()> { - let stdout_layer = tracing_subscriber::fmt::layer() + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) .compact() - .with_filter(EnvFilter::from_default_env()); - let subscriber = Registry::default().with(stdout_layer); - subscriber.init(); + .init(); // these are empty ptrs so mem footprint should be minimal let server: Box = match CONFIG.parseable.mode { From bcf84a2407b7850079341debaf489a9a0c7b5b46 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 4 Jan 2025 16:36:57 +0530 Subject: [PATCH 20/30] refactor: builder pattern --- src/audit.rs | 28 ++++++++++++++--------- src/handlers/http/audit.rs | 46 +++++++++++++++++++------------------- src/kafka.rs | 24 +++++++++----------- 3 files changed, 52 insertions(+), 46 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index b9c32051a..504a7d79d 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -173,22 +173,24 @@ impl Default for AuditLogBuilder { } impl AuditLogBuilder { - pub fn set_stream_name(&mut self, stream: impl Into) { + pub fn set_stream_name(mut self, stream: impl Into) -> Self { if AUDIT_LOGGER.is_none() { - return; + return self; } self.stream = stream.into(); + + self } pub fn set_actor( - &mut self, + mut self, host: impl Into, username: impl Into, user_agent: impl Into, auth_method: impl Into, - ) { + ) -> Self { if AUDIT_LOGGER.is_none() { - return; + return self; } self.actor = Some(ActorLog { remote_host: host.into(), @@ -196,17 +198,19 @@ impl AuditLogBuilder { username: username.into(), authorization_method: auth_method.into(), }); + + self } pub fn set_request( - &mut self, + mut self, method: impl Into, path: impl Into, protocol: impl Into, headers: impl IntoIterator, - ) { + ) -> Self { if AUDIT_LOGGER.is_none() { - return; + return self; } self.request = Some(RequestLog { method: method.into(), @@ -214,15 +218,19 @@ impl AuditLogBuilder { protocol: protocol.into(), headers: headers.into_iter().collect(), }); + + self } - pub fn set_response(&mut self, status_code: u16, err: impl Display) { + pub fn set_response(mut self, status_code: u16, err: impl Display) -> Self { if AUDIT_LOGGER.is_none() { - return; + return self; } let error = err.to_string(); let error = error.is_empty().then_some(error); self.response = Some(ResponseLog { status_code, error }); + + self } // NOTE: Ensure that the logger has been constructed by Default diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs index 50056896e..5be002a82 100644 --- a/src/handlers/http/audit.rs +++ b/src/handlers/http/audit.rs @@ -44,12 +44,12 @@ pub async fn audit_log_middleware( { let attribute_value: &str = kinesis_common_attributes.to_str().unwrap(); let message: Message = serde_json::from_str(attribute_value).unwrap(); - log_builder.set_stream_name(message.common_attributes.x_p_stream); + log_builder = log_builder.set_stream_name(message.common_attributes.x_p_stream); } else if let Some(stream) = req.match_info().get("logstream") { - log_builder.set_stream_name(stream.to_owned()); + log_builder = log_builder.set_stream_name(stream.to_owned()); } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) { if let Ok(stream) = value.to_str() { - log_builder.set_stream_name(stream.to_owned()); + log_builder = log_builder.set_stream_name(stream.to_owned()); } } let mut username = "Unknown".to_owned(); @@ -84,24 +84,24 @@ pub async fn audit_log_middleware( .ok() } }); - log_builder.set_request( - req.method().as_str(), - req.path(), - req.connection_info().scheme(), - headers, - ); - - log_builder.set_actor( - req.connection_info() - .realip_remote_addr() - .unwrap_or_default(), - req.headers() - .get("User-Agent") - .and_then(|a| a.to_str().ok()) - .unwrap_or_default(), - username, - authorization_method, - ); + log_builder = log_builder + .set_request( + req.method().as_str(), + req.path(), + req.connection_info().scheme(), + headers, + ) + .set_actor( + req.connection_info() + .realip_remote_addr() + .unwrap_or_default(), + req.headers() + .get("User-Agent") + .and_then(|a| a.to_str().ok()) + .unwrap_or_default(), + username, + authorization_method, + ); let res = next.call(req).await; @@ -111,10 +111,10 @@ pub async fn audit_log_middleware( let status = res.status(); // Use error information from reponse object if an error if let Some(err) = res.response().error() { - log_builder.set_response(status.as_u16(), err); + log_builder = log_builder.set_response(status.as_u16(), err); } } - Err(err) => log_builder.set_response(500, err), + Err(err) => log_builder = log_builder.set_response(500, err), } log_builder.send().await; diff --git a/src/kafka.rs b/src/kafka.rs index cf84a27d4..b4b08bf55 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -276,23 +276,21 @@ pub async fn setup_integration() { while let Ok(curr) = stream.next().await.unwrap() { // Constructs a log for each kafka request - let mut log_builder = AuditLogBuilder::default(); - log_builder.set_actor( - CONFIG.parseable.kafka_host.as_deref().unwrap_or(""), - "Kafka Client", - "", - "", - ); - - log_builder.set_request("", "", "Kafka", []); - log_builder.set_stream_name(curr.topic()); + let log_builder = AuditLogBuilder::default() + .set_actor( + CONFIG.parseable.kafka_host.as_deref().unwrap_or(""), + "Kafka Client", + "", + "", + ) + .set_request("", "", "Kafka", []) + .set_stream_name(curr.topic()); let Err(err) = ingest_message(curr).await else { - log_builder.set_response(200, ""); + log_builder.set_response(200, "").send().await; continue; }; - log_builder.set_response(500, &err); error!("Unable to ingest incoming kafka message- {err}"); - log_builder.send().await; + log_builder.set_response(500, &err).send().await; } } From 3810c8e7b85a6d90c58cd6a05ed2c76d2f614f01 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 4 Jan 2025 21:23:48 +0530 Subject: [PATCH 21/30] fix: error message --- src/audit.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/audit.rs b/src/audit.rs index 504a7d79d..aaa2c8fa6 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -227,7 +227,7 @@ impl AuditLogBuilder { return self; } let error = err.to_string(); - let error = error.is_empty().then_some(error); + let error = (!error.is_empty()).then_some(error); self.response = Some(ResponseLog { status_code, error }); self From b37418308ed411961753d3a13723c9828029514a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 4 Jan 2025 21:29:51 +0530 Subject: [PATCH 22/30] feat: save cost of atomic access --- src/audit.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index aaa2c8fa6..de0729f9e 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -153,6 +153,8 @@ pub struct AuditLog { } pub struct AuditLogBuilder { + // Used to ensure that log is only constructed if the logger is enabled + enabled: bool, start_time: DateTime, stream: String, pub actor: Option, @@ -163,6 +165,7 @@ pub struct AuditLogBuilder { impl Default for AuditLogBuilder { fn default() -> Self { AuditLogBuilder { + enabled: AUDIT_LOGGER.is_some(), start_time: Utc::now(), stream: String::default(), actor: None, @@ -174,7 +177,7 @@ impl Default for AuditLogBuilder { impl AuditLogBuilder { pub fn set_stream_name(mut self, stream: impl Into) -> Self { - if AUDIT_LOGGER.is_none() { + if !self.enabled { return self; } self.stream = stream.into(); @@ -189,7 +192,7 @@ impl AuditLogBuilder { user_agent: impl Into, auth_method: impl Into, ) -> Self { - if AUDIT_LOGGER.is_none() { + if !self.enabled { return self; } self.actor = Some(ActorLog { @@ -209,7 +212,7 @@ impl AuditLogBuilder { protocol: impl Into, headers: impl IntoIterator, ) -> Self { - if AUDIT_LOGGER.is_none() { + if !self.enabled { return self; } self.request = Some(RequestLog { @@ -223,7 +226,7 @@ impl AuditLogBuilder { } pub fn set_response(mut self, status_code: u16, err: impl Display) -> Self { - if AUDIT_LOGGER.is_none() { + if !self.enabled { return self; } let error = err.to_string(); @@ -241,6 +244,7 @@ impl AuditLogBuilder { actor, request, response, + .. } = self; let Some(logger) = AUDIT_LOGGER.as_ref() else { return; From 382fada24a40abae16a570a07825598920ad272e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 4 Jan 2025 21:31:30 +0530 Subject: [PATCH 23/30] refactor: don't clone --- src/audit.rs | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index de0729f9e..80e898247 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -39,10 +39,8 @@ use url::Url; static AUDIT_LOGGER: Lazy> = Lazy::new(AuditLogger::new); pub struct AuditLogger { - client: Arc, + client: Client, log_endpoint: Url, - username: Option, - password: Option, } impl AuditLogger { @@ -62,16 +60,9 @@ impl AuditLogger { } }; - let client = Arc::new(reqwest::Client::new()); - - let username = CONFIG.parseable.audit_username.clone(); - let password = CONFIG.parseable.audit_password.clone(); - Some(AuditLogger { - client, + client: reqwest::Client::new(), log_endpoint, - username, - password, }) } @@ -81,8 +72,10 @@ impl AuditLogger { .post(self.log_endpoint.as_str()) .json(&json) .header("x-p-stream", "audit_log"); - if let Some(username) = self.username.as_ref() { - req = req.basic_auth(username, self.password.as_ref()) + + // Use basic auth if credentials are configured + if let Some(username) = CONFIG.parseable.audit_username.as_ref() { + req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref()) } match req.send().await { From 7d5f0118e6ca6620e8592da78135df555fc0038b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 4 Jan 2025 21:53:35 +0530 Subject: [PATCH 24/30] cleanup code --- src/audit.rs | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 80e898247..eefa11287 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -19,7 +19,6 @@ use std::{ collections::HashMap, fmt::{Debug, Display}, - sync::Arc, }; use crate::about::current; @@ -38,15 +37,19 @@ use url::Url; static AUDIT_LOGGER: Lazy> = Lazy::new(AuditLogger::new); +// AuditLogger handles sending audit logs to a remote logging system pub struct AuditLogger { client: Client, log_endpoint: Url, } impl AuditLogger { - /// Create an audit logger that can be used to capture - /// and push audit logs to the appropriate logging system over HTTP + /// Create an audit logger that can be used to capture and push + /// audit logs to the appropriate logging system over HTTP pub fn new() -> Option { + // Try to construct the log endpoint URL by joining the base URL + // with the ingest path, This can fail if the URL is not valid, + // when the base URL is not set or the ingest path is not valid let log_endpoint = match CONFIG .parseable .audit_logger @@ -66,6 +69,7 @@ impl AuditLogger { }) } + // Sends the audit log to the configured endpoint with proper authentication async fn send_log(&self, json: Value) { let mut req = self .client @@ -89,6 +93,7 @@ impl AuditLogger { } } +// Represents the version of the audit log format #[non_exhaustive] #[repr(u8)] #[derive(Debug, Clone, Copy, Serialize)] @@ -96,6 +101,7 @@ pub enum AuditLogVersion { V1 = 1, } +// Contains information about the actor (user) who performed the action #[derive(Serialize, Default)] #[serde(rename_all = "camelCase")] pub struct ActorLog { @@ -105,6 +111,7 @@ pub struct ActorLog { pub authorization_method: String, } +// Contains details about the HTTP request that was made #[derive(Serialize, Default)] pub struct RequestLog { pub method: String, @@ -113,23 +120,15 @@ pub struct RequestLog { pub headers: HashMap, } -#[derive(Serialize)] +/// Contains information about the response sent back to the client +#[derive(Default, Serialize)] #[serde(rename_all = "camelCase")] pub struct ResponseLog { pub status_code: u16, pub error: Option, } -impl Default for ResponseLog { - fn default() -> Self { - // Server failed to respond - ResponseLog { - status_code: 500, - error: None, - } - } -} - +/// The main audit log structure that combines all audit information #[derive(Serialize)] #[serde(rename_all = "camelCase")] pub struct AuditLog { @@ -145,6 +144,7 @@ pub struct AuditLog { pub response: ResponseLog, } +/// Builder pattern implementation for constructing audit logs pub struct AuditLogBuilder { // Used to ensure that log is only constructed if the logger is enabled enabled: bool, @@ -169,6 +169,7 @@ impl Default for AuditLogBuilder { } impl AuditLogBuilder { + /// Sets the stream name for the audit log if logger is set pub fn set_stream_name(mut self, stream: impl Into) -> Self { if !self.enabled { return self; @@ -178,6 +179,7 @@ impl AuditLogBuilder { self } + /// Sets the actor details for the audit log if logger is set pub fn set_actor( mut self, host: impl Into, @@ -198,6 +200,7 @@ impl AuditLogBuilder { self } + /// Sets the request details for the audit log if logger is set pub fn set_request( mut self, method: impl Into, @@ -218,6 +221,7 @@ impl AuditLogBuilder { self } + /// Sets the response details for the audit log if logger is set pub fn set_response(mut self, status_code: u16, err: impl Display) -> Self { if !self.enabled { return self; @@ -229,7 +233,7 @@ impl AuditLogBuilder { self } - // NOTE: Ensure that the logger has been constructed by Default + /// Sends the audit log to the logging server if configured pub async fn send(self) { let AuditLogBuilder { start_time, From c595f6ba64c3fcede9ef6be2c5f2528d78b9cd1d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 4 Jan 2025 21:57:06 +0530 Subject: [PATCH 25/30] ci: fix fmt --- src/audit.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index eefa11287..09e8b181b 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -44,11 +44,11 @@ pub struct AuditLogger { } impl AuditLogger { - /// Create an audit logger that can be used to capture and push + /// Create an audit logger that can be used to capture and push /// audit logs to the appropriate logging system over HTTP pub fn new() -> Option { - // Try to construct the log endpoint URL by joining the base URL - // with the ingest path, This can fail if the URL is not valid, + // Try to construct the log endpoint URL by joining the base URL + // with the ingest path, This can fail if the URL is not valid, // when the base URL is not set or the ingest path is not valid let log_endpoint = match CONFIG .parseable @@ -76,7 +76,7 @@ impl AuditLogger { .post(self.log_endpoint.as_str()) .json(&json) .header("x-p-stream", "audit_log"); - + // Use basic auth if credentials are configured if let Some(username) = CONFIG.parseable.audit_username.as_ref() { req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref()) From 39732aa59f4519d0df89933465d5260138d6b722 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 5 Jan 2025 21:59:48 +0530 Subject: [PATCH 26/30] feat: adhere to decided format and improve builder pattern impl --- src/audit.rs | 231 ++++++++++++++++++++++++------------- src/handlers/http/audit.rs | 63 +++++----- src/kafka.rs | 23 ++-- 3 files changed, 197 insertions(+), 120 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 09e8b181b..35a173019 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -22,7 +22,6 @@ use std::{ }; use crate::about::current; -use crate::handlers::http::modal::utils::rbac_utils::get_metadata; use super::option::CONFIG; use chrono::{DateTime, Utc}; @@ -101,10 +100,22 @@ pub enum AuditLogVersion { V1 = 1, } +#[derive(Serialize)] +pub struct AuditDetails { + pub version: AuditLogVersion, + pub id: Ulid, + pub generated_at: DateTime, +} + +#[derive(Serialize)] +pub struct ServerDetails { + pub version: String, + pub deployment_id: Ulid, +} + // Contains information about the actor (user) who performed the action #[derive(Serialize, Default)] -#[serde(rename_all = "camelCase")] -pub struct ActorLog { +pub struct ActorDetails { pub remote_host: String, pub user_agent: String, pub username: String, @@ -113,7 +124,10 @@ pub struct ActorLog { // Contains details about the HTTP request that was made #[derive(Serialize, Default)] -pub struct RequestLog { +pub struct RequestDetails { + pub stream: String, + pub start_time: DateTime, + pub end_time: DateTime, pub method: String, pub path: String, pub protocol: String, @@ -122,45 +136,34 @@ pub struct RequestLog { /// Contains information about the response sent back to the client #[derive(Default, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct ResponseLog { +pub struct ResponseDetails { pub status_code: u16, pub error: Option, } /// The main audit log structure that combines all audit information #[derive(Serialize)] -#[serde(rename_all = "camelCase")] pub struct AuditLog { - pub version: AuditLogVersion, - pub parseable_version: String, - pub deployment_id: Ulid, - pub audit_id: Ulid, - pub start_time: DateTime, - pub end_time: DateTime, - pub stream: String, - pub actor: ActorLog, - pub request: RequestLog, - pub response: ResponseLog, + pub audit: AuditDetails, + pub parseable_server: ServerDetails, + pub actor: ActorDetails, + pub request: RequestDetails, + pub response: ResponseDetails, } /// Builder pattern implementation for constructing audit logs pub struct AuditLogBuilder { // Used to ensure that log is only constructed if the logger is enabled enabled: bool, - start_time: DateTime, - stream: String, - pub actor: Option, - pub request: Option, - pub response: Option, + pub actor: Option, + pub request: Option, + pub response: Option, } impl Default for AuditLogBuilder { fn default() -> Self { AuditLogBuilder { enabled: AUDIT_LOGGER.is_some(), - start_time: Utc::now(), - stream: String::default(), actor: None, request: None, response: None, @@ -169,91 +172,159 @@ impl Default for AuditLogBuilder { } impl AuditLogBuilder { - /// Sets the stream name for the audit log if logger is set - pub fn set_stream_name(mut self, stream: impl Into) -> Self { - if !self.enabled { - return self; + /// Sets the remote host for the audit log + pub fn with_host(mut self, host: impl Into) -> Self { + if self.enabled { + self.actor + .get_or_insert_with(ActorDetails::default) + .remote_host = host.into(); } - self.stream = stream.into(); + self + } + /// Sets the username for the audit log + pub fn with_username(mut self, username: impl Into) -> Self { + if self.enabled { + self.actor + .get_or_insert_with(ActorDetails::default) + .username = username.into(); + } self } - /// Sets the actor details for the audit log if logger is set - pub fn set_actor( - mut self, - host: impl Into, - username: impl Into, - user_agent: impl Into, - auth_method: impl Into, - ) -> Self { - if !self.enabled { - return self; + /// Sets the user agent for the audit log + pub fn with_user_agent(mut self, user_agent: impl Into) -> Self { + if self.enabled { + self.actor + .get_or_insert_with(ActorDetails::default) + .user_agent = user_agent.into(); } - self.actor = Some(ActorLog { - remote_host: host.into(), - user_agent: user_agent.into(), - username: username.into(), - authorization_method: auth_method.into(), - }); + self + } + /// Sets the authorization method for the audit log + pub fn with_auth_method(mut self, auth_method: impl Into) -> Self { + if self.enabled { + self.actor + .get_or_insert_with(ActorDetails::default) + .authorization_method = auth_method.into(); + } self } - /// Sets the request details for the audit log if logger is set - pub fn set_request( - mut self, - method: impl Into, - path: impl Into, - protocol: impl Into, - headers: impl IntoIterator, - ) -> Self { - if !self.enabled { - return self; + /// Sets the stream for the request details + pub fn with_stream(mut self, stream: impl Into) -> Self { + if self.enabled { + self.request + .get_or_insert_with(RequestDetails::default) + .stream = stream.into(); } - self.request = Some(RequestLog { - method: method.into(), - path: path.into(), - protocol: protocol.into(), - headers: headers.into_iter().collect(), - }); + self + } + /// Sets the request timing details + pub fn with_timing(mut self, start_time: DateTime, end_time: DateTime) -> Self { + if self.enabled { + let request = self.request.get_or_insert_with(RequestDetails::default); + request.start_time = start_time; + request.end_time = end_time; + } self } - /// Sets the response details for the audit log if logger is set - pub fn set_response(mut self, status_code: u16, err: impl Display) -> Self { - if !self.enabled { - return self; + /// Sets the request method details + pub fn with_method(mut self, method: impl Into) -> Self { + if self.enabled { + self.request + .get_or_insert_with(RequestDetails::default) + .method = method.into(); } - let error = err.to_string(); - let error = (!error.is_empty()).then_some(error); - self.response = Some(ResponseLog { status_code, error }); + self + } + + /// Sets the request path + pub fn with_path(mut self, path: impl Into) -> Self { + if self.enabled { + self.request + .get_or_insert_with(RequestDetails::default) + .path = path.into(); + } + self + } + + /// Sets the request protocol + pub fn with_protocol(mut self, protocol: impl Into) -> Self { + if self.enabled { + self.request + .get_or_insert_with(RequestDetails::default) + .protocol = protocol.into(); + } + self + } + + /// Sets the request headers + pub fn with_headers(mut self, headers: impl IntoIterator) -> Self { + if self.enabled { + self.request + .get_or_insert_with(RequestDetails::default) + .headers = headers.into_iter().collect(); + } + self + } + /// Sets the response status code + pub fn with_status(mut self, status_code: u16) -> Self { + if self.enabled { + self.response + .get_or_insert_with(ResponseDetails::default) + .status_code = status_code; + } + self + } + + /// Sets the response error if any + pub fn with_error(mut self, err: impl Display) -> Self { + if self.enabled { + let error = err.to_string(); + if !error.is_empty() { + self.response + .get_or_insert_with(ResponseDetails::default) + .error = Some(error); + } + } self } /// Sends the audit log to the logging server if configured pub async fn send(self) { + // ensures that we don't progress if logger is not enabled + if !self.enabled { + return; + } + + // build the audit log let AuditLogBuilder { - start_time, - stream, actor, request, response, .. } = self; - let Some(logger) = AUDIT_LOGGER.as_ref() else { - return; - }; + + // get the logger + let logger = AUDIT_LOGGER.as_ref().unwrap(); + + // build the audit log + let now = Utc::now(); let audit_log = AuditLog { - version: AuditLogVersion::V1, - parseable_version: current().released_version.to_string(), - deployment_id: get_metadata().await.unwrap().deployment_id, - audit_id: Ulid::new(), - start_time, - end_time: Utc::now(), - stream, + audit: AuditDetails { + version: AuditLogVersion::V1, + id: Ulid::new(), + generated_at: now, + }, + parseable_server: ServerDetails { + version: current().released_version.to_string(), + deployment_id: Ulid::new(), + }, actor: actor.unwrap_or_default(), request: request.unwrap_or_default(), response: response.unwrap_or_default(), diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs index 5be002a82..4bb1b3f1b 100644 --- a/src/handlers/http/audit.rs +++ b/src/handlers/http/audit.rs @@ -23,6 +23,7 @@ use actix_web::{ middleware::Next, }; use actix_web_httpauth::extractors::basic::BasicAuth; +use chrono::Utc; use ulid::Ulid; use crate::{ @@ -37,6 +38,7 @@ pub async fn audit_log_middleware( mut req: ServiceRequest, next: Next, ) -> Result, actix_web::Error> { + let start_time = Utc::now(); let mut log_builder = AuditLogBuilder::default(); if let Some(kinesis_common_attributes) = @@ -44,12 +46,12 @@ pub async fn audit_log_middleware( { let attribute_value: &str = kinesis_common_attributes.to_str().unwrap(); let message: Message = serde_json::from_str(attribute_value).unwrap(); - log_builder = log_builder.set_stream_name(message.common_attributes.x_p_stream); + log_builder = log_builder.with_stream(message.common_attributes.x_p_stream); } else if let Some(stream) = req.match_info().get("logstream") { - log_builder = log_builder.set_stream_name(stream.to_owned()); + log_builder = log_builder.with_stream(stream); } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) { if let Ok(stream) = value.to_str() { - log_builder = log_builder.set_stream_name(stream.to_owned()); + log_builder = log_builder.with_stream(stream); } } let mut username = "Unknown".to_owned(); @@ -70,39 +72,40 @@ pub async fn audit_log_middleware( } } - let headers = req - .headers() - .iter() - .filter_map(|(name, value)| match name.as_str() { - // NOTE: drop headers that are not required - name if DROP_HEADERS.contains(&name.to_lowercase().as_str()) => None, - name => { - // NOTE: Drop headers that can't be parsed as string - value - .to_str() - .map(|value| (name.to_owned(), value.to_string())) - .ok() - } - }); log_builder = log_builder - .set_request( - req.method().as_str(), - req.path(), - req.connection_info().scheme(), - headers, - ) - .set_actor( + .with_host( req.connection_info() .realip_remote_addr() .unwrap_or_default(), + ) + .with_user_agent( req.headers() .get("User-Agent") .and_then(|a| a.to_str().ok()) .unwrap_or_default(), - username, - authorization_method, - ); + ) + .with_username(username) + .with_auth_method(authorization_method); + log_builder = log_builder + .with_method(req.method().as_str()) + .with_path(req.path()) + .with_protocol(req.connection_info().scheme().to_string()) + .with_headers( + req.headers() + .iter() + .filter_map(|(name, value)| match name.as_str() { + // NOTE: drop headers that are not required + name if DROP_HEADERS.contains(&name.to_lowercase().as_str()) => None, + name => { + // NOTE: Drop headers that can't be parsed as string + value + .to_str() + .map(|value| (name.to_owned(), value.to_string())) + .ok() + } + }), + ); let res = next.call(req).await; // Capture status_code and error information from response @@ -111,13 +114,13 @@ pub async fn audit_log_middleware( let status = res.status(); // Use error information from reponse object if an error if let Some(err) = res.response().error() { - log_builder = log_builder.set_response(status.as_u16(), err); + log_builder = log_builder.with_status(status.as_u16()).with_error(err); } } - Err(err) => log_builder = log_builder.set_response(500, err), + Err(err) => log_builder = log_builder.with_status(500).with_error(err), } - log_builder.send().await; + log_builder.with_timing(start_time, Utc::now()).send().await; res } diff --git a/src/kafka.rs b/src/kafka.rs index b4b08bf55..52b49b99e 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -275,22 +275,25 @@ pub async fn setup_integration() { let mut stream = consumer.stream(); while let Ok(curr) = stream.next().await.unwrap() { + let start_time = Utc::now(); // Constructs a log for each kafka request let log_builder = AuditLogBuilder::default() - .set_actor( - CONFIG.parseable.kafka_host.as_deref().unwrap_or(""), - "Kafka Client", - "", - "", - ) - .set_request("", "", "Kafka", []) - .set_stream_name(curr.topic()); + .with_host(CONFIG.parseable.kafka_host.as_deref().unwrap_or("")) + .with_user_agent("Kafka Client") + .with_stream(curr.topic()); let Err(err) = ingest_message(curr).await else { - log_builder.set_response(200, "").send().await; + log_builder.with_status(200).send().await; continue; }; error!("Unable to ingest incoming kafka message- {err}"); - log_builder.set_response(500, &err).send().await; + + log_builder + .with_status(500) + .with_error(err) + .with_timing(start_time, Utc::now()) + .with_protocol("Kafka") + .send() + .await; } } From 1bbe5c58d3fcc834fd3f569c5cd82eaba2830999 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 5 Jan 2025 22:30:25 +0530 Subject: [PATCH 27/30] fix: ensure `deployment_id` is set at send and improve codeflow --- src/audit.rs | 103 +++++++++++++++++++-------------------------------- 1 file changed, 39 insertions(+), 64 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 35a173019..5ad0c611b 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -21,7 +21,7 @@ use std::{ fmt::{Debug, Display}, }; -use crate::about::current; +use crate::{about::current, handlers::http::modal::utils::rbac_utils::get_metadata}; use super::option::CONFIG; use chrono::{DateTime, Utc}; @@ -107,7 +107,7 @@ pub struct AuditDetails { pub generated_at: DateTime, } -#[derive(Serialize)] +#[derive(Serialize, Default)] pub struct ServerDetails { pub version: String, pub deployment_id: Ulid, @@ -155,18 +155,27 @@ pub struct AuditLog { pub struct AuditLogBuilder { // Used to ensure that log is only constructed if the logger is enabled enabled: bool, - pub actor: Option, - pub request: Option, - pub response: Option, + inner: AuditLog, } impl Default for AuditLogBuilder { fn default() -> Self { AuditLogBuilder { enabled: AUDIT_LOGGER.is_some(), - actor: None, - request: None, - response: None, + inner: AuditLog { + audit: AuditDetails { + version: AuditLogVersion::V1, + id: Ulid::new(), + generated_at: Utc::now(), + }, + parseable_server: ServerDetails { + version: current().released_version.to_string(), + deployment_id: Ulid::nil(), + }, + actor: ActorDetails::default(), + request: RequestDetails::default(), + response: ResponseDetails::default(), + }, } } } @@ -175,9 +184,7 @@ impl AuditLogBuilder { /// Sets the remote host for the audit log pub fn with_host(mut self, host: impl Into) -> Self { if self.enabled { - self.actor - .get_or_insert_with(ActorDetails::default) - .remote_host = host.into(); + self.inner.actor.remote_host = host.into(); } self } @@ -185,9 +192,7 @@ impl AuditLogBuilder { /// Sets the username for the audit log pub fn with_username(mut self, username: impl Into) -> Self { if self.enabled { - self.actor - .get_or_insert_with(ActorDetails::default) - .username = username.into(); + self.inner.actor.username = username.into(); } self } @@ -195,9 +200,7 @@ impl AuditLogBuilder { /// Sets the user agent for the audit log pub fn with_user_agent(mut self, user_agent: impl Into) -> Self { if self.enabled { - self.actor - .get_or_insert_with(ActorDetails::default) - .user_agent = user_agent.into(); + self.inner.actor.user_agent = user_agent.into(); } self } @@ -205,9 +208,7 @@ impl AuditLogBuilder { /// Sets the authorization method for the audit log pub fn with_auth_method(mut self, auth_method: impl Into) -> Self { if self.enabled { - self.actor - .get_or_insert_with(ActorDetails::default) - .authorization_method = auth_method.into(); + self.inner.actor.authorization_method = auth_method.into(); } self } @@ -215,9 +216,7 @@ impl AuditLogBuilder { /// Sets the stream for the request details pub fn with_stream(mut self, stream: impl Into) -> Self { if self.enabled { - self.request - .get_or_insert_with(RequestDetails::default) - .stream = stream.into(); + self.inner.request.stream = stream.into(); } self } @@ -225,9 +224,9 @@ impl AuditLogBuilder { /// Sets the request timing details pub fn with_timing(mut self, start_time: DateTime, end_time: DateTime) -> Self { if self.enabled { - let request = self.request.get_or_insert_with(RequestDetails::default); - request.start_time = start_time; - request.end_time = end_time; + self.inner.request.start_time = start_time; + self.inner.request.end_time = end_time; + self.inner.audit.generated_at = start_time; } self } @@ -235,9 +234,7 @@ impl AuditLogBuilder { /// Sets the request method details pub fn with_method(mut self, method: impl Into) -> Self { if self.enabled { - self.request - .get_or_insert_with(RequestDetails::default) - .method = method.into(); + self.inner.request.method = method.into(); } self } @@ -245,9 +242,7 @@ impl AuditLogBuilder { /// Sets the request path pub fn with_path(mut self, path: impl Into) -> Self { if self.enabled { - self.request - .get_or_insert_with(RequestDetails::default) - .path = path.into(); + self.inner.request.path = path.into(); } self } @@ -255,9 +250,7 @@ impl AuditLogBuilder { /// Sets the request protocol pub fn with_protocol(mut self, protocol: impl Into) -> Self { if self.enabled { - self.request - .get_or_insert_with(RequestDetails::default) - .protocol = protocol.into(); + self.inner.request.protocol = protocol.into(); } self } @@ -265,9 +258,7 @@ impl AuditLogBuilder { /// Sets the request headers pub fn with_headers(mut self, headers: impl IntoIterator) -> Self { if self.enabled { - self.request - .get_or_insert_with(RequestDetails::default) - .headers = headers.into_iter().collect(); + self.inner.request.headers = headers.into_iter().collect(); } self } @@ -275,9 +266,7 @@ impl AuditLogBuilder { /// Sets the response status code pub fn with_status(mut self, status_code: u16) -> Self { if self.enabled { - self.response - .get_or_insert_with(ResponseDetails::default) - .status_code = status_code; + self.inner.response.status_code = status_code; } self } @@ -287,9 +276,7 @@ impl AuditLogBuilder { if self.enabled { let error = err.to_string(); if !error.is_empty() { - self.response - .get_or_insert_with(ResponseDetails::default) - .error = Some(error); + self.inner.response.error = Some(error); } } self @@ -304,32 +291,20 @@ impl AuditLogBuilder { // build the audit log let AuditLogBuilder { - actor, - request, - response, + inner: mut audit_log, .. } = self; + // get the deployment id from metadata + // NOTE: this fails if the metadata couldn't be loaded due to network issue, etc. + audit_log.parseable_server.deployment_id = get_metadata() + .await + .expect("Metadata should have been loaded") + .deployment_id; + // get the logger let logger = AUDIT_LOGGER.as_ref().unwrap(); - // build the audit log - let now = Utc::now(); - let audit_log = AuditLog { - audit: AuditDetails { - version: AuditLogVersion::V1, - id: Ulid::new(), - generated_at: now, - }, - parseable_server: ServerDetails { - version: current().released_version.to_string(), - deployment_id: Ulid::new(), - }, - actor: actor.unwrap_or_default(), - request: request.unwrap_or_default(), - response: response.unwrap_or_default(), - }; - logger.send_log(json!(audit_log)).await } } From 6ee8fe1aee59ff2c3d85da92bf28d91a123486fd Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 5 Jan 2025 22:46:13 +0530 Subject: [PATCH 28/30] refactor: implicitly capture start/end time at construction and send --- src/audit.rs | 29 +++++++++++++---------------- src/handlers/http/audit.rs | 8 +++----- src/kafka.rs | 2 -- 3 files changed, 16 insertions(+), 23 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 5ad0c611b..3bd355d16 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -95,12 +95,14 @@ impl AuditLogger { // Represents the version of the audit log format #[non_exhaustive] #[repr(u8)] -#[derive(Debug, Clone, Copy, Serialize)] +#[derive(Debug, Clone, Copy, Serialize, Default)] pub enum AuditLogVersion { + // NOTE: default should be latest version + #[default] V1 = 1, } -#[derive(Serialize)] +#[derive(Serialize, Default)] pub struct AuditDetails { pub version: AuditLogVersion, pub id: Ulid, @@ -166,14 +168,17 @@ impl Default for AuditLogBuilder { audit: AuditDetails { version: AuditLogVersion::V1, id: Ulid::new(), - generated_at: Utc::now(), + ..Default::default() }, parseable_server: ServerDetails { version: current().released_version.to_string(), - deployment_id: Ulid::nil(), + ..Default::default() + }, + request: RequestDetails { + start_time: Utc::now(), + ..Default::default() }, actor: ActorDetails::default(), - request: RequestDetails::default(), response: ResponseDetails::default(), }, } @@ -221,16 +226,6 @@ impl AuditLogBuilder { self } - /// Sets the request timing details - pub fn with_timing(mut self, start_time: DateTime, end_time: DateTime) -> Self { - if self.enabled { - self.inner.request.start_time = start_time; - self.inner.request.end_time = end_time; - self.inner.audit.generated_at = start_time; - } - self - } - /// Sets the request method details pub fn with_method(mut self, method: impl Into) -> Self { if self.enabled { @@ -301,7 +296,9 @@ impl AuditLogBuilder { .await .expect("Metadata should have been loaded") .deployment_id; - + let now = Utc::now(); + audit_log.audit.generated_at = now; + audit_log.request.end_time = now; // get the logger let logger = AUDIT_LOGGER.as_ref().unwrap(); diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs index 4bb1b3f1b..83ba6a471 100644 --- a/src/handlers/http/audit.rs +++ b/src/handlers/http/audit.rs @@ -23,7 +23,6 @@ use actix_web::{ middleware::Next, }; use actix_web_httpauth::extractors::basic::BasicAuth; -use chrono::Utc; use ulid::Ulid; use crate::{ @@ -38,7 +37,6 @@ pub async fn audit_log_middleware( mut req: ServiceRequest, next: Next, ) -> Result, actix_web::Error> { - let start_time = Utc::now(); let mut log_builder = AuditLogBuilder::default(); if let Some(kinesis_common_attributes) = @@ -111,16 +109,16 @@ pub async fn audit_log_middleware( // Capture status_code and error information from response match &res { Ok(res) => { - let status = res.status(); + log_builder = log_builder.with_status(res.status().as_u16()); // Use error information from reponse object if an error if let Some(err) = res.response().error() { - log_builder = log_builder.with_status(status.as_u16()).with_error(err); + log_builder = log_builder.with_error(err); } } Err(err) => log_builder = log_builder.with_status(500).with_error(err), } - log_builder.with_timing(start_time, Utc::now()).send().await; + log_builder.send().await; res } diff --git a/src/kafka.rs b/src/kafka.rs index 52b49b99e..314133dcd 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -275,7 +275,6 @@ pub async fn setup_integration() { let mut stream = consumer.stream(); while let Ok(curr) = stream.next().await.unwrap() { - let start_time = Utc::now(); // Constructs a log for each kafka request let log_builder = AuditLogBuilder::default() .with_host(CONFIG.parseable.kafka_host.as_deref().unwrap_or("")) @@ -291,7 +290,6 @@ pub async fn setup_integration() { log_builder .with_status(500) .with_error(err) - .with_timing(start_time, Utc::now()) .with_protocol("Kafka") .send() .await; From f1386c24d0354a3e83ddadf62a53e6e580f5a40c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 5 Jan 2025 22:57:54 +0530 Subject: [PATCH 29/30] doc: why we log the way we log --- src/handlers/http/audit.rs | 16 ++++++++++++---- src/kafka.rs | 10 +++------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs index 83ba6a471..79e180d71 100644 --- a/src/handlers/http/audit.rs +++ b/src/handlers/http/audit.rs @@ -33,12 +33,15 @@ use crate::{ const DROP_HEADERS: [&str; 4] = ["authorization", "cookie", "user-agent", "x-p-stream"]; +/// A middleware that logs incoming requests and outgoing responses from parseable's HTTP API to an audit service pub async fn audit_log_middleware( mut req: ServiceRequest, next: Next, ) -> Result, actix_web::Error> { + // Start building the audit log entry let mut log_builder = AuditLogBuilder::default(); + // Get stream name from request headers, if available if let Some(kinesis_common_attributes) = req.request().headers().get(KINESIS_COMMON_ATTRIBUTES_KEY) { @@ -52,6 +55,8 @@ pub async fn audit_log_middleware( log_builder = log_builder.with_stream(stream); } } + + // Get username and authorization method let mut username = "Unknown".to_owned(); let mut authorization_method = "None".to_owned(); @@ -70,6 +75,8 @@ pub async fn audit_log_middleware( } } + // Add details to the audit log, based on the incoming request + // NOTE: we save on the cost of cloning by doing so only if audit logger is configured log_builder = log_builder .with_host( req.connection_info() @@ -83,9 +90,7 @@ pub async fn audit_log_middleware( .unwrap_or_default(), ) .with_username(username) - .with_auth_method(authorization_method); - - log_builder = log_builder + .with_auth_method(authorization_method) .with_method(req.method().as_str()) .with_path(req.path()) .with_protocol(req.connection_info().scheme().to_string()) @@ -104,9 +109,11 @@ pub async fn audit_log_middleware( } }), ); + + // forward request to parseable let res = next.call(req).await; - // Capture status_code and error information from response + // Capture status_code and error information from outgoing response match &res { Ok(res) => { log_builder = log_builder.with_status(res.status().as_u16()); @@ -118,6 +125,7 @@ pub async fn audit_log_middleware( Err(err) => log_builder = log_builder.with_status(500).with_error(err), } + // Send the audit log to audit service, if configured log_builder.send().await; res diff --git a/src/kafka.rs b/src/kafka.rs index 314133dcd..7ff9d47fd 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -275,10 +275,11 @@ pub async fn setup_integration() { let mut stream = consumer.stream(); while let Ok(curr) = stream.next().await.unwrap() { - // Constructs a log for each kafka request + // TODO: maybe we should not constructs an audit log for each kafka message, but do so at the batch level let log_builder = AuditLogBuilder::default() .with_host(CONFIG.parseable.kafka_host.as_deref().unwrap_or("")) .with_user_agent("Kafka Client") + .with_protocol("Kafka") .with_stream(curr.topic()); let Err(err) = ingest_message(curr).await else { @@ -287,11 +288,6 @@ pub async fn setup_integration() { }; error!("Unable to ingest incoming kafka message- {err}"); - log_builder - .with_status(500) - .with_error(err) - .with_protocol("Kafka") - .send() - .await; + log_builder.with_status(500).with_error(err).send().await; } } From a123e3ae4329ec88ab9b18f30a07eab22e2e5ae6 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 6 Jan 2025 10:29:12 +0530 Subject: [PATCH 30/30] refactor: use in-memory static metadata --- src/audit.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 3bd355d16..0cddb602b 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -21,7 +21,7 @@ use std::{ fmt::{Debug, Display}, }; -use crate::{about::current, handlers::http::modal::utils::rbac_utils::get_metadata}; +use crate::{about::current, storage::StorageMetadata}; use super::option::CONFIG; use chrono::{DateTime, Utc}; @@ -172,7 +172,7 @@ impl Default for AuditLogBuilder { }, parseable_server: ServerDetails { version: current().released_version.to_string(), - ..Default::default() + deployment_id: StorageMetadata::global().deployment_id, }, request: RequestDetails { start_time: Utc::now(), @@ -290,18 +290,14 @@ impl AuditLogBuilder { .. } = self; - // get the deployment id from metadata - // NOTE: this fails if the metadata couldn't be loaded due to network issue, etc. - audit_log.parseable_server.deployment_id = get_metadata() - .await - .expect("Metadata should have been loaded") - .deployment_id; let now = Utc::now(); audit_log.audit.generated_at = now; audit_log.request.end_time = now; - // get the logger - let logger = AUDIT_LOGGER.as_ref().unwrap(); - logger.send_log(json!(audit_log)).await + AUDIT_LOGGER + .as_ref() + .unwrap() + .send_log(json!(audit_log)) + .await } }