Skip to content

feat: batch audit logs and ensure they are not lost #1080

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 16 additions & 129 deletions src/audit.rs → src/audit/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,135 +16,18 @@
*
*/

use std::{
collections::HashMap,
fmt::{Debug, Display},
};
use std::fmt::Display;

use crate::{about::current, parseable::PARSEABLE, storage::StorageMetadata, HTTP_CLIENT};
use crate::{about::current, storage::StorageMetadata};

use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use serde::Serialize;
use serde_json::{json, Value};
use chrono::Utc;
use tracing::error;

use ulid::Ulid;
use url::Url;

static AUDIT_LOGGER: Lazy<Option<AuditLogger>> = Lazy::new(AuditLogger::new);

// AuditLogger handles sending audit logs to a remote logging system
pub struct AuditLogger {
log_endpoint: Url,
}

impl AuditLogger {
/// Create an audit logger that can be used to capture and push
/// audit logs to the appropriate logging system over HTTP
pub fn new() -> Option<AuditLogger> {
// Try to construct the log endpoint URL by joining the base URL
// with the ingest path, This can fail if the URL is not valid,
// when the base URL is not set or the ingest path is not valid
let log_endpoint = match PARSEABLE
.options
.audit_logger
.as_ref()?
.join("/api/v1/ingest")
{
Ok(url) => url,
Err(err) => {
eprintln!("Couldn't setup audit logger: {err}");
return None;
}
};

Some(AuditLogger { log_endpoint })
}

// Sends the audit log to the configured endpoint with proper authentication
async fn send_log(&self, json: Value) {
let mut req = HTTP_CLIENT
.post(self.log_endpoint.as_str())
.json(&json)
.header("x-p-stream", "audit_log");

// Use basic auth if credentials are configured
if let Some(username) = PARSEABLE.options.audit_username.as_ref() {
req = req.basic_auth(username, PARSEABLE.options.audit_password.as_ref())
}

match req.send().await {
Ok(r) => {
if let Err(e) = r.error_for_status() {
error!("{e}")
}
}
Err(e) => error!("Failed to send audit event: {}", e),
}
}
}

// Represents the version of the audit log format
#[non_exhaustive]
#[repr(u8)]
#[derive(Debug, Clone, Copy, Serialize, Default)]
pub enum AuditLogVersion {
// NOTE: default should be latest version
#[default]
V1 = 1,
}

#[derive(Serialize, Default)]
pub struct AuditDetails {
pub version: AuditLogVersion,
pub id: Ulid,
pub generated_at: DateTime<Utc>,
}

#[derive(Serialize, Default)]
pub struct ServerDetails {
pub version: String,
pub deployment_id: Ulid,
}

// Contains information about the actor (user) who performed the action
#[derive(Serialize, Default)]
pub struct ActorDetails {
pub remote_host: String,
pub user_agent: String,
pub username: String,
pub authorization_method: String,
}

// Contains details about the HTTP request that was made
#[derive(Serialize, Default)]
pub struct RequestDetails {
pub stream: String,
pub start_time: DateTime<Utc>,
pub end_time: DateTime<Utc>,
pub method: String,
pub path: String,
pub protocol: String,
pub headers: HashMap<String, String>,
}

/// Contains information about the response sent back to the client
#[derive(Default, Serialize)]
pub struct ResponseDetails {
pub status_code: u16,
pub error: Option<String>,
}

/// The main audit log structure that combines all audit information
#[derive(Serialize)]
pub struct AuditLog {
pub audit: AuditDetails,
pub parseable_server: ServerDetails,
pub actor: ActorDetails,
pub request: RequestDetails,
pub response: ResponseDetails,
}
use super::{
ActorDetails, AuditDetails, AuditLog, AuditLogVersion, RequestDetails, ResponseDetails,
ServerDetails, AUDIT_LOG_TX,
Comment on lines +27 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify that all imported entities are still needed after refactoring.
The references (ActorDetails, AuditDetails, AuditLog, etc.) are presumably used within this file, but do ensure none of them are left unused after the structural changes and removals in related modules (e.g., old AuditLogger removal).


🏁 Script executed:

#!/bin/bash
# Description: Search for references of removed or unused structs in the codebase.
fd -e rs --exec rg 'ActorDetails|AuditDetails|AuditLogVersion|...'

Length of output: 65605


Audit Import Cleanup: Remove Unused Entities

The grep search did not reveal any further usage of the imported items (ActorDetails, AuditDetails, AuditLog, AuditLogVersion, RequestDetails, ResponseDetails, ServerDetails, AUDIT_LOG_TX) beyond their inclusion in this file. Please double‑check the implementation in src/audit/builder.rs—if none of these entities are actually referenced (including potential macro or conditional usages), remove the unused imports to keep the file clean and prevent confusion.

  • Verify each imported entity in src/audit/builder.rs is actually referenced.
  • If an import is no longer needed due to the recent refactoring (e.g., after removing the old AuditLogger), please delete it.

};

/// Builder pattern implementation for constructing audit logs
pub struct AuditLogBuilder {
Expand All @@ -156,7 +39,7 @@ pub struct AuditLogBuilder {
impl Default for AuditLogBuilder {
fn default() -> Self {
AuditLogBuilder {
enabled: AUDIT_LOGGER.is_some(),
enabled: AUDIT_LOG_TX.get().is_some(),
inner: AuditLog {
audit: AuditDetails {
version: AuditLogVersion::V1,
Expand Down Expand Up @@ -287,10 +170,14 @@ impl AuditLogBuilder {
audit_log.audit.generated_at = now;
audit_log.request.end_time = now;

AUDIT_LOGGER
.as_ref()
.unwrap()
.send_log(json!(audit_log))
// NOTE: we are fine with blocking here as user expects audit logs to be sent at all costs
if let Err(e) = AUDIT_LOG_TX
.get()
.expect("Audit logger not initialized")
.send(audit_log)
.await
{
error!("Couldn't send to logger: {e}")
}
Comment on lines +173 to +181
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Assess the blocking behavior under heavy load.
By design, this code blocks while awaiting the send operation. Under significant load or if the channel is full, requests may stall. This aligns with a near-synchronous approach but may impact throughput. Consider:

  1. Using a bounded channel with a fallback strategy to avoid request stalls.
  2. Logging to disk or an internal buffer if the channel blocks, then retrying asynchronously.

Below is a suggested fallback approach, if you want to avoid fully blocking:

 if let Err(e) = AUDIT_LOG_TX
     .get()
     .expect("Audit logger not initialized")
-    .send(audit_log)
+    .try_send(audit_log)  // Non-blocking attempt
     .await
 {
     error!("Couldn't send to logger: {e}")
 }

Would you like assistance in implementing a buffered or retry-based logging mechanism?

Committable suggestion skipped: line range outside the PR's diff.

}
}
Loading
Loading