diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt
index 85ca0f9f2981f..143910d5580a2 100644
--- a/.github/actions/spelling/allow.txt
+++ b/.github/actions/spelling/allow.txt
@@ -304,6 +304,7 @@ Lifetouch
 linkerd
 localdomain
 localstack
+Logpush
 lookback
 lucene
 Lumia
diff --git a/changelog.d/splunk_hec_raw_line_splitting.enhancement.md b/changelog.d/splunk_hec_raw_line_splitting.enhancement.md
new file mode 100644
index 0000000000000..f8a1124df18be
--- /dev/null
+++ b/changelog.d/splunk_hec_raw_line_splitting.enhancement.md
@@ -0,0 +1,3 @@
+Add a `raw_line_splitting` option to the `splunk_hec` source to split incoming requests to the `/services/collector/raw` endpoint on newlines, creating a separate event for each line. This is useful when receiving newline-delimited JSON (NDJSON) from sources such as Cloudflare Logpush.
+
+authors: clundquist-stripe
diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs
index b020383b9fbda..255c9663ce3c2 100644
--- a/src/sources/splunk_hec/mod.rs
+++ b/src/sources/splunk_hec/mod.rs
@@ -111,6 +111,16 @@ pub struct SplunkConfig {
     /// event metadata and preferentially used if the event is sent to a Splunk HEC sink.
     store_hec_token: bool,
 
+    /// Whether or not to split the bodies of requests to the raw endpoint on newlines.
+    ///
+    /// If set to `true`, incoming requests to the `/services/collector/raw` endpoint are
+    /// split on newlines, creating a separate event for each line. This is useful when
+    /// receiving newline-delimited JSON (NDJSON) or other line-delimited data formats.
+    ///
+    /// If set to `false` (the default), the entire request body is treated as a single event.
+    #[serde(default)]
+    raw_line_splitting: bool,
+
     #[configurable(derived)]
     tls: Option<TlsEnableableConfig>,
 
@@ -139,6 +149,7 @@ impl Default for SplunkConfig {
             tls: None,
             acknowledgements: Default::default(),
             store_hec_token: false,
+            raw_line_splitting: false,
             log_namespace: None,
             keepalive: Default::default(),
         }
@@ -301,6 +312,7 @@ struct SplunkSource {
     protocol: &'static str,
     idx_ack: Option<Arc<IndexerAcknowledgement>>,
     store_hec_token: bool,
+    raw_line_splitting: bool,
     log_namespace: LogNamespace,
     events_received: Registered<EventsReceived>,
 }
@@ -330,6 +342,7 @@ impl SplunkSource {
             protocol,
             idx_ack,
             store_hec_token: config.store_hec_token,
+            raw_line_splitting: config.raw_line_splitting,
             log_namespace,
             events_received: register!(EventsReceived),
         }
@@ -461,6 +474,7 @@ impl SplunkSource {
         let protocol = self.protocol;
         let idx_ack = self.idx_ack.clone();
         let store_hec_token = self.store_hec_token;
+        let raw_line_splitting = self.raw_line_splitting;
         let events_received = self.events_received.clone();
         let log_namespace = self.log_namespace;
 
@@ -502,23 +516,52 @@
                             ),
                             _ => None,
                         };
-                        let mut event = raw_event(
-                            body,
-                            gzip,
-                            channel_id,
-                            remote,
-                            xff,
-                            batch,
-                            log_namespace,
-                            &events_received,
-                        )?;
-                        if let Some(token) = token.filter(|_| store_hec_token) {
-                            event.metadata_mut().set_splunk_hec_token(token.into());
-                        }
-                        let res = out.send_event(event).await;
-                        res.map(|_| maybe_ack_id)
-                            .map_err(|_| Rejection::from(ApiError::ServerShutdown))
+                        if raw_line_splitting {
+                            // Split the body on newlines, creating one event per line.
+                            let mut events = raw_events(
+                                body,
+                                gzip,
+                                channel_id,
+                                remote,
+                                xff,
+                                batch,
+                                log_namespace,
+                                &events_received,
+                            )?;
+
+                            if let Some(token) = token.filter(|_| store_hec_token) {
+                                let token: Arc<str> = token.into();
+                                for event in &mut events {
+                                    event
+                                        .metadata_mut()
+                                        .set_splunk_hec_token(Arc::clone(&token));
+                                }
+                            }
+
+                            let res = out.send_batch(events).await;
+                            res.map(|_| maybe_ack_id)
+                                .map_err(|_| Rejection::from(ApiError::ServerShutdown))
+                        } else {
+                            // Original behavior: treat the entire body as a single event.
+                            let mut event = raw_event(
+                                body,
+                                gzip,
+                                channel_id,
+                                remote,
+                                xff,
+                                batch,
+                                log_namespace,
+                                &events_received,
+                            )?;
+                            if let Some(token) = token.filter(|_| store_hec_token) {
+                                event.metadata_mut().set_splunk_hec_token(token.into());
+                            }
+
+                            let res = out.send_event(event).await;
+                            res.map(|_| maybe_ack_id)
+                                .map_err(|_| Rejection::from(ApiError::ServerShutdown))
+                        }
                     }
                 },
            )
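A note between these hunks on the semantics the new branch relies on: `raw_events` (added below) splits the request body on `\n` and drops empty lines before building events. The following is a minimal standalone sketch of just that split-and-filter step, using the same slice operations the patch uses (illustrative only, not part of the diff):

```rust
fn main() {
    // NDJSON in, one payload per non-empty line out, exactly as
    // `raw_events` does it below.
    let body = b"{\"field\": \"value1\"}\n{\"field\": \"value2\"}\n";
    let lines: Vec<&[u8]> = body
        .split(|&b| b == b'\n')
        .filter(|line| !line.is_empty())
        .collect();
    // The trailing newline does not yield a third, empty payload.
    assert_eq!(lines.len(), 2);
    assert_eq!(lines[0], &b"{\"field\": \"value1\"}"[..]);
}
```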
@@ -1128,6 +1171,100 @@ fn raw_event(
     Ok(Event::from(log))
 }
 
+/// Creates multiple events from a raw request by splitting its body on newlines
+#[allow(clippy::too_many_arguments)]
+fn raw_events(
+    bytes: Bytes,
+    gzip: bool,
+    channel: String,
+    remote: Option<SocketAddr>,
+    xff: Option<String>,
+    batch: Option<BatchNotifier>,
+    log_namespace: LogNamespace,
+    events_received: &Registered<EventsReceived>,
+) -> Result<Vec<Event>, Rejection> {
+    // Decompress gzip-encoded bodies
+    let data: Vec<u8> = if gzip {
+        let mut decompressed = Vec::new();
+        match MultiGzDecoder::new(bytes.reader()).read_to_end(&mut decompressed) {
+            Ok(0) => return Err(ApiError::NoData.into()),
+            Ok(_) => decompressed,
+            Err(error) => {
+                emit!(SplunkHecRequestBodyInvalidError { error });
+                return Err(ApiError::InvalidDataFormat { event: 0 }.into());
+            }
+        }
+    } else {
+        bytes.to_vec()
+    };
+
+    // Split on newlines, dropping empty lines
+    let lines: Vec<&[u8]> = data
+        .split(|&b| b == b'\n')
+        .filter(|line| !line.is_empty())
+        .collect();
+
+    if lines.is_empty() {
+        return Err(ApiError::NoData.into());
+    }
+
+    // Compute the host once for all events
+    let host = if let Some(remote_address) = xff {
+        Some(remote_address)
+    } else {
+        remote.map(|remote| remote.to_string())
+    };
+
+    let now = Utc::now();
+    let mut events = Vec::with_capacity(lines.len());
+
+    for line in lines {
+        let message: Value = Value::from(Bytes::copy_from_slice(line));
+
+        // Construct the event
+        let mut log = match log_namespace {
+            LogNamespace::Vector => LogEvent::from(message),
+            LogNamespace::Legacy => {
+                let mut log = LogEvent::default();
+                log.maybe_insert(log_schema().message_key_target_path(), message);
+                log
+            }
+        };
+
+        // We need to calculate the estimated JSON size of the event BEFORE enrichment.
+        events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
+
+        // Add the channel
+        log_namespace.insert_source_metadata(
+            SplunkConfig::NAME,
+            &mut log,
+            Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
+            lookup::path!(CHANNEL),
+            channel.clone(),
+        );
+
+        if let Some(host) = host.as_ref() {
+            log_namespace.insert_source_metadata(
+                SplunkConfig::NAME,
+                &mut log,
+                log_schema().host_key().map(LegacyKey::InsertIfEmpty),
+                lookup::path!("host"),
+                host.clone(),
+            );
+        }
+
+        log_namespace.insert_standard_vector_source_metadata(&mut log, SplunkConfig::NAME, now);
+
+        if let Some(batch) = batch.clone() {
+            log = log.with_batch_notifier(&batch);
+        }
+
+        events.push(Event::from(log));
+    }
+
+    Ok(events)
+}
+
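A design note on the decompression step above: the helper uses `flate2`'s `MultiGzDecoder` rather than a plain `GzDecoder`, so a body assembled from several concatenated gzip members still decodes in full. A small self-contained sketch of that difference (it assumes only the `flate2` crate and is illustrative, not part of the diff):

```rust
use std::io::{Read, Write};

use flate2::{read::MultiGzDecoder, write::GzEncoder, Compression};

fn main() -> std::io::Result<()> {
    // Build a body from two independently gzipped chunks, concatenated,
    // as a batching sender might produce.
    let mut body = Vec::new();
    for chunk in [&b"line1\n"[..], &b"line2\n"[..]] {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(chunk)?;
        body.extend(encoder.finish()?);
    }

    // MultiGzDecoder reads across gzip member boundaries; a plain GzDecoder
    // would stop after the first member and `line2` would be lost.
    let mut decompressed = Vec::new();
    MultiGzDecoder::new(&body[..]).read_to_end(&mut decompressed)?;
    assert_eq!(decompressed, b"line1\nline2\n");
    Ok(())
}
```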
 #[derive(Clone, Copy, Debug, Snafu)]
 pub(crate) enum ApiError {
     MissingAuthorization,
@@ -1367,6 +1504,27 @@
         impl Stream<Item = Event> + Unpin + use<>,
         SocketAddr,
         PortGuard,
+    ) {
+        source_with_options(
+            token,
+            valid_tokens,
+            acknowledgements,
+            store_hec_token,
+            false,
+        )
+        .await
+    }
+
+    async fn source_with_options(
+        token: Option<String>,
+        valid_tokens: Option<&[&str]>,
+        acknowledgements: Option<SourceAcknowledgementsConfig>,
+        store_hec_token: bool,
+        raw_line_splitting: bool,
+    ) -> (
+        impl Stream<Item = Event> + Unpin + use<>,
+        SocketAddr,
+        PortGuard,
     ) {
         let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
         let (_guard, address) = next_addr();
@@ -1381,6 +1539,7 @@
             tls: None,
             acknowledgements: acknowledgements.unwrap_or_default(),
             store_hec_token,
+            raw_line_splitting,
             log_namespace: None,
             keepalive: Default::default(),
         }
@@ -1749,6 +1908,202 @@
         .await;
     }
 
+    #[tokio::test]
+    async fn raw_line_splitting_splits_on_newlines() {
+        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
+            // Create a source with raw_line_splitting enabled
+            let (source, address, _guard) =
+                source_with_options(Some(TOKEN.to_owned().into()), None, None, false, true).await;
+
+            // Send newline-delimited JSON (NDJSON) data
+            let message = r#"{"field": "value1"}
+{"field": "value2"}
+{"field": "value3"}"#;
+
+            assert_eq!(200, post(address, "services/collector/raw", message).await);
+
+            // Should receive 3 separate events
+            let events = collect_n(source, 3).await;
+            assert_eq!(events.len(), 3);
+
+            assert_eq!(
+                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
+                r#"{"field": "value1"}"#.into()
+            );
+            assert_eq!(
+                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
+                r#"{"field": "value2"}"#.into()
+            );
+            assert_eq!(
+                events[2].as_log()[log_schema().message_key().unwrap().to_string()],
+                r#"{"field": "value3"}"#.into()
+            );
+
+            // Verify all events have the required metadata
+            for event in events {
+                assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
+                assert!(event.as_log().get_timestamp().is_some());
+                assert_eq!(
+                    event.as_log()[log_schema().source_type_key().unwrap().to_string()],
+                    "splunk_hec".into()
+                );
+            }
+        })
+        .await;
+    }
+
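The next two tests pin down the empty-line edge cases: a trailing newline must not produce a trailing empty event, and blank separator lines are dropped entirely. A compact sketch of the behavior under test (illustrative only, not part of the diff):

```rust
fn main() {
    // Mirrors the trailing-newline and `empty_lines_filtered` cases below:
    // consecutive and trailing `\n` bytes produce empty slices, which the
    // `!line.is_empty()` filter removes before any events are built.
    let data = b"line1\n\nline2\n\n\nline3\n";
    let lines: Vec<&[u8]> = data
        .split(|&b| b == b'\n')
        .filter(|line| !line.is_empty())
        .collect();
    assert_eq!(lines, [&b"line1"[..], &b"line2"[..], &b"line3"[..]]);
}
```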
+    #[tokio::test]
+    async fn raw_line_splitting_handles_trailing_newline() {
+        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
+            let (source, address, _guard) =
+                source_with_options(Some(TOKEN.to_owned().into()), None, None, false, true).await;
+
+            // Message with a trailing newline
+            let message = "line1\nline2\n";
+
+            assert_eq!(200, post(address, "services/collector/raw", message).await);
+
+            // Should receive 2 events (the empty line from the trailing newline is filtered)
+            let events = collect_n(source, 2).await;
+            assert_eq!(events.len(), 2);
+
+            assert_eq!(
+                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
+                "line1".into()
+            );
+            assert_eq!(
+                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
+                "line2".into()
+            );
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn raw_line_splitting_disabled_keeps_original_behavior() {
+        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
+            // Use the default source (raw_line_splitting = false)
+            let (source, address, _guard) = source(None).await;
+
+            let message = "line1\nline2\nline3";
+
+            assert_eq!(200, post(address, "services/collector/raw", message).await);
+
+            // Should receive 1 event with all lines combined
+            let event = collect_n(source, 1).await.remove(0);
+            assert_eq!(
+                event.as_log()[log_schema().message_key().unwrap().to_string()],
+                "line1\nline2\nline3".into()
+            );
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn raw_line_splitting_cloudflare_ndjson() {
+        // Table-driven test cases for the Cloudflare NDJSON format.
+        // Each case: (name, input_message, expected_messages)
+        let cases: Vec<(&str, &str, Vec<&str>)> = vec![
+            (
+                "multiple_cloudflare_events",
+                // Cloudflare Logpush format with various HTTP methods
+                r#"{"ClientIP":"1.2.3.4","ClientRequestHost":"foo.example.com","ClientRequestMethod":"GET","EdgeResponseStatus":200}
+{"ClientIP":"5.6.7.8","ClientRequestHost":"bar.example.com","ClientRequestMethod":"POST","EdgeResponseStatus":201}
+{"ClientIP":"9.10.11.12","ClientRequestHost":"baz.example.com","ClientRequestMethod":"PUT","EdgeResponseStatus":204}"#,
+                vec![
+                    r#"{"ClientIP":"1.2.3.4","ClientRequestHost":"foo.example.com","ClientRequestMethod":"GET","EdgeResponseStatus":200}"#,
+                    r#"{"ClientIP":"5.6.7.8","ClientRequestHost":"bar.example.com","ClientRequestMethod":"POST","EdgeResponseStatus":201}"#,
+                    r#"{"ClientIP":"9.10.11.12","ClientRequestHost":"baz.example.com","ClientRequestMethod":"PUT","EdgeResponseStatus":204}"#,
+                ],
+            ),
+            (
+                "trailing_newline",
+                // Cloudflare often sends data with a trailing newline
+                r#"{"ClientIP":"1.2.3.4","ClientRequestHost":"foo.example.com","ClientRequestMethod":"GET"}
+{"ClientIP":"5.6.7.8","ClientRequestHost":"bar.example.com","ClientRequestMethod":"GET"}
+"#,
+                vec![
+                    r#"{"ClientIP":"1.2.3.4","ClientRequestHost":"foo.example.com","ClientRequestMethod":"GET"}"#,
+                    r#"{"ClientIP":"5.6.7.8","ClientRequestHost":"bar.example.com","ClientRequestMethod":"GET"}"#,
+                ],
+            ),
+            (
+                "single_event",
+                r#"{"ClientIP":"1.2.3.4","ClientRequestHost":"example.com","ClientRequestMethod":"GET"}"#,
+                vec![
+                    r#"{"ClientIP":"1.2.3.4","ClientRequestHost":"example.com","ClientRequestMethod":"GET"}"#,
+                ],
+            ),
+            (
+                "empty_lines_filtered",
+                // Empty lines between events should be filtered out
+                "line1\n\nline2\n\n\nline3",
+                vec!["line1", "line2", "line3"],
+            ),
+        ];
+
+        for (name, message, expected) in cases {
+            assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
+                let (source, address, _guard) =
+                    source_with_options(Some(TOKEN.to_owned().into()), None, None, false, true)
+                        .await;
+
+                assert_eq!(
+                    200,
+                    post(address, "services/collector/raw", message).await,
+                    "case '{}' failed: expected 200 response",
+                    name
+                );
+
+                let events = collect_n(source, expected.len()).await;
+                assert_eq!(
+                    events.len(),
+                    expected.len(),
+                    "case '{}': expected {} events, got {}",
+                    name,
+                    expected.len(),
+                    events.len()
+                );
+
+                // Verify each event matches the expected message
+                for (i, expected_msg) in expected.iter().enumerate() {
+                    assert_eq!(
+                        events[i].as_log()[log_schema().message_key().unwrap().to_string()],
+                        (*expected_msg).into(),
"case '{}': event {} mismatch", + name, + i + ); + } + + // Verify all events have required metadata + for (i, event) in events.iter().enumerate() { + assert_eq!( + event.as_log()[&super::CHANNEL], + "channel".into(), + "case '{}': event {} missing channel", + name, + i + ); + assert!( + event.as_log().get_timestamp().is_some(), + "case '{}': event {} missing timestamp", + name, + i + ); + assert_eq!( + event.as_log()[log_schema().source_type_key().unwrap().to_string()], + "splunk_hec".into(), + "case '{}': event {} wrong source_type", + name, + i + ); + } + }) + .await; + } + } + #[tokio::test] async fn root() { assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {