Skip to content

Commit 346319a

Browse files
authored
[otap-df-otap] Add support for CEF messages with Syslog headers (open-telemetry#1264)
Addresses open-telemetry#1073.

## Changes

- Add support for parsing CEF messages that carry a Syslog header.
- Note that parsing raw CEF messages (without a Syslog header) is still supported.

### Examples

#### Raw CEF

```
CEF:0|Security|threatmanager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232
```

#### CEF with Syslog header

This is a partial RFC 3164 header; the example is from the [spec](https://www.microfocus.com/documentation/arcsight/arcsight-smartconnectors-8.3/cef-implementation-standard/Content/CEF/Chapter%201%20What%20is%20CEF.htm).

```
Sep 29 08:26:10 host CEF:1|Security|threatmanager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232
```
1 parent f6bbee6 commit 346319a

File tree

4 files changed

+487
-220
lines changed

4 files changed

+487
-220
lines changed

rust/otap-dataflow/crates/otap/src/syslog_cef_receiver/arrow_records_encoder.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,7 +1426,7 @@ mod tests {
14261426
Some(&AttributeValue::String("1.0".to_string()))
14271427
);
14281428
assert_eq!(
1429-
log1_attrs.get("cef.signature_id"),
1429+
log1_attrs.get("cef.device_event_class_id"),
14301430
Some(&AttributeValue::String("100".to_string()))
14311431
);
14321432
assert_eq!(
@@ -1483,7 +1483,7 @@ mod tests {
14831483
Some(&AttributeValue::String("2.4.1".to_string()))
14841484
);
14851485
assert_eq!(
1486-
log2_attrs.get("cef.signature_id"),
1486+
log2_attrs.get("cef.device_event_class_id"),
14871487
Some(&AttributeValue::String("400".to_string()))
14881488
);
14891489
assert_eq!(
@@ -1526,7 +1526,7 @@ mod tests {
15261526
Some(&AttributeValue::String("1.2.3".to_string()))
15271527
);
15281528
assert_eq!(
1529-
log3_attrs.get("cef.signature_id"),
1529+
log3_attrs.get("cef.device_event_class_id"),
15301530
Some(&AttributeValue::String("SignatureID".to_string()))
15311531
);
15321532
assert_eq!(
@@ -2001,7 +2001,7 @@ mod tests {
20012001
Some(&AttributeValue::String("1.0".to_string()))
20022002
);
20032003
assert_eq!(
2004-
log3_attrs.get("cef.signature_id"),
2004+
log3_attrs.get("cef.device_event_class_id"),
20052005
Some(&AttributeValue::String("100".to_string()))
20062006
);
20072007
assert_eq!(
@@ -2643,7 +2643,10 @@ mod tests {
26432643
log1_attrs.get("cef.device_version"),
26442644
Some(&"1.0".to_string())
26452645
);
2646-
assert_eq!(log1_attrs.get("cef.signature_id"), Some(&"100".to_string()));
2646+
assert_eq!(
2647+
log1_attrs.get("cef.device_event_class_id"),
2648+
Some(&"100".to_string())
2649+
);
26472650
assert_eq!(
26482651
log1_attrs.get("cef.name"),
26492652
Some(&"worm successfully stopped".to_string())
@@ -2711,7 +2714,10 @@ mod tests {
27112714
log2_attrs.get("cef.device_version"),
27122715
Some(&"2.4.1".to_string())
27132716
);
2714-
assert_eq!(log2_attrs.get("cef.signature_id"), Some(&"400".to_string()));
2717+
assert_eq!(
2718+
log2_attrs.get("cef.device_event_class_id"),
2719+
Some(&"400".to_string())
2720+
);
27152721
assert_eq!(
27162722
log2_attrs.get("cef.name"),
27172723
Some(&"Successful Login".to_string())
@@ -2772,7 +2778,7 @@ mod tests {
27722778
Some(&"1.2.3".to_string())
27732779
);
27742780
assert_eq!(
2775-
log3_attrs.get("cef.signature_id"),
2781+
log3_attrs.get("cef.device_event_class_id"),
27762782
Some(&"SignatureID".to_string())
27772783
);
27782784
assert_eq!(log3_attrs.get("cef.name"), Some(&"Event Name".to_string()));
@@ -3050,7 +3056,10 @@ mod tests {
30503056
log3_attrs.get("cef.device_version"),
30513057
Some(&"1.0".to_string())
30523058
);
3053-
assert_eq!(log3_attrs.get("cef.signature_id"), Some(&"100".to_string()));
3059+
assert_eq!(
3060+
log3_attrs.get("cef.device_event_class_id"),
3061+
Some(&"100".to_string())
3062+
);
30543063
assert_eq!(
30553064
log3_attrs.get("cef.name"),
30563065
Some(&"worm successfully stopped".to_string())

rust/otap-dataflow/crates/otap/src/syslog_cef_receiver/parser/cef.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ impl<'a> CefExtensionsIter<'a> {
350350

351351
/// Collect all extensions into a Vec, allocating only when necessary
352352
#[cfg(test)]
353-
fn collect_all(mut self) -> Vec<(Vec<u8>, Vec<u8>)> {
353+
pub(super) fn collect_all(mut self) -> Vec<(Vec<u8>, Vec<u8>)> {
354354
let mut result = Vec::new();
355355
while let Some((key, value)) = self.next_extension() {
356356
result.push((key.to_vec(), value.to_vec()));

rust/otap-dataflow/crates/otap/src/syslog_cef_receiver/parser/mod.rs

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,79 @@ pub enum ParseError {
4040
InvalidCef,
4141
/// Error parsing UTF-8 strings
4242
InvalidUtf8,
43+
/// Error indicating that the format of the syslog message is unknown
44+
UnknownFormat,
4345
}
4446

4547
/// Parse a syslog message from bytes, automatically detecting the format
4648
pub(super) fn parse(input: &[u8]) -> Result<ParsedSyslogMessage<'_>, ParseError> {
47-
// Check if it's a CEF message first
49+
// Try pure CEF first - it's the simplest check
4850
if input.starts_with(b"CEF:") {
49-
return parse_cef(input).map(ParsedSyslogMessage::Cef);
51+
if let Ok(cef_msg) = parse_cef(input) {
52+
return Ok(ParsedSyslogMessage::Cef(cef_msg));
53+
}
54+
}
55+
56+
// Try RFC 5424 (has version number after priority)
57+
if let Ok(rfc5424_msg) = parse_rfc5424(input) {
58+
// Check if the message contains CEF
59+
if let Some(msg) = rfc5424_msg.message {
60+
if msg.starts_with(b"CEF:") {
61+
if let Ok(cef_msg) = parse_cef(msg) {
62+
return Ok(ParsedSyslogMessage::CefWithRfc5424(rfc5424_msg, cef_msg));
63+
}
64+
}
65+
}
66+
return Ok(ParsedSyslogMessage::Rfc5424(rfc5424_msg));
5067
}
5168

52-
// Try RFC 5424 first
53-
if let Ok(msg) = parse_rfc5424(input) {
54-
return Ok(ParsedSyslogMessage::Rfc5424(msg));
69+
// Try RFC 3164
70+
if let Ok(rfc3164_msg) = parse_rfc3164(input) {
71+
// Check if the content contains CEF
72+
if let Some(content) = rfc3164_msg.content {
73+
if content.starts_with(b"CEF:") {
74+
if let Ok(cef_msg) = parse_cef(content) {
75+
return Ok(ParsedSyslogMessage::CefWithRfc3164(rfc3164_msg, cef_msg));
76+
}
77+
}
78+
}
79+
80+
// Special case: If tag is "CEF", the full CEF message spans from "CEF:" in the input
81+
// This handles the case where RFC3164 parser splits "CEF:1|..." into tag="CEF" and content="1|..."
82+
if rfc3164_msg.tag == Some(&b"CEF"[..]) && rfc3164_msg.content.is_some() {
83+
// Find where "CEF:" appears in the original input after the hostname
84+
if let Some(hostname) = rfc3164_msg.hostname {
85+
// Find hostname position in input
86+
if let Some(hostname_pos) =
87+
input.windows(hostname.len()).position(|w| w == hostname)
88+
{
89+
// Look for "CEF:" after hostname position
90+
let search_start = hostname_pos + hostname.len();
91+
let after_hostname = &input[search_start..];
92+
93+
// Find "CEF:" in the remaining input
94+
if let Some(cef_pos) = after_hostname.windows(4).position(|w| w == b"CEF:") {
95+
// The CEF message starts at this position and goes to the end
96+
let cef_message = &after_hostname[cef_pos..];
97+
98+
if let Ok(cef_msg) = parse_cef(cef_message) {
99+
// Update the rfc3164 content to point to the full CEF message
100+
let mut modified_rfc3164 = rfc3164_msg;
101+
modified_rfc3164.content = Some(cef_message);
102+
return Ok(ParsedSyslogMessage::CefWithRfc3164(
103+
modified_rfc3164,
104+
cef_msg,
105+
));
106+
}
107+
}
108+
}
109+
}
110+
}
111+
112+
return Ok(ParsedSyslogMessage::Rfc3164(rfc3164_msg));
55113
}
56114

57-
// Fallback to RFC 3164
58-
parse_rfc3164(input).map(ParsedSyslogMessage::Rfc3164)
115+
Err(ParseError::UnknownFormat)
59116
}
60117

61118
/// Parse priority from the beginning of a syslog message

0 commit comments

Comments
 (0)