Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog.d/15687_syslog_udp_received_metrics.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
The `syslog` source in UDP mode now emits the standard "received" metrics, aligning behavior with TCP and the Component Specification:

- `component_received_events_total`
- `component_received_event_bytes_total`
- `component_received_bytes_total`

This makes internal telemetry consistent and restores compliance checks for UDP syslog.

authors: sghall
84 changes: 82 additions & 2 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ use listenfd::ListenFd;
use smallvec::SmallVec;
use tokio_util::udp::UdpFramed;
use vector_lib::{
EstimatedJsonEncodedSizeOf,
codecs::{
BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig,
decoding::{Deserializer, Framer},
},
config::{LegacyKey, LogNamespace},
configurable::configurable_component,
internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
ipallowlist::IpAllowlistConfig,
lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, path},
};
Expand All @@ -29,7 +31,9 @@ use crate::{
DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput, log_schema,
},
event::Event,
internal_events::{SocketBindError, SocketMode, SocketReceiveError, StreamClosedError},
internal_events::{
SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
},
net,
shutdown::ShutdownSignal,
sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket},
Expand Down Expand Up @@ -341,6 +345,8 @@ pub fn udp(
r#type = "udp"
);

let bytes_received = register!(BytesReceived::from(Protocol::UDP));

let mut stream = UdpFramed::new(
socket,
Decoder::new(
Expand All @@ -353,9 +359,17 @@ pub fn udp(
.take_until(shutdown)
.filter_map(|frame| {
let host_key = host_key.clone();
let bytes_received = bytes_received.clone();
async move {
match frame {
Ok(((mut events, _byte_size), received_from)) => {
Ok(((mut events, byte_size), received_from)) => {
let count = events.len();
bytes_received.emit(ByteSize(byte_size));
emit!(SocketEventsReceived {
mode: SocketMode::Udp,
byte_size: events.estimated_json_encoded_size_of(),
count,
});
let received_from = received_from.ip().to_string().into();
handle_events(&mut events, &host_key, Some(received_from), log_namespace);
Some(events.remove(0))
Expand Down Expand Up @@ -1185,6 +1199,72 @@ mod test {
.await;
}

#[tokio::test]
async fn test_udp_syslog() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let num_messages: usize = 1000;
let (_guard, in_addr) = next_addr();

// Create and spawn the source.
let config = SyslogConfig::from_mode(Mode::Udp {
address: in_addr.into(),
receive_buffer_bytes: None,
});

let key = ComponentKey::from("in");
let (tx, rx) = SourceSender::new_test();
let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
let shutdown_complete = shutdown.shutdown_tripwire();

let source = config
.build(context)
.await
.expect("source should not fail to build");
tokio::spawn(source);

// Give UDP a brief moment to start listening.
sleep(Duration::from_millis(150)).await;

let output_events = CountReceiver::receive_events(rx);

// Craft and send syslog messages as individual UDP datagrams.
let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
.map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
.collect();

let input_lines: Vec<String> =
input_messages.iter().map(|msg| msg.to_string()).collect();

let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
for line in input_lines {
socket.send_to(line.as_bytes(), in_addr).await.unwrap();
}

// Wait a short period of time to ensure the messages get sent.
sleep(Duration::from_secs(2)).await;

// Shutdown the source, and make sure we've got all the messages we sent in.
shutdown
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;

let output_events = output_events.await;
assert_eq!(output_events.len(), num_messages);

let output_messages: Vec<SyslogMessageRfc5424> = output_events
.into_iter()
.map(|mut e| {
e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
e.into()
})
.collect();
assert_eq!(output_messages, input_messages);
})
.await;
}

#[cfg(unix)]
#[tokio::test]
async fn test_unix_stream_syslog() {
Expand Down
Loading