Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
4fce69d
feat: setup audit logging infrastructure
Jan 2, 2025
1d5ff45
feat: audit log http requests
Jan 3, 2025
84b473f
feat: extract response status code
Jan 3, 2025
50b5a61
feat: audit ingestion in kafka
Jan 3, 2025
31baab9
fix: ensure headers for auth are dropped
Jan 3, 2025
48c3253
fix: obfuscate auth headers
Jan 3, 2025
b456371
fix: camelCase json field names
Jan 3, 2025
53ba72b
feat: get cause of error
Jan 3, 2025
d34cb76
drop some headers, collect parseableVersion info, log as string
Jan 3, 2025
60843a6
return start_time instead of elapsed
Jan 3, 2025
46a4f4a
drop x-p-stream header
Jan 3, 2025
306b4be
rm request host
Jan 3, 2025
a889332
fix: get auth details
Jan 3, 2025
0819601
add error info to log
Jan 3, 2025
61cbf25
refactor
Jan 3, 2025
ea75bfa
refactor as own middleware
Jan 3, 2025
200624a
refactor: audit logging without tracing complexity
Jan 3, 2025
8419b64
ci: clippy suggestions
Jan 3, 2025
4b6806a
refactor: revert changes in main
Jan 4, 2025
bcf84a2
refactor: builder pattern
Jan 4, 2025
3810c8e
fix: error message
Jan 4, 2025
b374183
feat: save cost of atomic access
Jan 4, 2025
382fada
refactor: don't clone
Jan 4, 2025
7d5f011
cleanup code
Jan 4, 2025
c595f6b
ci: fix fmt
Jan 4, 2025
39732aa
feat: adhere to decided format and improve builder pattern impl
Jan 5, 2025
1bbe5c5
fix: ensure `deployment_id` is set at send and improve codeflow
Jan 5, 2025
6ee8fe1
refactor: implicitly capture start/end time
Jan 5, 2025
f1386c2
doc: why we log the way we log
Jan 5, 2025
5c79704
Merge branch 'main' into audit-logging
Jan 5, 2025
a123e3a
refactor: use in-memory static metadata
Jan 6, 2025
f953cb2
Merge branch 'main' into audit-logging
Jan 6, 2025
e345c38
Merge branch 'main' into audit-logging
Jan 6, 2025
46e7737
Merge branch 'main' into audit-logging
nikhilsinhaparseable Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 255 additions & 0 deletions src/audit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::{
collections::HashMap,
fmt::{Debug, Display},
sync::Arc,
};

use crate::about::current;
use crate::handlers::http::modal::utils::rbac_utils::get_metadata;

use super::option::CONFIG;
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use reqwest::Client;
use serde::Serialize;
use serde_json::{json, Value};
use tracing::error;

use ulid::Ulid;
use url::Url;

// Global, lazily-initialized audit logger. `None` when audit logging is not
// configured (no audit endpoint in CONFIG); every audit call is then a no-op.
static AUDIT_LOGGER: Lazy<Option<AuditLogger>> = Lazy::new(AuditLogger::new);

/// Sends audit events to a remote log-ingestion endpoint over HTTP.
pub struct AuditLogger {
    // Shared HTTP client, reused across all audit sends.
    client: Arc<Client>,
    // Fully resolved ingest URL (configured endpoint + "/api/v1/ingest").
    log_endpoint: Url,
    // Optional basic-auth credentials for the audit endpoint.
    username: Option<String>,
    password: Option<String>,
}

impl AuditLogger {
/// Create an audit logger that can be used to capture
/// and push audit logs to the appropriate logging system over HTTP
pub fn new() -> Option<AuditLogger> {
let log_endpoint = match CONFIG
.parseable
.audit_logger
.as_ref()?
.join("/api/v1/ingest")
{
Ok(url) => url,
Err(err) => {
eprintln!("Couldn't setup audit logger: {err}");
return None;
}
};

let client = Arc::new(reqwest::Client::new());

let username = CONFIG.parseable.audit_username.clone();
let password = CONFIG.parseable.audit_password.clone();

Some(AuditLogger {
client,
log_endpoint,
username,
password,
})
}

async fn send_log(&self, json: Value) {
let mut req = self
.client
.post(self.log_endpoint.as_str())
.json(&json)
.header("x-p-stream", "audit_log");
if let Some(username) = self.username.as_ref() {
req = req.basic_auth(username, self.password.as_ref())
}

match req.send().await {
Ok(r) => {
if let Err(e) = r.error_for_status() {
error!("{e}")
}
}
Err(e) => error!("Failed to send audit event: {}", e),
}
}
}

/// Schema version tag embedded in every audit log so downstream consumers
/// can evolve the format without breaking older records.
#[non_exhaustive]
#[repr(u8)]
#[derive(Debug, Clone, Copy, Serialize)]
pub enum AuditLogVersion {
    V1 = 1,
}

/// Identifies who performed the audited request and how they authenticated.
#[derive(Serialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct ActorLog {
    pub remote_host: String,
    pub user_agent: String,
    pub username: String,
    pub authorization_method: String,
}

#[derive(Serialize, Default)]
pub struct RequestLog {
pub method: String,
pub path: String,
pub protocol: String,
pub headers: HashMap<String, String>,
}

/// Describes the outcome of the audited request.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ResponseLog {
    pub status_code: u16,
    // Human-readable error description; `None` when the request succeeded.
    pub error: Option<String>,
}

impl Default for ResponseLog {
fn default() -> Self {
// Server failed to respond
ResponseLog {
status_code: 500,
error: None,
}
}
}

/// The complete audit record emitted per request: schema version, server
/// identity, timing, and the actor/request/response triple.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AuditLog {
    pub version: AuditLogVersion,
    pub parseable_version: String,
    // Identifies this deployment; fetched from stored metadata at send time.
    pub deployment_id: Ulid,
    // Unique id for this individual audit event.
    pub audit_id: Ulid,
    pub start_time: DateTime<Utc>,
    pub end_time: DateTime<Utc>,
    // Name of the stream the request operated on.
    pub stream: String,
    pub actor: ActorLog,
    pub request: RequestLog,
    pub response: ResponseLog,
}

/// Incrementally collects the pieces of an `AuditLog` over a request's
/// lifetime; construct via `Default` (which captures the start time) and
/// finish with `send()` once the response is known.
pub struct AuditLogBuilder {
    // Captured at construction; becomes the log's `start_time`.
    start_time: DateTime<Utc>,
    stream: String,
    pub actor: Option<ActorLog>,
    pub request: Option<RequestLog>,
    pub response: Option<ResponseLog>,
}

impl Default for AuditLogBuilder {
    /// Starts an empty builder, stamping the current time as the moment the
    /// audited request began; every other section is filled in later.
    fn default() -> Self {
        Self {
            start_time: Utc::now(),
            stream: String::new(),
            actor: None,
            request: None,
            response: None,
        }
    }
}

impl AuditLogBuilder {
pub fn set_stream_name(&mut self, stream: impl Into<String>) {
if AUDIT_LOGGER.is_none() {
return;
}
self.stream = stream.into();
}

pub fn set_actor(
&mut self,
host: impl Into<String>,
username: impl Into<String>,
user_agent: impl Into<String>,
auth_method: impl Into<String>,
) {
if AUDIT_LOGGER.is_none() {
return;
}
self.actor = Some(ActorLog {
remote_host: host.into(),
user_agent: user_agent.into(),
username: username.into(),
authorization_method: auth_method.into(),
});
}

pub fn set_request(
&mut self,
method: impl Into<String>,
path: impl Into<String>,
protocol: impl Into<String>,
headers: impl IntoIterator<Item = (String, String)>,
) {
if AUDIT_LOGGER.is_none() {
return;
}
self.request = Some(RequestLog {
method: method.into(),
path: path.into(),
protocol: protocol.into(),
headers: headers.into_iter().collect(),
});
}

pub fn set_response(&mut self, status_code: u16, err: impl Display) {
if AUDIT_LOGGER.is_none() {
return;
}
let error = err.to_string();
let error = error.is_empty().then_some(error);
self.response = Some(ResponseLog { status_code, error });
}

// NOTE: Ensure that the logger has been constructed by Default
pub async fn send(self) {
let AuditLogBuilder {
start_time,
stream,
actor,
request,
response,
} = self;
let Some(logger) = AUDIT_LOGGER.as_ref() else {
return;
};
let audit_log = AuditLog {
version: AuditLogVersion::V1,
parseable_version: current().released_version.to_string(),
deployment_id: get_metadata().await.unwrap().deployment_id,
audit_id: Ulid::new(),
start_time,
end_time: Utc::now(),
stream,
actor: actor.unwrap_or_default(),
request: request.unwrap_or_default(),
response: response.unwrap_or_default(),
};

logger.send_log(json!(audit_log)).await
}
}
36 changes: 36 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ pub struct Cli {
pub kafka_client_id: Option<String>,
pub kafka_security_protocol: Option<SslProtocol>,
pub kafka_partitions: Option<String>,

// Audit Logging env vars
pub audit_logger: Option<Url>,
pub audit_username: Option<String>,
pub audit_password: Option<String>,
}

impl Cli {
Expand Down Expand Up @@ -165,6 +170,10 @@ impl Cli {
pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol";
pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions";

pub const AUDIT_LOGGER: &'static str = "audit-logger";
pub const AUDIT_USERNAME: &'static str = "audit-username";
pub const AUDIT_PASSWORD: &'static str = "audit-password";

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
}
Expand Down Expand Up @@ -219,6 +228,29 @@ impl Cli {
.env("P_KAFKA_PARTITIONS")
.value_name("STRING")
.help("Kafka partitions"),
)
.arg(
Arg::new(Self::AUDIT_LOGGER)
.long(Self::AUDIT_LOGGER)
.env("P_AUDIT_LOGGER")
.value_name("URL")
.required(false)
.value_parser(validation::url)
.help("Audit logger endpoint"),
)
.arg(
Arg::new(Self::AUDIT_USERNAME)
.long(Self::AUDIT_USERNAME)
.env("P_AUDIT_USERNAME")
.value_name("STRING")
.help("Audit logger username"),
)
.arg(
Arg::new(Self::AUDIT_PASSWORD)
.long(Self::AUDIT_PASSWORD)
.env("P_AUDIT_PASSWORD")
.value_name("STRING")
.help("Audit logger password"),
)
.arg(
Arg::new(Self::TRINO_ENDPOINT)
Expand Down Expand Up @@ -536,6 +568,10 @@ impl FromArgMatches for Cli {
.cloned();
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();

self.audit_logger = m.get_one::<Url>(Self::AUDIT_LOGGER).cloned();
self.audit_username = m.get_one::<String>(Self::AUDIT_USERNAME).cloned();
self.audit_password = m.get_one::<String>(Self::AUDIT_PASSWORD).cloned();

self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
self.trusted_ca_certs_path = m.get_one::<PathBuf>(Self::TRUSTED_CA_CERTS_PATH).cloned();
Expand Down
Loading
Loading