feat: batch audit logs and ensure they are not lost #1080

Open · wants to merge 19 commits into base: main
Changes from 10 commits
146 changes: 16 additions & 130 deletions src/audit.rs → src/audit/builder.rs
@@ -16,136 +16,18 @@
*
*/

use std::{
collections::HashMap,
fmt::{Debug, Display},
};

use crate::{about::current, storage::StorageMetadata, HTTP_CLIENT};
use std::fmt::Display;

use super::option::CONFIG;
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use serde::Serialize;
use serde_json::{json, Value};
use chrono::Utc;
use tracing::error;

use ulid::Ulid;
use url::Url;

static AUDIT_LOGGER: Lazy<Option<AuditLogger>> = Lazy::new(AuditLogger::new);

// AuditLogger handles sending audit logs to a remote logging system
pub struct AuditLogger {
log_endpoint: Url,
}

impl AuditLogger {
/// Create an audit logger that can be used to capture and push
/// audit logs to the appropriate logging system over HTTP
pub fn new() -> Option<AuditLogger> {
// Try to construct the log endpoint URL by joining the base URL
// with the ingest path, This can fail if the URL is not valid,
// when the base URL is not set or the ingest path is not valid
let log_endpoint = match CONFIG
.parseable
.audit_logger
.as_ref()?
.join("/api/v1/ingest")
{
Ok(url) => url,
Err(err) => {
eprintln!("Couldn't setup audit logger: {err}");
return None;
}
};

Some(AuditLogger { log_endpoint })
}

// Sends the audit log to the configured endpoint with proper authentication
async fn send_log(&self, json: Value) {
let mut req = HTTP_CLIENT
.post(self.log_endpoint.as_str())
.json(&json)
.header("x-p-stream", "audit_log");

// Use basic auth if credentials are configured
if let Some(username) = CONFIG.parseable.audit_username.as_ref() {
req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref())
}

match req.send().await {
Ok(r) => {
if let Err(e) = r.error_for_status() {
error!("{e}")
}
}
Err(e) => error!("Failed to send audit event: {}", e),
}
}
}

// Represents the version of the audit log format
#[non_exhaustive]
#[repr(u8)]
#[derive(Debug, Clone, Copy, Serialize, Default)]
pub enum AuditLogVersion {
// NOTE: default should be latest version
#[default]
V1 = 1,
}

#[derive(Serialize, Default)]
pub struct AuditDetails {
pub version: AuditLogVersion,
pub id: Ulid,
pub generated_at: DateTime<Utc>,
}

#[derive(Serialize, Default)]
pub struct ServerDetails {
pub version: String,
pub deployment_id: Ulid,
}

// Contains information about the actor (user) who performed the action
#[derive(Serialize, Default)]
pub struct ActorDetails {
pub remote_host: String,
pub user_agent: String,
pub username: String,
pub authorization_method: String,
}
use crate::{about::current, storage::StorageMetadata};

// Contains details about the HTTP request that was made
#[derive(Serialize, Default)]
pub struct RequestDetails {
pub stream: String,
pub start_time: DateTime<Utc>,
pub end_time: DateTime<Utc>,
pub method: String,
pub path: String,
pub protocol: String,
pub headers: HashMap<String, String>,
}

/// Contains information about the response sent back to the client
#[derive(Default, Serialize)]
pub struct ResponseDetails {
pub status_code: u16,
pub error: Option<String>,
}

/// The main audit log structure that combines all audit information
#[derive(Serialize)]
pub struct AuditLog {
pub audit: AuditDetails,
pub parseable_server: ServerDetails,
pub actor: ActorDetails,
pub request: RequestDetails,
pub response: ResponseDetails,
}
use super::{
ActorDetails, AuditDetails, AuditLog, AuditLogVersion, RequestDetails, ResponseDetails,
ServerDetails, AUDIT_LOG_TX,
Comment on lines +27 to +29
Contributor

💡 Verification agent

🧩 Analysis chain

Verify that all imported entities are still needed after refactoring.
The references (ActorDetails, AuditDetails, AuditLog, etc.) are presumably used within this file, but do ensure none of them are left unused after the structural changes and removals in related modules (e.g., old AuditLogger removal).


🏁 Script executed:

#!/bin/bash
# Description: Search for references of removed or unused structs in the codebase.
fd -e rs --exec rg 'ActorDetails|AuditDetails|AuditLogVersion|...'

Length of output: 65605


Audit Import Cleanup: Remove Unused Entities

The grep search did not reveal any further usage of the imported items (ActorDetails, AuditDetails, AuditLog, AuditLogVersion, RequestDetails, ResponseDetails, ServerDetails, AUDIT_LOG_TX) beyond their inclusion in this file. Please double‑check the implementation in src/audit/builder.rs—if none of these entities are actually referenced (including potential macro or conditional usages), remove the unused imports to keep the file clean and prevent confusion.

  • Verify each imported entity in src/audit/builder.rs is actually referenced.
  • If an import is no longer needed due to the recent refactoring (e.g., after removing the old AuditLogger), please delete it.

};

/// Builder pattern implementation for constructing audit logs
pub struct AuditLogBuilder {
@@ -157,7 +39,7 @@ pub struct AuditLogBuilder {
impl Default for AuditLogBuilder {
fn default() -> Self {
AuditLogBuilder {
enabled: AUDIT_LOGGER.is_some(),
enabled: AUDIT_LOG_TX.get().is_some(),
inner: AuditLog {
audit: AuditDetails {
version: AuditLogVersion::V1,
@@ -288,10 +170,14 @@ impl AuditLogBuilder {
audit_log.audit.generated_at = now;
audit_log.request.end_time = now;

AUDIT_LOGGER
.as_ref()
.unwrap()
.send_log(json!(audit_log))
// NOTE: we are fine with blocking here as user expects audit logs to be sent at all costs
if let Err(e) = AUDIT_LOG_TX
.get()
.expect("Audit logger not initialized")
.send(audit_log)
.await
{
error!("Couldn't send to logger: {e}")
}
Comment on lines +173 to +181
Contributor

🛠️ Refactor suggestion

Assess the blocking behavior under heavy load.
By design, this code blocks while awaiting the send operation. Under significant load or if the channel is full, requests may stall. This aligns with a near-synchronous approach but may impact throughput. Consider:

  1. Using a bounded channel with a fallback strategy to avoid request stalls.
  2. Logging to disk or an internal buffer if the channel blocks, then retrying asynchronously.

Below is a suggested fallback approach, if you want to avoid fully blocking:

 if let Err(e) = AUDIT_LOG_TX
     .get()
     .expect("Audit logger not initialized")
-    .send(audit_log)
-    .await
+    .try_send(audit_log)  // Non-blocking attempt; try_send is synchronous, so no .await
 {
     error!("Couldn't send to logger: {e}")
 }

Would you like assistance in implementing a buffered or retry-based logging mechanism?
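For illustration, the fallback strategy this comment describes can be sketched with std-only stand-ins: `std::sync::mpsc::sync_channel` in place of tokio's bounded channel, and a `Vec` in place of an on-disk backlog. All names here are hypothetical, not part of the PR:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Returns (sent, spilled): messages accepted by the bounded channel
// vs. messages diverted to the overflow backlog when it was full.
fn send_with_fallback(capacity: usize, messages: Vec<String>) -> (usize, Vec<String>) {
    // _rx is kept alive so sends fail with Full, not Disconnected
    let (tx, _rx) = sync_channel::<String>(capacity);
    let mut overflow = Vec::new();
    let mut sent = 0;

    for msg in messages {
        match tx.try_send(msg) {
            Ok(()) => sent += 1,
            // Channel full: spill to the backlog instead of blocking the request
            Err(TrySendError::Full(msg)) => overflow.push(msg),
            // Receiver gone: also spill, so the log is not lost
            Err(TrySendError::Disconnected(msg)) => overflow.push(msg),
        }
    }
    (sent, overflow)
}

fn main() {
    let logs: Vec<String> = (0..5).map(|i| format!("audit-{i}")).collect();
    let (sent, spilled) = send_with_fallback(2, logs);
    // Capacity 2 and no draining receiver: 2 accepted, 3 spilled
    assert_eq!(sent, 2);
    assert_eq!(spilled, vec!["audit-2", "audit-3", "audit-4"]);
}
```

The spilled batch would then be retried asynchronously, which is essentially what the disk-backed file rotation in `logger.rs` already does for failed remote sends.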

Committable suggestion skipped: line range outside the PR's diff.

}
}
221 changes: 221 additions & 0 deletions src/audit/logger.rs
@@ -0,0 +1,221 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::fs::File;

use tokio::{fs::OpenOptions, io::AsyncWriteExt, select, sync::mpsc::channel, time::interval};
use tracing::{error, warn};
use url::Url;

use crate::{option::CONFIG, HTTP_CLIENT};

use super::{AuditLog, AUDIT_LOG_TX};

// AuditLogger handles sending audit logs to a remote logging system
pub struct AuditLogger {
log_endpoint: Option<Url>,
batch: Vec<AuditLog>,

// NOTE: good until usize overflows
next_log_file_id: usize,
oldest_log_file_id: usize,
}

impl Default for AuditLogger {
/// Create an audit logger that can be used to capture and push
/// audit logs to the appropriate logging system over HTTP
fn default() -> Self {
let mut logger = AuditLogger {
log_endpoint: None,
batch: Vec::with_capacity(CONFIG.parseable.audit_batch_size),
next_log_file_id: 0,
oldest_log_file_id: 0,
};

// Try to construct the log endpoint URL by joining the base URL
// with the ingest path. This can fail when the base URL is not set
// or the ingest path is not valid.
let Some(url) = CONFIG.parseable.audit_logger.as_ref() else {
return logger;
};

logger.log_endpoint = url
.join("/api/v1/ingest")
.inspect_err(|err| eprintln!("Couldn't setup audit logger: {err}"))
.ok();

// Create the directory for audit logs if it doesn't exist
std::fs::create_dir_all(&CONFIG.parseable.audit_log_dir)
.expect("Failed to create audit log directory");

// Figure out the latest and oldest log file in directory
let files = std::fs::read_dir(&CONFIG.parseable.audit_log_dir)
.expect("Failed to read audit log directory");
let (oldest_log_file_id, latest_log_file_id) =
files.fold((usize::MAX, 0), |(oldest, latest), r| {
let file_name = r.unwrap().file_name();
let Ok(file_id) = file_name
.to_str()
.expect("File name is not utf8")
.split('.')
.next()
.expect("File name is not valid")
.parse::<usize>()
.inspect_err(|e| warn!("Unexpected file in logs directory: {e}"))
else {
return (oldest, latest);
};
(oldest.min(file_id), latest.max(file_id))
});

logger.next_log_file_id = latest_log_file_id + 1;
if oldest_log_file_id != usize::MAX {
logger.oldest_log_file_id = oldest_log_file_id;
}

logger
}
}
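The id-scanning fold in the `Default` impl above can be exercised in isolation over a plain list of names; a sketch with hypothetical file names (the helper itself is not part of the PR):

```rust
// Mirrors the directory-scan fold: derive (oldest, latest) numeric ids
// from file names like "3.json", ignoring anything non-numeric.
fn scan_ids(names: &[&str]) -> (Option<usize>, usize) {
    let (oldest, latest) = names.iter().fold((usize::MAX, 0), |(oldest, latest), name| {
        let Some(id) = name
            .split('.')
            .next()
            .and_then(|stem| stem.parse::<usize>().ok())
        else {
            // e.g. a stray "notes.txt" in the log directory
            return (oldest, latest);
        };
        (oldest.min(id), latest.max(id))
    });
    // usize::MAX as "oldest" means no valid log file was found
    ((oldest != usize::MAX).then_some(oldest), latest)
}

fn main() {
    let (oldest, latest) = scan_ids(&["2.json", "5.json", "notes.txt", "3.json"]);
    assert_eq!(oldest, Some(2));
    assert_eq!(latest, 5);
    // next_log_file_id would then be latest + 1 = 6
}
```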

impl AuditLogger {
/// Flushes audit logs to the remote logging system
async fn flush(&mut self) {
if self.batch.is_empty() {
return;
}

// swap the old batch with a new empty one
let mut logs_to_send = Vec::with_capacity(CONFIG.parseable.audit_batch_size);
std::mem::swap(&mut self.batch, &mut logs_to_send);

// send the logs to the remote logging system, if no backlog, else write to disk
if self.oldest_log_file_id >= self.next_log_file_id
&& self.send_logs_to_remote(&logs_to_send).await.is_ok()
{
return;
}

// write the logs to the next log file
let log_file_path = CONFIG
.parseable
.audit_log_dir
.join(format!("{}.json", self.next_log_file_id));
let mut log_file = OpenOptions::new()
.create(true)
.truncate(true)
.open(log_file_path)
.await
.expect("Failed to open audit log file");
let buf = serde_json::to_vec(&logs_to_send).expect("Failed to serialize audit logs");
Contributor

It may look like this never happens, but unwrap() can cause a panic in production. It would be nice to have proper error handling with context.

Contributor Author

This should not be an issue in most places, as serialization is not a problem for these types. Still, we have `expect`s to document why it is safe to assume it won't fail.
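As an aside, the "error handling with context" the reviewer asks for can be sketched with the standard library alone by wrapping the I/O error with the path that failed. The helper name below is hypothetical, not from the PR:

```rust
use std::fs;
use std::io::{self, Write};
use std::path::Path;

// Hypothetical helper: write a serialized batch, wrapping any I/O error
// with the path so a production log line says *which* file failed.
fn write_batch(path: &Path, buf: &[u8]) -> io::Result<()> {
    let mut file = fs::File::create(path)
        .map_err(|e| io::Error::new(e.kind(), format!("creating {}: {e}", path.display())))?;
    file.write_all(buf)
        .map_err(|e| io::Error::new(e.kind(), format!("writing {}: {e}", path.display())))?;
    Ok(())
}

fn main() {
    // Error path: the parent directory does not exist, so create() fails,
    // and the error message now carries the offending path
    let err = write_batch(Path::new("/no/such/dir/0.json"), b"[]").unwrap_err();
    assert!(err.to_string().contains("/no/such/dir/0.json"));

    // Happy path: a file in the system temp directory
    let ok_path = std::env::temp_dir().join("audit_batch_demo.json");
    write_batch(&ok_path, b"[]").expect("temp dir should be writable");
    fs::remove_file(&ok_path).ok();
}
```

Crates like `anyhow` (with `.context(...)`) express the same idea more ergonomically; the std version is shown only to keep the sketch dependency-free.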

log_file.write_all(&buf).await.unwrap();

// increment the next log file id
self.next_log_file_id += 1;
}

/// Inserts an audit log into the batch, and flushes the batch if it exceeds the configured batch size
async fn insert(&mut self, log: AuditLog) {
self.batch.push(log);

// Flush if batch size exceeds threshold
if self.batch.len() >= CONFIG.parseable.audit_batch_size {
self.flush().await
}
}

/// Reads the oldest log file and sends it to the audit logging backend
async fn send_logs(&self) -> Result<(), anyhow::Error> {
// if there are no logs to send, do nothing
if self.oldest_log_file_id >= self.next_log_file_id {
return Ok(());
}

// read the oldest log file
let oldest_file_path = CONFIG
.parseable
.audit_log_dir
.join(format!("{}.json", self.oldest_log_file_id));
let mut oldest_file = File::open(&oldest_file_path)?;
let logs_to_send: Vec<AuditLog> = serde_json::from_reader(&mut oldest_file)?;
self.send_logs_to_remote(&logs_to_send).await?;

// Delete the oldest log file
std::fs::remove_file(oldest_file_path)?;

Ok(())
}

async fn send_logs_to_remote(&self, logs: &Vec<AuditLog>) -> Result<(), anyhow::Error> {
// send the logs to the audit logging backend
let log_endpoint = self
.log_endpoint
.as_ref()
.expect("Audit logger was initialized!");
let mut req = HTTP_CLIENT
.post(log_endpoint.as_str())
.json(&logs)
.header("x-p-stream", "audit_log");

// Use basic auth if credentials are configured
if let Some(username) = CONFIG.parseable.audit_username.as_ref() {
req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref())
}

// Send batched logs to the audit logging backend
req.send().await?.error_for_status()?;

Ok(())
}

/// Spawns a background task for periodic flushing of audit logs, if configured
pub async fn spawn_batcher(mut self) {
if self.log_endpoint.is_none() {
return;
}

// setup the audit log channel
let (audit_log_tx, mut audit_log_rx) = channel(0);
Contributor

Why is the initial size 0?

Contributor Author
@de-sh de-sh Jan 9, 2025

The idea was to create a "Glienicke Bridge" so that we block at the request level every time the underlying audit logger is unable to keep up.
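The rendezvous behavior described here can be illustrated with the standard library's `sync_channel(0)`, which has exactly these semantics: every send blocks until the receiver takes the value. (Note this is a std stand-in for the tokio channel in the diff; tokio's `mpsc::channel` panics when given a capacity of 0, so the smallest usable tokio buffer is 1.)

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

// sync_channel(0) is a rendezvous channel: every send blocks until the
// receiver takes the value, so a producer can never outrun the consumer.
fn run_rendezvous() -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(0);

    let consumer = thread::spawn(move || {
        let mut received = Vec::new();
        while let Ok(v) = rx.recv() {
            thread::sleep(Duration::from_millis(5)); // deliberately slow consumer
            received.push(v);
        }
        received
    });

    for i in 0..3 {
        tx.send(i).expect("consumer alive"); // blocks until the consumer is ready
    }
    drop(tx); // closing the channel ends the consumer loop

    consumer.join().expect("consumer thread panicked")
}

fn main() {
    assert_eq!(run_rendezvous(), vec![0, 1, 2]);
}
```

Each `send` returns only after the consumer has accepted the previous value, which is the request-level backpressure the author is aiming for.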

AUDIT_LOG_TX
.set(audit_log_tx)
.expect("Failed to set audit logger tx");

// spawn the batcher
tokio::spawn(async move {
let mut interval = interval(CONFIG.parseable.audit_flush_interval);
loop {
select! {
_ = interval.tick() => {
self.flush().await;
}

Some(log) = audit_log_rx.recv() => {
self.insert(log).await;
}

r = self.send_logs() => {
if let Err(e) = r {
error!("Failed to send logs: {e}");
continue;
}
self.oldest_log_file_id += 1;
},
}
}
});
}
}
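The `spawn_batcher` loop above multiplexes a flush interval with incoming logs via `select!`. The same size-or-time flush policy can be sketched without tokio using a thread and `recv_timeout`; the names below are hypothetical, not from the PR:

```rust
use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

// Collect messages into batches, flushing when either `max_size` is
// reached or `max_wait` has elapsed since the last flush.
fn spawn_batcher(
    max_size: usize,
    max_wait: Duration,
) -> (Sender<String>, JoinHandle<Vec<Vec<String>>>) {
    let (tx, rx) = channel::<String>();
    let handle = thread::spawn(move || {
        let mut batches = Vec::new();
        let mut batch = Vec::new();
        let mut deadline = Instant::now() + max_wait;
        loop {
            let timeout = deadline.saturating_duration_since(Instant::now());
            match rx.recv_timeout(timeout) {
                Ok(msg) => {
                    batch.push(msg);
                    if batch.len() >= max_size {
                        batches.push(std::mem::take(&mut batch));
                        deadline = Instant::now() + max_wait;
                    }
                }
                // Interval elapsed: flush whatever has accumulated
                Err(RecvTimeoutError::Timeout) => {
                    if !batch.is_empty() {
                        batches.push(std::mem::take(&mut batch));
                    }
                    deadline = Instant::now() + max_wait;
                }
                // All senders dropped: flush the remainder and stop
                Err(RecvTimeoutError::Disconnected) => {
                    if !batch.is_empty() {
                        batches.push(batch);
                    }
                    return batches;
                }
            }
        }
    });
    (tx, handle)
}

fn main() {
    let (tx, handle) = spawn_batcher(2, Duration::from_millis(500));
    for i in 0..5 {
        tx.send(format!("log-{i}")).unwrap();
    }
    drop(tx);
    let batches = handle.join().unwrap();
    // Five logs with batch size 2 → batches of 2, 2, and 1
    assert_eq!(batches.iter().map(Vec::len).collect::<Vec<_>>(), vec![2, 2, 1]);
}
```

In the PR, the "flush" additionally falls back to on-disk files when the remote endpoint is unreachable; the sketch only shows the batching skeleton.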