Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog.d/15687_syslog_udp_received_metrics.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
The `syslog` source in UDP mode now emits the standard "received" metrics, aligning behavior with TCP and the Component Specification:

- `component_received_events_total`
- `component_received_event_bytes_total`
- `component_received_bytes_total`

This makes internal telemetry consistent and restores compliance checks for UDP syslog.

authors: sghall
83 changes: 81 additions & 2 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use vector_lib::{
},
config::{LegacyKey, LogNamespace},
configurable::configurable_component,
internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
ipallowlist::IpAllowlistConfig,
lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, path},
};
Expand All @@ -29,7 +30,9 @@ use crate::{
DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput, log_schema,
},
event::Event,
internal_events::{SocketBindError, SocketMode, SocketReceiveError, StreamClosedError},
internal_events::{
SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
},
net,
shutdown::ShutdownSignal,
sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket},
Expand Down Expand Up @@ -341,6 +344,8 @@ pub fn udp(
r#type = "udp"
);

let bytes_received = register!(BytesReceived::from(Protocol::UDP));

let mut stream = UdpFramed::new(
socket,
Decoder::new(
Expand All @@ -353,9 +358,17 @@ pub fn udp(
.take_until(shutdown)
.filter_map(|frame| {
let host_key = host_key.clone();
let bytes_received = bytes_received.clone();
async move {
match frame {
Ok(((mut events, _byte_size), received_from)) => {
Ok(((mut events, byte_size), received_from)) => {
let count = events.len();
bytes_received.emit(ByteSize(byte_size));
emit!(SocketEventsReceived {
mode: SocketMode::Udp,
byte_size: byte_size.into(),
count,
});
let received_from = received_from.ip().to_string().into();
handle_events(&mut events, &host_key, Some(received_from), log_namespace);
Some(events.remove(0))
Expand Down Expand Up @@ -1185,6 +1198,72 @@ mod test {
.await;
}

#[tokio::test]
async fn test_udp_syslog() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let num_messages: usize = 1000;
let (_guard, in_addr) = next_addr();

// Create and spawn the source.
let config = SyslogConfig::from_mode(Mode::Udp {
address: in_addr.into(),
receive_buffer_bytes: None,
});

let key = ComponentKey::from("in");
let (tx, rx) = SourceSender::new_test();
let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
let shutdown_complete = shutdown.shutdown_tripwire();

let source = config
.build(context)
.await
.expect("source should not fail to build");
tokio::spawn(source);

// Give UDP a brief moment to start listening.
sleep(Duration::from_millis(150)).await;

let output_events = CountReceiver::receive_events(rx);

// Craft and send syslog messages as individual UDP datagrams.
let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
.map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
.collect();

let input_lines: Vec<String> =
input_messages.iter().map(|msg| msg.to_string()).collect();

let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
for line in input_lines {
socket.send_to(line.as_bytes(), in_addr).await.unwrap();
}

// Wait a short period of time to ensure the messages get sent.
sleep(Duration::from_secs(2)).await;

// Shutdown the source, and make sure we've got all the messages we sent in.
shutdown
.shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
.await;
shutdown_complete.await;

let output_events = output_events.await;
assert_eq!(output_events.len(), num_messages);

let output_messages: Vec<SyslogMessageRfc5424> = output_events
.into_iter()
.map(|mut e| {
e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
e.into()
})
.collect();
assert_eq!(output_messages, input_messages);
})
.await;
}

#[cfg(unix)]
#[tokio::test]
async fn test_unix_stream_syslog() {
Expand Down
Loading