Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion resources/formats.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"name": "alb_log",
"regex": [
{
"pattern": "^(?<type>http|https|h2|ws|wss) (?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<elb>[^ ]+) (?<client_ip>[\\w\\.:]+):(?<client_port>\\d+) (?<target_ip>[\\w\\.:]+):(?<target_port>\\d+) (?<request_processing_time>[-\\d\\.]+) (?<target_processing_time>[-\\d\\.]+) (?<response_processing_time>[-\\d\\.]+) (?<elb_status_code>\\d+|-) (?<target_status_code>\\d+|-) (?<received_bytes>\\d+) (?<sent_bytes>\\d+) (?<cs_method>POST|GET|PUT|DELETE|HEAD|OPTIONS|CONNECT|TRACE|PATCH) (?<cs_uri_whole>[^ ]+) (?<cs_version>[^ ]+) (?<user_agent>[^ \\(]+) (?:\\([^\\)]+\\))? (?<ssl_cipher>[\\w-]+) (?<ssl_protocol>[\\w\\.-]+) (?<target_group_arn>[^ ]+) (?<trace_id>[^ ]+) (?<domain_name>[^ ]+) (?<chosen_cert_arn>[^ ]+) (?<action_executed>\\d+) (?<request_creation_time>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<redirect_url>[^ ]+) (?<redirect_proto>[^ ]+) (?<redirect_port>[^ ]+) (?<target_ip_port>[\\d\\.:]+) (?<target_status_desc>\\d+|-) (?<classification>[^ ]+) (?<classification_reason>[^ ]+) (?<track_id>[^ ]+)$",
"pattern": "^(?<type>http|https|h2|ws|wss) (?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<elb>[^ ]+) (?<client_ip>[\\w\\.:]+):(?<client_port>\\d+) (?<target_ip>[\\w\\.:]+):(?<target_port>\\d+) (?<request_processing_time>[-\\d\\.]+) (?<target_processing_time>[-\\d\\.]+) (?<response_processing_time>[-\\d\\.]+) (?<elb_status_code>\\d+|-) (?<target_status_code>\\d+|-) (?<received_bytes>\\d+) (?<sent_bytes>\\d+) (?<cs_method>POST|GET|PUT|DELETE|HEAD|OPTIONS|CONNECT|TRACE|PATCH) (?<cs_uri_whole>[^ ]+) (?<cs_version>[^ ]+) (?<user_agent>.+?) (?<ssl_cipher>[^ ]+) (?<ssl_protocol>[^ ]+) (?<target_group_arn>[^ ]+) (?<trace_id>[^ ]+) (?<domain_name>[^ ]+) (?<chosen_cert_arn>[^ ]+) (?<action_executed>\\d+) (?<request_creation_time>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<redirect_url>[^ ]+) (?<redirect_proto>[^ ]+) (?<redirect_port>[^ ]+) (?<target_ip_port>[\\d\\.:]+) (?<target_status_desc>\\d+|-) (?<classification>[^ ]+) (?<classification_reason>[^ ]+) (?<track_id>[^ ]+)$",
"fields": [
"type",
"timestamp",
Expand Down
21 changes: 18 additions & 3 deletions src/event/format/known_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use serde::{Deserialize, Deserializer};
use serde_json::{Map, Value};
use tracing::error;

use crate::event::FORMAT_VERIFY_KEY;

/// Predefined JSON with known textual logging formats
const FORMATS_JSON: &str = include_str!("../../../resources/formats.json");

Expand Down Expand Up @@ -120,11 +122,21 @@ impl SchemaDefinition {
}
}

// add `FORMAT_VERIFY_KEY` to the object with value "true", marking it as verified
obj.insert(
FORMAT_VERIFY_KEY.to_string(),
Value::String("true".to_string()),
);

obj.extend(extracted_fields);

return Some(format.fields.clone());
}

// add `FORMAT_VERIFY_KEY` to the object with value "false", marking it as unverified
obj.insert(
FORMAT_VERIFY_KEY.to_string(),
Value::String("false".to_string()),
);
None
}
}
Expand Down Expand Up @@ -180,6 +192,7 @@ impl EventProcessor {
pub fn extract_from_inline_log(
&self,
json: &mut Value,
p_custom_fields: &mut HashMap<String, String>,
log_source: &str,
extract_log: Option<&str>,
) -> Result<HashSet<String>, Error> {
Expand All @@ -197,15 +210,17 @@ impl EventProcessor {
if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
fields.extend(known_fields);
} else {
return Err(Error::Unacceptable(log_source.to_owned()));
// add `FORMAT_VERIFY_KEY` to the custom fields with value "false" instead of rejecting the event
p_custom_fields.insert(FORMAT_VERIFY_KEY.to_string(), "false".to_string());
}
}
}
Value::Object(event) => {
if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
return Ok(known_fields);
} else {
return Err(Error::Unacceptable(log_source.to_owned()));
// add `FORMAT_VERIFY_KEY` to the custom fields with value "false" instead of rejecting the event
p_custom_fields.insert(FORMAT_VERIFY_KEY.to_string(), "false".to_string());
}
}
_ => unreachable!("We don't accept events of the form: {json}"),
Expand Down
1 change: 1 addition & 0 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
/// Custom-field key under which the request's `User-Agent` header value is stored.
pub const USER_AGENT_KEY: &str = "p_user_agent";
/// Custom-field key for the request's source IP.
/// NOTE(review): inferred from the name; its use is not visible here — confirm.
pub const SOURCE_IP_KEY: &str = "p_src_ip";
/// Custom-field key holding the log source/format name supplied with the request
/// (e.g. `"otel-logs"`).
pub const FORMAT_KEY: &str = "p_format";
/// Key recording whether an event was checked against a known log format:
/// set to `"true"` when known-format extraction succeeds and `"false"` otherwise.
pub const FORMAT_VERIFY_KEY: &str = "p_format_verified";

#[derive(Clone)]
pub struct Event {
Expand Down
30 changes: 20 additions & 10 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,15 @@ pub async fn ingest(
return Err(PostError::OtelNotSupported);
}

let mut p_custom_fields = get_custom_fields_from_header(&req);

let fields = match &log_source {
LogSource::Custom(src) => {
KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?
}
LogSource::Custom(src) => KNOWN_SCHEMA_LIST.extract_from_inline_log(
&mut json,
&mut p_custom_fields,
src,
extract_log,
)?,
_ => HashSet::new(),
};

Expand Down Expand Up @@ -114,7 +119,7 @@ pub async fn ingest(
PARSEABLE
.add_update_log_source(&stream_name, log_source_entry)
.await?;
let p_custom_fields = get_custom_fields_from_header(req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
Expand Down Expand Up @@ -198,7 +203,7 @@ pub async fn handle_otel_logs_ingestion(
.add_update_log_source(&stream_name, log_source_entry)
.await?;

let p_custom_fields = get_custom_fields_from_header(req);
let p_custom_fields = get_custom_fields_from_header(&req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Expand Down Expand Up @@ -256,7 +261,7 @@ pub async fn handle_otel_metrics_ingestion(
.add_update_log_source(&stream_name, log_source_entry)
.await?;

let p_custom_fields = get_custom_fields_from_header(req);
let p_custom_fields = get_custom_fields_from_header(&req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Expand Down Expand Up @@ -315,7 +320,7 @@ pub async fn handle_otel_traces_ingestion(
.add_update_log_source(&stream_name, log_source_entry)
.await?;

let p_custom_fields = get_custom_fields_from_header(req);
let p_custom_fields = get_custom_fields_from_header(&req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Expand Down Expand Up @@ -363,13 +368,18 @@ pub async fn post_event(
.headers()
.get(EXTRACT_LOG_KEY)
.and_then(|h| h.to_str().ok());

let mut p_custom_fields = get_custom_fields_from_header(&req);
match &log_source {
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
return Err(PostError::OtelNotSupported)
}
LogSource::Custom(src) => {
KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?;
KNOWN_SCHEMA_LIST.extract_from_inline_log(
&mut json,
&mut p_custom_fields,
src,
extract_log,
)?;
}
_ => {}
}
Expand All @@ -386,7 +396,7 @@ pub async fn post_event(
})
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
}
let p_custom_fields = get_custom_fields_from_header(req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
Expand Down
10 changes: 5 additions & 5 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async fn push_logs(
Ok(())
}

pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, String> {
let user_agent = req
.headers()
.get(USER_AGENT)
Expand Down Expand Up @@ -217,7 +217,7 @@ mod tests {
.insert_header(("x-p-environment", "dev"))
.to_http_request();

let custom_fields = get_custom_fields_from_header(req);
let custom_fields = get_custom_fields_from_header(&req);

assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
assert_eq!(custom_fields.get("environment").unwrap(), "dev");
Expand All @@ -230,7 +230,7 @@ mod tests {
.insert_header((STREAM_NAME_HEADER_KEY, "teststream"))
.to_http_request();

let custom_fields = get_custom_fields_from_header(req);
let custom_fields = get_custom_fields_from_header(&req);

assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
assert!(!custom_fields.contains_key(STREAM_NAME_HEADER_KEY));
Expand All @@ -243,7 +243,7 @@ mod tests {
.insert_header((LOG_SOURCE_KEY, "otel-logs"))
.to_http_request();

let custom_fields = get_custom_fields_from_header(req);
let custom_fields = get_custom_fields_from_header(&req);

assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
assert_eq!(custom_fields.get(FORMAT_KEY).unwrap(), "otel-logs");
Expand All @@ -255,7 +255,7 @@ mod tests {
.insert_header(("x-p-", "empty"))
.to_http_request();

let custom_fields = get_custom_fields_from_header(req);
let custom_fields = get_custom_fields_from_header(&req);

assert_eq!(custom_fields.len(), 2);
assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "");
Expand Down
Loading