Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,37 @@ const OTLP_BUFFER_INITIAL_CAPACITY: usize = 1024 * 64;
static EXPRESSIONS: LazyLock<RwLock<Vec<BridgePipeline>>> =
LazyLock::new(|| RwLock::new(Vec::new()));

static LOG_RECORD_SCHEMA: LazyLock<ParserMapSchema> = LazyLock::new(|| {
ParserMapSchema::new()
.set_default_map_key("attributes")
.with_key_definition("time_unix_nano", ParserMapKeySchema::DateTime)
.with_key_definition("observed_time_unix_nano", ParserMapKeySchema::DateTime)
.with_key_definition("severity_number", ParserMapKeySchema::Integer)
.with_key_definition("severity_text", ParserMapKeySchema::String)
.with_key_definition("body", ParserMapKeySchema::Any)
.with_key_definition("trace_id", ParserMapKeySchema::Array)
.with_key_definition("span_id", ParserMapKeySchema::Array)
.with_key_definition("flags", ParserMapKeySchema::Integer)
.with_key_definition("event_name", ParserMapKeySchema::String)
.with_key_aliases([
("Attributes", "attributes"),
("Timestamp", "time_unix_nano"),
("ObservedTimestamp", "observed_time_unix_nano"),
("SeverityNumber", "severity_number"),
("SeverityText", "severity_text"),
("Body", "body"),
("TraceId", "trace_id"),
("SpanId", "span_id"),
("TraceFlags", "flags"),
("EventName", "event_name"),
])
});

/// Get the static schema for LogRecord fields and their aliases
pub fn get_log_record_schema() -> &'static ParserMapSchema {
&LOG_RECORD_SCHEMA
}

#[derive(Debug)]
pub struct BridgePipeline {
attributes_schema: Option<ParserMapSchema>,
Expand Down Expand Up @@ -359,17 +390,7 @@ fn build_parser_options(options: Option<BridgeOptions>) -> Result<ParserOptions,
fn build_log_record_schema(
attributes_schema: Option<ParserMapSchema>,
) -> Result<(ParserMapSchema, Option<ParserMapSchema>), ParserError> {
let mut log_record_schema = ParserMapSchema::new()
.set_default_map_key("Attributes")
.with_key_definition("Timestamp", ParserMapKeySchema::DateTime)
.with_key_definition("ObservedTimestamp", ParserMapKeySchema::DateTime)
.with_key_definition("SeverityNumber", ParserMapKeySchema::Integer)
.with_key_definition("SeverityText", ParserMapKeySchema::String)
.with_key_definition("Body", ParserMapKeySchema::Any)
.with_key_definition("TraceId", ParserMapKeySchema::Array)
.with_key_definition("SpanId", ParserMapKeySchema::Array)
.with_key_definition("TraceFlags", ParserMapKeySchema::Integer)
.with_key_definition("EventName", ParserMapKeySchema::String);
let mut log_record_schema = LOG_RECORD_SCHEMA.clone();

if let Some(mut attributes_schema) = attributes_schema {
let schema = attributes_schema.get_schema_mut();
Expand All @@ -385,20 +406,24 @@ fn build_log_record_schema(
// present in Attributes users might query with ambiguous naming.
// For example: source | extend Body = 'something' will write to the
// top-level field and not Attributes.
if let Some(removed) = schema.remove(top_level_key)
&& &removed != top_level_key_schema
{
return Err(ParserError::SchemaError(format!(
"'{top_level_key}' key cannot be declared as '{}' type",
&removed
)));

// Check both the canonical key and all its aliases
for key_name in log_record_schema.get_all_key_names_for_canonical_key(top_level_key) {
if let Some(removed) = schema.remove(key_name.as_ref())
&& &removed != top_level_key_schema
{
return Err(ParserError::SchemaError(format!(
"'{key_name}' key cannot be declared as '{}' type",
&removed
)));
}
}
}

let allow_undefined_keys = attributes_schema.get_allow_undefined_keys();

log_record_schema = log_record_schema.with_key_definition(
"Attributes",
"attributes",
ParserMapKeySchema::Map(Some(attributes_schema)),
);

Expand All @@ -409,7 +434,7 @@ fn build_log_record_schema(
}

for (top_level_key, top_level_key_schema) in log_record_schema.get_schema() {
if top_level_key.as_ref() == "Attributes" {
if top_level_key.as_ref() == "attributes" {
if let ParserMapKeySchema::Map(Some(attributes_schema)) = top_level_key_schema {
for (top_level_key, top_level_key_schema) in attributes_schema.get_schema() {
summary_schema = summary_schema
Expand Down Expand Up @@ -672,7 +697,7 @@ mod tests {
Some(
BridgeOptions::new().with_attributes_schema(
ParserMapSchema::new()
.with_key_definition("Body", ParserMapKeySchema::Any)
.with_key_definition("body", ParserMapKeySchema::Any)
.with_key_definition("int_value", ParserMapKeySchema::Integer),
),
),
Expand All @@ -688,7 +713,7 @@ mod tests {
Some(
BridgeOptions::new().with_attributes_schema(
ParserMapSchema::new()
.with_key_definition("Body", ParserMapKeySchema::Any)
.with_key_definition("body", ParserMapKeySchema::Any)
.with_key_definition("int_value", ParserMapKeySchema::Integer),
),
),
Expand Down Expand Up @@ -722,9 +747,9 @@ mod tests {
"source | summarize by int_value | extend int_value = 1 | summarize int_value = count() | extend Custom = 1234",
);

run_test_success("source | extend Body = 'hello world'");
// Note: Body gets removed from Attributes schema because it is defined at the root
run_test_failure("source | extend Attributes.Body = 'hello world'");
run_test_success("source | extend body = 'hello world'");
// Note: body gets removed from attributes schema because it is defined at the root
run_test_failure("source | extend attributes.body = 'hello world'");
}

#[test]
Expand Down Expand Up @@ -771,7 +796,7 @@ mod tests {
Some(
BridgeOptions::new().with_attributes_schema(
ParserMapSchema::new()
.with_key_definition("Body", ParserMapKeySchema::Any)
.with_key_definition("body", ParserMapKeySchema::Any)
.with_key_definition("int_value", ParserMapKeySchema::Integer)
.set_allow_undefined_keys(),
),
Expand All @@ -792,12 +817,137 @@ mod tests {
let e = parse_kql_query_into_pipeline(
"",
Some(BridgeOptions::new().with_attributes_schema(
ParserMapSchema::new().with_key_definition("Body", ParserMapKeySchema::Map(None)),
ParserMapSchema::new().with_key_definition("body", ParserMapKeySchema::Map(None)),
)),
)
.unwrap_err();

assert_eq!(1, e.len());
assert!(matches!(e[0], ParserError::SchemaError(_)));
}

#[test]
fn test_parse_kql_query_with_field_aliases() {
let run_test = |query: &str| {
parse_kql_query_into_pipeline(query, None).unwrap();
};

// Test canonical names
run_test("source | where severity_text == 'Info'");
run_test("source | extend x = severity_number");
run_test("source | project time_unix_nano, body");

// Test aliases
run_test("source | where SeverityText == 'Info'");
run_test("source | extend x = SeverityNumber");
run_test("source | project Timestamp, Body");

// Test mixing aliases and canonical names
run_test("source | where SeverityText == 'Info' and severity_number > 0");
run_test("source | extend ts = Timestamp, sev = severity_text");

// Test default map key alias
run_test("source | where attributes.custom_field == 'value'");
run_test("source | where Attributes.custom_field == 'value'");
}

#[test]
fn test_attributes_schema_removes_top_level_aliases() {
// Test that when user provides an attributes schema with aliases
// of top-level fields, they get removed properly
let result = parse_kql_query_into_pipeline(
"source | extend SeverityText = 'Debug'",
Some(
BridgeOptions::new().with_attributes_schema(
ParserMapSchema::new()
// This should be removed because SeverityText is an alias for severity_text
.with_key_definition("SeverityText", ParserMapKeySchema::String)
.with_key_definition("custom_field", ParserMapKeySchema::Integer),
),
),
);

// Should succeed - SeverityText should write to top-level field
assert!(result.is_ok());

// Test with canonical name (snake_case) too
let result = parse_kql_query_into_pipeline(
"source | extend severity_text = 'Debug'",
Some(
BridgeOptions::new().with_attributes_schema(
ParserMapSchema::new()
.with_key_definition("severity_text", ParserMapKeySchema::String)
.with_key_definition("custom_field", ParserMapKeySchema::Integer),
),
),
);

assert!(result.is_ok());
}

#[test]
fn test_field_aliases_in_log_record_processing() {
let create_request = || {
ExportLogsServiceRequest::new().with_resource_logs(ResourceLogs::new().with_scope_logs(
ScopeLogs::new().with_log_records(vec![
LogRecord::new()
.with_severity_text("Info".into())
.with_severity_number(9),
LogRecord::new()
.with_severity_text("Warning".into())
.with_severity_number(13),
]),
))
};

let pipeline_canonical = parse_kql_query_into_pipeline(
"source | where severity_text == 'Info' and severity_number == 9",
None,
)
.unwrap();
let (result_canonical, _) = process_export_logs_service_request_using_pipeline(
&pipeline_canonical,
RecordSetEngineDiagnosticLevel::Error,
create_request(),
)
.unwrap();

let pipeline_with_aliases = parse_kql_query_into_pipeline(
"source | where SeverityText == 'Info' and SeverityNumber == 9",
None,
)
.unwrap();
let (result_with_aliases, _) = process_export_logs_service_request_using_pipeline(
&pipeline_with_aliases,
RecordSetEngineDiagnosticLevel::Error,
create_request(),
)
.unwrap();

let canonical_logs = &result_canonical.unwrap().resource_logs[0].scope_logs[0].log_records;
let aliased_logs = &result_with_aliases.unwrap().resource_logs[0].scope_logs[0].log_records;

assert_eq!(canonical_logs.len(), 1);
assert_eq!(aliased_logs.len(), 1);
assert_eq!(
canonical_logs[0]
.severity_text
.as_ref()
.unwrap()
.get_value(),
aliased_logs[0].severity_text.as_ref().unwrap().get_value()
);
assert_eq!(
canonical_logs[0]
.severity_number
.as_ref()
.unwrap()
.get_value(),
aliased_logs[0]
.severity_number
.as_ref()
.unwrap()
.get_value()
);
}
}
Loading
Loading