diff --git a/changelog.d/15687_syslog_udp_received_metrics.fix.md b/changelog.d/15687_syslog_udp_received_metrics.fix.md new file mode 100644 index 0000000000000..cd4805bc0675e --- /dev/null +++ b/changelog.d/15687_syslog_udp_received_metrics.fix.md @@ -0,0 +1,9 @@ +The `syslog` source in UDP mode now emits the standard "received" metrics, aligning behavior with TCP and the Component Specification: + +- `component_received_events_total` +- `component_received_event_bytes_total` +- `component_received_bytes_total` + +This makes internal telemetry consistent and restores compliance checks for UDP syslog. + +authors: sghall diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 57ed8d90f3a28..9a3d78b8e1bad 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -9,12 +9,14 @@ use listenfd::ListenFd; use smallvec::SmallVec; use tokio_util::udp::UdpFramed; use vector_lib::{ + EstimatedJsonEncodedSizeOf, codecs::{ BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig, decoding::{Deserializer, Framer}, }, config::{LegacyKey, LogNamespace}, configurable::configurable_component, + internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol}, ipallowlist::IpAllowlistConfig, lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, path}, }; @@ -29,7 +31,9 @@ use crate::{ DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput, log_schema, }, event::Event, - internal_events::{SocketBindError, SocketMode, SocketReceiveError, StreamClosedError}, + internal_events::{ + SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError, + }, net, shutdown::ShutdownSignal, sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket}, @@ -341,6 +345,8 @@ pub fn udp( r#type = "udp" ); + let bytes_received = register!(BytesReceived::from(Protocol::UDP)); + let mut stream = UdpFramed::new( socket, Decoder::new( @@ -353,9 +359,17 @@ pub fn udp( .take_until(shutdown) .filter_map(|frame| { let host_key = host_key.clone(); + let bytes_received = bytes_received.clone(); async move { match frame { - Ok(((mut events, _byte_size), received_from)) => { + Ok(((mut events, byte_size), received_from)) => { + let count = events.len(); + bytes_received.emit(ByteSize(byte_size)); + emit!(SocketEventsReceived { + mode: SocketMode::Udp, + byte_size: events.estimated_json_encoded_size_of(), + count, + }); let received_from = received_from.ip().to_string().into(); handle_events(&mut events, &host_key, Some(received_from), log_namespace); Some(events.remove(0)) @@ -1185,6 +1199,72 @@ mod test { .await; } + #[tokio::test] + async fn test_udp_syslog() { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { + let num_messages: usize = 1000; + let (_guard, in_addr) = next_addr(); + + // Create and spawn the source. + let config = SyslogConfig::from_mode(Mode::Udp { + address: in_addr.into(), + receive_buffer_bytes: None, + }); + + let key = ComponentKey::from("in"); + let (tx, rx) = SourceSender::new_test(); + let (context, shutdown) = SourceContext::new_shutdown(&key, tx); + let shutdown_complete = shutdown.shutdown_tripwire(); + + let source = config + .build(context) + .await + .expect("source should not fail to build"); + tokio::spawn(source); + + // Give UDP a brief moment to start listening. + sleep(Duration::from_millis(150)).await; + + let output_events = CountReceiver::receive_events(rx); + + // Craft and send syslog messages as individual UDP datagrams. + let input_messages: Vec = (0..num_messages) + .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3)) + .collect(); + + let input_lines: Vec = + input_messages.iter().map(|msg| msg.to_string()).collect(); + + let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap(); + for line in input_lines { + socket.send_to(line.as_bytes(), in_addr).await.unwrap(); + } + + // Wait a short period of time to ensure the messages get sent. + sleep(Duration::from_secs(2)).await; + + // Shutdown the source, and make sure we've got all the messages we sent in. + shutdown + .shutdown_all(Some(Instant::now() + Duration::from_millis(100))) + .await; + shutdown_complete.await; + + let output_events = output_events.await; + assert_eq!(output_events.len(), num_messages); + + let output_messages: Vec = output_events + .into_iter() + .map(|mut e| { + e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error. + e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error. + e.into() + }) + .collect(); + assert_eq!(output_messages, input_messages); + }) + .await; + } + #[cfg(unix)] #[tokio::test] async fn test_unix_stream_syslog() {