From a93d78b11ce9b67c6dfbb9358067d986e6e8aa34 Mon Sep 17 00:00:00 2001 From: bachgarash Date: Wed, 15 Oct 2025 00:18:36 +0300 Subject: [PATCH 1/3] feat: initial implementation for snmp v1 and v2 traps #4567 This PR contains initial implementation for SNMP source to log v1 and v2 version traps. --- Cargo.lock | 60 ++++++ Cargo.toml | 4 + config/examples/snmp_trap.yaml | 61 ++++++ src/internal_events/mod.rs | 5 + src/internal_events/snmp_trap.rs | 27 +++ src/sources/mod.rs | 2 + src/sources/snmp_trap/mod.rs | 314 +++++++++++++++++++++++++++++++ src/sources/snmp_trap/parser.rs | 292 ++++++++++++++++++++++++++++ 8 files changed, 765 insertions(+) create mode 100644 config/examples/snmp_trap.yaml create mode 100644 src/internal_events/snmp_trap.rs create mode 100644 src/sources/snmp_trap/mod.rs create mode 100644 src/sources/snmp_trap/parser.rs diff --git a/Cargo.lock b/Cargo.lock index bb5478d575f5c..5ad8318fa8051 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -547,6 +547,44 @@ dependencies = [ "term 1.0.1", ] +[[package]] +name = "asn1-rs" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5493c3bedbacf7fd7382c6346bbd66687d12bbaad3a89a2d2c303ee6cf20b048" +dependencies = [ + "asn1-rs-derive", + "asn1-rs-impl", + "displaydoc", + "nom 7.1.3", + "num-traits", + "rusticata-macros", + "thiserror 1.0.68", +] + +[[package]] +name = "asn1-rs-derive" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" +dependencies = [ + "proc-macro2 1.0.101", + "quote 1.0.40", + "syn 2.0.106", + "synstructure", +] + +[[package]] +name = "asn1-rs-impl" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" +dependencies = [ + "proc-macro2 1.0.101", + "quote 1.0.40", + "syn 2.0.106", +] + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -9699,6 +9737,15 @@ dependencies = [ "semver", ] +[[package]] +name = "rusticata-macros" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" +dependencies = [ + "nom 7.1.3", +] + [[package]] name = "rustix" version = "0.37.27" @@ -10591,6 +10638,18 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "snmp-parser" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f93180ada23141ed2643b7b4a7b20c7efea6af9c6bdc997c15b813bfa6e2c20" +dependencies = [ + "asn1-rs", + "nom 7.1.3", + "rusticata-macros", + "thiserror 1.0.68", +] + [[package]] name = "socket2" version = "0.4.10" @@ -12676,6 +12735,7 @@ dependencies = [ "smpl_jwt", "snafu 0.8.9", "snap", + "snmp-parser", "socket2 0.5.10", "sqlx", "stream-cancel", diff --git a/Cargo.toml b/Cargo.toml index 26e0f5b355f4c..5c08e38472e97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -185,6 +185,7 @@ serde = { version = "1.0.219", default-features = false, features = ["alloc", "d serde_json = { version = "1.0.143", default-features = false, features = ["raw_value", "std"] } serde_yaml = { version = "0.9.34", default-features = false } snafu = { version = "0.8.9", default-features = false, features = ["futures", "std"] } +snmp-parser = { version = "0.10.0", default-features = false } socket2 = { version = "0.5.10", default-features = false } tempfile = "3.23.0" tokio = { version = "1.45.1", default-features = false } @@ -215,6 +216,7 @@ proptest = { workspace = true, optional = true } proptest-derive = { workspace = true, optional = true } semver.workspace = true snafu.workspace = true +snmp-parser = { workspace = true, optional = true } uuid.workspace = true vrl.workspace = true @@ -624,6 +626,7 @@ sources-logs = [ "sources-file_descriptor", "sources-redis", "sources-socket", + "sources-snmp_trap", "sources-splunk_hec", "sources-stdin", "sources-syslog", @@ -698,6 +701,7 @@ sources-prometheus-pushgateway = ["sinks-prometheus", "sources-utils-http", "vec sources-pulsar = ["dep:apache-avro", "dep:pulsar"] sources-redis = ["dep:redis"] sources-socket = ["sources-utils-net", "tokio-util/net"] +sources-snmp_trap = ["sources-utils-net-udp", "dep:snmp-parser"] sources-splunk_hec = ["dep:roaring"] sources-statsd = ["sources-utils-net", "tokio-util/net"] sources-stdin = ["tokio-util/io"] diff --git a/config/examples/snmp_trap.yaml b/config/examples/snmp_trap.yaml new file mode 100644 index 0000000000000..f06f0ba06a891 --- /dev/null +++ b/config/examples/snmp_trap.yaml @@ -0,0 +1,61 @@ +# SNMP Trap Example +# ------------------------------------------------------------------------------ +# A simple example that demonstrates receiving SNMP traps over UDP. +# This source supports SNMPv1 and SNMPv2c traps. +# +# To test this configuration, you can use the snmptrap command: +# snmptrap -v 2c -c public localhost:1162 '' 1.3.6.1.4.1.8072.2.3.0.1 1.3.6.1.4.1.8072.2.3.2.1 i 123456 + +data_dir: "/var/lib/vector" + +# Receive SNMP traps on UDP port 1162 (using non-privileged port for testing) +# In production, SNMP traps are typically sent to UDP port 162 +sources: + snmp_traps: + type: snmp_trap + address: "0.0.0.0:1162" + # Optional: Set receive buffer size + receive_buffer_bytes: 65536 + +# Process and enrich the trap data +transforms: + process_traps: + type: remap + inputs: ["snmp_traps"] + source: | + # Add a timestamp if not present + if !exists(.timestamp) { + .timestamp = now() + } + + # Add a human-readable severity based on trap type + if .snmp_version == "1" { + .severity = if .generic_trap == 0 { + "info" + } else if .generic_trap == 2 || .generic_trap == 3 { + "warning" + } else if .generic_trap == 4 { + "critical" + } else { + "info" + } + } else { + .severity = "info" + } + +# Output to console for inspection +sinks: + console: + type: console + inputs: ["process_traps"] + encoding: + codec: json + + # Optionally, send to a file for persistent storage + file: + type: file + inputs: ["process_traps"] + path: "/var/log/vector/snmp_traps-%Y-%m-%d.log" + encoding: + codec: json + diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 7a45737149708..4504012a61478 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -122,6 +122,8 @@ mod sample; #[cfg(feature = "sinks-sematext")] mod sematext_metrics; mod socket; +#[cfg(feature = "sources-snmp_trap")] +mod snmp_trap; #[cfg(any(feature = "sources-splunk_hec", feature = "sinks-splunk_hec"))] mod splunk_hec; #[cfg(feature = "sinks-statsd")] @@ -295,3 +297,6 @@ pub use self::{ adaptive_concurrency::*, batch::*, common::*, conditions::*, encoding_transcode::*, heartbeat::*, http::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*, }; + +#[cfg(feature = "sources-snmp_trap")] +pub use self::snmp_trap::*; diff --git a/src/internal_events/snmp_trap.rs b/src/internal_events/snmp_trap.rs new file mode 100644 index 0000000000000..139b85b938ede --- /dev/null +++ b/src/internal_events/snmp_trap.rs @@ -0,0 +1,27 @@ +use metrics::counter; +use vector_lib::internal_event::InternalEvent; +use vector_lib::internal_event::{error_stage, error_type}; + +#[derive(Debug)] +pub struct SnmpTrapParseError { + pub error: String, +} + +impl InternalEvent for SnmpTrapParseError { + fn emit(self) { + error!( + message = "Error parsing SNMP trap.", + error = %self.error, + error_type = error_type::PARSER_FAILED, + stage = error_stage::PROCESSING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_type" => error_type::PARSER_FAILED, + "stage" => error_stage::PROCESSING, + ) + .increment(1); + } +} + diff --git a/src/sources/mod.rs b/src/sources/mod.rs index 77258bdb77a46..d0307010d43ac 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -82,6 +82,8 @@ pub mod pulsar; pub mod redis; #[cfg(feature = "sources-socket")] pub mod socket; +#[cfg(feature = "sources-snmp_trap")] +pub mod snmp_trap; #[cfg(feature = "sources-splunk_hec")] pub mod splunk_hec; #[cfg(feature = "sources-static_metrics")] diff --git a/src/sources/snmp_trap/mod.rs b/src/sources/snmp_trap/mod.rs new file mode 100644 index 0000000000000..3a46e9618e1be --- /dev/null +++ b/src/sources/snmp_trap/mod.rs @@ -0,0 +1,314 @@ +use std::net::SocketAddr; + +use bytes::Bytes; +use futures::StreamExt; +use listenfd::ListenFd; +use smallvec::SmallVec; +use tokio_util::udp::UdpFramed; +use vector_lib::{ + EstimatedJsonEncodedSizeOf, + codecs::{BytesDecoder, decoding::{self, Framer}}, + config::{LogNamespace, log_schema}, + configurable::configurable_component, + internal_event::InternalEventHandle, + lookup::lookup_v2::OptionalValuePath, +}; + +use crate::{ + SourceSender, + codecs::Decoder, + config::{DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput}, + event::Event, + internal_events::{ + EventsReceived, SocketBindError, SocketBytesReceived, SocketMode, SocketReceiveError, + StreamClosedError, + }, + net, + shutdown::ShutdownSignal, + sources::util::net::{SocketListenAddr, try_bind_udp_socket}, +}; + +mod parser; + +use parser::parse_snmp_trap; + +/// Configuration for the `snmp_trap` source. +#[configurable_component(source( + "snmp_trap", + "Receive SNMP traps over UDP." +))] +#[derive(Clone, Debug)] +pub struct SnmpTrapConfig { + /// The address to listen for SNMP traps on. + /// + /// SNMP traps are typically sent to UDP port 162. + #[configurable(metadata(docs::examples = "0.0.0.0:162"))] + #[configurable(metadata(docs::examples = "127.0.0.1:1162"))] + address: SocketListenAddr, + + /// The size of the receive buffer used for the listening socket. + /// + /// This should not typically need to be changed. + #[configurable(metadata(docs::type_unit = "bytes"))] + receive_buffer_bytes: Option, + + /// Overrides the name of the log field used to add the peer host to each event. + /// + /// The value is the peer host's address, including the port. For example, `192.168.1.1:162`. + /// + /// By default, the [global `log_schema.host_key` option][global_host_key] is used. + /// + /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key + host_key: Option, + + /// The namespace to use for logs. This overrides the global setting. + #[configurable(metadata(docs::hidden))] + #[serde(default)] + log_namespace: Option, +} + +impl Default for SnmpTrapConfig { + fn default() -> Self { + Self { + address: SocketListenAddr::SocketAddr("0.0.0.0:162".parse().unwrap()), + receive_buffer_bytes: None, + host_key: None, + log_namespace: None, + } + } +} + +impl GenerateConfig for SnmpTrapConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self::default()).unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "snmp_trap")] +impl SourceConfig for SnmpTrapConfig { + async fn build(&self, cx: SourceContext) -> crate::Result { + let log_namespace = cx.log_namespace(self.log_namespace); + let host_key = self + .host_key + .clone() + .and_then(|k| k.path) + .or_else(|| log_schema().host_key().cloned()); + + Ok(Box::pin(snmp_trap_udp( + self.address, + self.receive_buffer_bytes, + host_key, + cx.shutdown, + log_namespace, + cx.out, + ))) + } + + fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { + let _log_namespace = global_log_namespace.merge(self.log_namespace); + + // Define a simple schema for SNMP trap logs + let schema_definition = vector_lib::schema::Definition::empty_legacy_namespace() + .with_standard_vector_source_metadata(); + + vec![SourceOutput::new_maybe_logs( + DataType::Log, + schema_definition, + )] + } + + fn resources(&self) -> Vec { + vec![self.address.as_udp_resource()] + } + + fn can_acknowledge(&self) -> bool { + false + } +} + +#[derive(Clone)] +struct SnmpTrapDeserializer { + events_received: vector_lib::internal_event::Registered, +} + +impl SnmpTrapDeserializer { + fn new() -> Self { + Self { + events_received: register!(EventsReceived), + } + } +} + +impl decoding::format::Deserializer for SnmpTrapDeserializer { + fn parse( + &self, + bytes: Bytes, + _log_namespace: LogNamespace, + ) -> crate::Result> { + // Emit bytes received metric + emit!(SocketBytesReceived { + mode: SocketMode::Udp, + byte_size: bytes.len(), + }); + + // We need to get the source address from somewhere, but the decoder doesn't have access to it. + // For now, we'll use a placeholder and set it properly in the frame handler. + // This is a limitation of the current codec architecture. + let dummy_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + + match parse_snmp_trap(&bytes, dummy_addr) { + Ok(events) => { + let count = events.len(); + let byte_size = events.estimated_json_encoded_size_of(); + self.events_received.emit(vector_lib::internal_event::CountByteSize(count, byte_size)); + Ok(events) + } + Err(error) => { + emit!(crate::internal_events::SnmpTrapParseError { + error: format!("{}", error), + }); + // Return empty vec on parse error + Ok(SmallVec::new()) + } + } + } +} + +async fn snmp_trap_udp( + address: SocketListenAddr, + receive_buffer_bytes: Option, + host_key: Option, + shutdown: ShutdownSignal, + _log_namespace: LogNamespace, + mut out: SourceSender, +) -> Result<(), ()> { + let listenfd = ListenFd::from_env(); + let socket = try_bind_udp_socket(address, listenfd) + .await + .map_err(|error| { + emit!(SocketBindError { + mode: SocketMode::Udp, + error, + }) + })?; + + if let Some(receive_buffer_bytes) = receive_buffer_bytes { + if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) { + warn!(message = "Failed configuring receive buffer size on UDP socket.", %error); + } + } + + info!( + message = "Listening for SNMP traps.", + addr = %address, + r#type = "udp" + ); + + let codec = Decoder::new( + Framer::Bytes(BytesDecoder::new()), + decoding::Deserializer::Boxed(Box::new(SnmpTrapDeserializer::new())), + ); + + let mut stream = UdpFramed::new(socket, codec).take_until(shutdown); + + while let Some(frame) = stream.next().await { + match frame { + Ok(((mut events, _byte_size), peer_addr)) => { + // Now we have access to the peer address, so we can set it on the events + for event in &mut events { + if let Event::Log(log) = event { + // Override the dummy source_address with the real peer address + log.insert("source_address", peer_addr.to_string()); + + // Add host field if configured + if let Some(host_key) = &host_key { + log.insert( + (vector_lib::lookup::PathPrefix::Event, host_key), + peer_addr.to_string(), + ); + } + } + } + + let count = events.len(); + if out.send_batch(events).await.is_err() { + emit!(StreamClosedError { count }); + } + } + Err(error) => { + emit!(SocketReceiveError { + mode: SocketMode::Udp, + error: &error, + }); + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_util::{ + components::{assert_source_compliance, SOURCE_TAGS}, + next_addr, + }; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } + + #[tokio::test] + async fn test_udp_socket_bind() { + let addr = next_addr(); + let config = SnmpTrapConfig { + address: SocketListenAddr::SocketAddr(addr), + receive_buffer_bytes: None, + host_key: None, + log_namespace: None, + }; + + let (tx, _rx) = SourceSender::new_test(); + // This should successfully bind + let source = SourceConfig::build(&config, SourceContext::new_test(tx, None)) + .await + .expect("Failed to build source"); + + // Just verify we can create the source + drop(source); + } + + #[tokio::test] + async fn test_config_default() { + let config = SnmpTrapConfig::default(); + assert_eq!( + config.address, + SocketListenAddr::SocketAddr("0.0.0.0:162".parse().unwrap()) + ); + } + + #[tokio::test] + async fn test_source_compliance() { + let _result = assert_source_compliance(&SOURCE_TAGS, async { + let addr = next_addr(); + let mut host_path = vector_lib::lookup::OwnedValuePath::root(); + host_path.push_field("host"); + let config = SnmpTrapConfig { + address: SocketListenAddr::SocketAddr(addr), + receive_buffer_bytes: Some(65536), + host_key: Some(OptionalValuePath::from(host_path)), + log_namespace: None, + }; + + let (tx, _rx) = SourceSender::new_test(); + SourceConfig::build(&config, SourceContext::new_test(tx, None)) + .await + .unwrap() + }) + .await; + } +} + diff --git a/src/sources/snmp_trap/parser.rs b/src/sources/snmp_trap/parser.rs new file mode 100644 index 0000000000000..8bf57ec9c8db4 --- /dev/null +++ b/src/sources/snmp_trap/parser.rs @@ -0,0 +1,292 @@ +use bytes::Bytes; +use chrono::Utc; +use serde_json::json; +use smallvec::{SmallVec, smallvec}; +use snmp_parser::{ + parse_snmp_v1, parse_snmp_v2c, + snmp::{SnmpMessage, SnmpPdu}, +}; +use std::net::SocketAddr; +use vector_lib::{ + config::log_schema, + event::{Event, LogEvent}, +}; + +/// Error types for SNMP trap parsing +#[derive(Debug)] +pub enum ParseError { + SnmpParseError(String), + InvalidPduType, +} + +impl std::fmt::Display for ParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ParseError::SnmpParseError(msg) => write!(f, "Failed to parse SNMP message: {}", msg), + ParseError::InvalidPduType => write!(f, "Invalid PDU type for trap"), + } + } +} + +impl std::error::Error for ParseError {} + +/// Parse an SNMP trap from raw bytes and convert to Vector log event +pub fn parse_snmp_trap( + data: &Bytes, + source_addr: SocketAddr, +) -> Result, ParseError> { + // Try parsing as SNMPv1 first + match parse_snmp_v1(data) { + Ok((_, message)) => { + if message.version == 0 { + // SNMPv1 (version field is 0) + return parse_v1_trap(message, source_addr); + } + } + Err(_) => { + // Not v1, try v2c + } + } + + // Try parsing as SNMPv2c + match parse_snmp_v2c(data) { + Ok((_, message)) => { + if message.version == 1 { + // SNMPv2c (version field is 1) + return parse_v2c_trap(message, source_addr); + } + } + Err(e) => { + return Err(ParseError::SnmpParseError(format!("{:?}", e))); + } + } + + Err(ParseError::SnmpParseError( + "Could not parse as v1 or v2c".to_string(), + )) +} + +fn parse_v1_trap( + message: SnmpMessage, + source_addr: SocketAddr, +) -> Result, ParseError> { + match message.pdu { + SnmpPdu::TrapV1(trap) => { + let mut log = LogEvent::default(); + + // Add timestamp + if let Some(timestamp_key) = log_schema().timestamp_key() { + log.insert((vector_lib::lookup::PathPrefix::Event, timestamp_key), Utc::now()); + } + + // Basic fields + log.insert("snmp_version", "1"); + log.insert("source_address", source_addr.to_string()); + log.insert("community", String::from_utf8_lossy(message.community.as_bytes()).to_string()); + + // Trap-specific fields + log.insert("enterprise_oid", trap.enterprise.to_string()); + + // Format agent address + let agent_addr_str = match trap.agent_addr { + snmp_parser::snmp::NetworkAddress::IPv4(ip) => { + let octets = ip.octets(); + format!("{}.{}.{}.{}", octets[0], octets[1], octets[2], octets[3]) + } + }; + log.insert("agent_address", agent_addr_str); + + log.insert("generic_trap", trap.generic_trap.0 as i64); + log.insert("specific_trap", trap.specific_trap as i64); + log.insert("uptime", trap.timestamp as i64); + + // Parse varbinds + let mut varbinds = Vec::new(); + for var in &trap.var { + let oid = var.oid.to_string(); + let value = match &var.val { + snmp_parser::snmp::VarBindValue::Value(v) => format_object_value(v), + snmp_parser::snmp::VarBindValue::Unspecified => "unspecified".to_string(), + snmp_parser::snmp::VarBindValue::NoSuchObject => "noSuchObject".to_string(), + snmp_parser::snmp::VarBindValue::NoSuchInstance => "noSuchInstance".to_string(), + snmp_parser::snmp::VarBindValue::EndOfMibView => "endOfMibView".to_string(), + }; + varbinds.push(json!({ + "oid": oid, + "value": value, + })); + } + log.insert("varbinds", varbinds); + + // Add a human-readable message + let trap_type = match trap.generic_trap.0 { + 0 => "coldStart", + 1 => "warmStart", + 2 => "linkDown", + 3 => "linkUp", + 4 => "authenticationFailure", + 5 => "egpNeighborLoss", + 6 => "enterpriseSpecific", + _ => "unknown", + }; + log.insert( + "message", + format!( + "SNMPv1 trap from {} ({}): {}", + source_addr, trap.enterprise, trap_type + ), + ); + + Ok(smallvec![Event::Log(log)]) + } + _ => Err(ParseError::InvalidPduType), + } +} + +fn parse_v2c_trap( + message: SnmpMessage, + source_addr: SocketAddr, +) -> Result, ParseError> { + match message.pdu { + SnmpPdu::Generic(pdu) => { + let mut log = LogEvent::default(); + + // Add timestamp + if let Some(timestamp_key) = log_schema().timestamp_key() { + log.insert((vector_lib::lookup::PathPrefix::Event, timestamp_key), Utc::now()); + } + + // Basic fields + log.insert("snmp_version", "2c"); + log.insert("source_address", source_addr.to_string()); + log.insert("community", String::from_utf8_lossy(message.community.as_bytes()).to_string()); + + // SNMPv2 traps include request_id + log.insert("request_id", pdu.req_id as i64); + + // Parse varbinds to extract sysUpTime and snmpTrapOID + let mut varbinds = Vec::new(); + let mut uptime = None; + let mut trap_oid = None; + + for var in &pdu.var { + let oid = var.oid.to_string(); + let value = match &var.val { + snmp_parser::snmp::VarBindValue::Value(v) => format_object_value(&v), + snmp_parser::snmp::VarBindValue::Unspecified => "unspecified".to_string(), + snmp_parser::snmp::VarBindValue::NoSuchObject => "noSuchObject".to_string(), + snmp_parser::snmp::VarBindValue::NoSuchInstance => "noSuchInstance".to_string(), + snmp_parser::snmp::VarBindValue::EndOfMibView => "endOfMibView".to_string(), + }; + + // sysUpTime is typically the first varbind (OID 1.3.6.1.2.1.1.3.0) + if oid.starts_with("1.3.6.1.2.1.1.3") { + uptime = Some(value.clone()); + } + + // snmpTrapOID is typically the second varbind (OID 1.3.6.1.6.3.1.1.4.1.0) + if oid.starts_with("1.3.6.1.6.3.1.1.4.1") { + trap_oid = Some(value.clone()); + } + + varbinds.push(json!({ + "oid": oid, + "value": value, + })); + } + + if let Some(uptime_val) = uptime { + log.insert("uptime", uptime_val); + } + + if let Some(trap_oid_val) = &trap_oid { + log.insert("trap_oid", trap_oid_val.clone()); + } + + log.insert("varbinds", varbinds); + + // Add a human-readable message + let trap_desc = trap_oid + .as_ref() + .map(|o| o.as_str()) + .unwrap_or("unknown"); + log.insert( + "message", + format!("SNMPv2c trap from {}: {}", source_addr, trap_desc), + ); + + Ok(smallvec![Event::Log(log)]) + } + _ => Err(ParseError::InvalidPduType), + } +} + +/// Format an SNMP object value as a string +fn format_object_value(val: &snmp_parser::snmp::ObjectSyntax) -> String { + use snmp_parser::snmp::{NetworkAddress, ObjectSyntax}; + + match val { + ObjectSyntax::Number(n) => n.to_string(), + ObjectSyntax::String(s) => String::from_utf8_lossy(s).to_string(), + ObjectSyntax::Object(oid) => oid.to_string(), + ObjectSyntax::BitString(bits) => format!("BitString({:?})", bits), + ObjectSyntax::IpAddress(net_addr) => match net_addr { + NetworkAddress::IPv4(ip) => { + let octets = ip.octets(); + format!("{}.{}.{}.{}", octets[0], octets[1], octets[2], octets[3]) + } + }, + ObjectSyntax::Counter32(c) => c.to_string(), + ObjectSyntax::Gauge32(g) => g.to_string(), + ObjectSyntax::TimeTicks(t) => t.to_string(), + ObjectSyntax::Opaque(o) => format!("Opaque({:?})", o), + ObjectSyntax::Counter64(c) => c.to_string(), + ObjectSyntax::UInteger32(u) => u.to_string(), + // Handle any other variants + _ => format!("{:?}", val), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::{IpAddr, Ipv4Addr}; + + #[test] + fn test_format_object_value() { + use snmp_parser::snmp::{NetworkAddress, ObjectSyntax}; + + assert_eq!(format_object_value(&ObjectSyntax::Number(42)), "42"); + assert_eq!( + format_object_value(&ObjectSyntax::String(b"test")), + "test" + ); + assert_eq!(format_object_value(&ObjectSyntax::Counter32(100)), "100"); + assert_eq!(format_object_value(&ObjectSyntax::Gauge32(200)), "200"); + assert_eq!(format_object_value(&ObjectSyntax::TimeTicks(300)), "300"); + assert_eq!( + format_object_value(&ObjectSyntax::IpAddress(NetworkAddress::IPv4( + Ipv4Addr::new(192, 168, 1, 1) + ))), + "192.168.1.1" + ); + } + + #[test] + fn test_parse_invalid_data() { + let data = Bytes::from("invalid data"); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let result = parse_snmp_trap(&data, addr); + assert!(result.is_err()); + } + + #[test] + fn test_parse_empty_data() { + let data = Bytes::from(""); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let result = parse_snmp_trap(&data, addr); + assert!(result.is_err()); + } +} + + From 0eb1bde705dd818ff87f11d26e163ba6ef44a33a Mon Sep 17 00:00:00 2001 From: bachgarash Date: Wed, 15 Oct 2025 10:01:44 +0300 Subject: [PATCH 2/3] feat: add changelog fragment for snmp trap source implementation --- changelog.d/snmp_trap_source.feature.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changelog.d/snmp_trap_source.feature.md diff --git a/changelog.d/snmp_trap_source.feature.md b/changelog.d/snmp_trap_source.feature.md new file mode 100644 index 0000000000000..ef146a5717f2c --- /dev/null +++ b/changelog.d/snmp_trap_source.feature.md @@ -0,0 +1,6 @@ +A new `snmp_trap` source has been added to receive SNMP v1 and v2c trap messages over UDP (issue #4567) + + +The source listens for SNMP traps on a configurable UDP port (typically port 162) and converts them into log events. Each trap is parsed and its fields are extracted into structured log data, including community string, version, trap type, enterprise OID, and variable bindings. + +authors: bachgarash From 4a2ce21e2feeb3958ff8e304b5d64880d319505f Mon Sep 17 00:00:00 2001 From: bachgarash Date: Sun, 19 Oct 2025 21:19:42 +0300 Subject: [PATCH 3/3] feat: run generate-component-docs to generate some docs --- .tool-versions | 1 + .../sinks/generated/greptimedb_logs.cue | 3 +- .../sources/generated/snmp_trap.cue | 35 +++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) create mode 100644 .tool-versions create mode 100644 website/cue/reference/components/sources/generated/snmp_trap.cue diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000000000..5aa8e0c30b8d6 --- /dev/null +++ b/.tool-versions @@ -0,0 +1 @@ +ruby 3.3.6 diff --git a/website/cue/reference/components/sinks/generated/greptimedb_logs.cue b/website/cue/reference/components/sinks/generated/greptimedb_logs.cue index ae7e4a444700c..f93188f9d6a3e 100644 --- a/website/cue/reference/components/sinks/generated/greptimedb_logs.cue +++ b/website/cue/reference/components/sinks/generated/greptimedb_logs.cue @@ -153,7 +153,8 @@ generated: components: sinks: greptimedb_logs: configuration: { """ required: false type: object: { - examples: [{}] + examples: [{}, + ] options: "*": { description: "Extra header key-value pairs." required: true diff --git a/website/cue/reference/components/sources/generated/snmp_trap.cue b/website/cue/reference/components/sources/generated/snmp_trap.cue new file mode 100644 index 0000000000000..5289d4c915b84 --- /dev/null +++ b/website/cue/reference/components/sources/generated/snmp_trap.cue @@ -0,0 +1,35 @@ +package metadata + +generated: components: sources: snmp_trap: configuration: { + address: { + description: """ + The address to listen for SNMP traps on. + + SNMP traps are typically sent to UDP port 162. + """ + required: true + type: string: examples: ["0.0.0.0:9000", "systemd", "systemd#3", "0.0.0.0:162", "127.0.0.1:1162"] + } + host_key: { + description: """ + Overrides the name of the log field used to add the peer host to each event. + + The value is the peer host's address, including the port. For example, `192.168.1.1:162`. + + By default, the [global `log_schema.host_key` option][global_host_key] is used. + + [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key + """ + required: false + type: string: {} + } + receive_buffer_bytes: { + description: """ + The size of the receive buffer used for the listening socket. + + This should not typically need to be changed. + """ + required: false + type: uint: unit: "bytes" + } +}