Skip to content

Commit a46e45a

Browse files
committed
UDP input plugin: add support for NetFlow v5/v9 reception
1 parent 2a28efd commit a46e45a

File tree

1 file changed

+104
-15
lines changed
  • src/plugins/input/udp

1 file changed

+104
-15
lines changed

src/plugins/input/udp/udp.c

Lines changed: 104 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
* \file src/plugins/input/udp/udp.c
33
* \author Lukas Hutak <[email protected]>
44
* \brief UDP input plugin for IPFIXcol 2
5-
* \date 2018
5+
* \date 2018-2019
66
*/
77

8-
/* Copyright (C) 2018 CESNET, z.s.p.o.
8+
/* Copyright (C) 2018-2019 CESNET, z.s.p.o.
99
*
1010
* Redistribution and use in source and binary forms, with or without
1111
* modification, are permitted provided that the following conditions
@@ -77,11 +77,68 @@ IPX_API struct ipx_plugin_info ipx_plugin_info = {
7777
// Configuration flags (reserved for future use)
7878
.flags = 0,
7979
// Plugin version string (like "1.2.3")
80-
.version = "2.0.0",
80+
.version = "2.1.0",
8181
// Minimal IPFIXcol version string (like "1.2.3")
82-
.ipx_min = "2.0.0"
82+
.ipx_min = "2.1.0"
8383
};
8484

85+
/**
86+
* @struct nf5_msg_hdr
87+
* @brief NetFlow v5 Packet Header structure
88+
* @warning All values are stored in Network Byte Order!
89+
*/
90+
struct __attribute__((__packed__)) nf5_msg_hdr {
91+
/// NetFlow export format version number
92+
uint16_t version;
93+
/// Number of flows exported in this packet (1 - 30)
94+
uint16_t count;
95+
/// Current time in milliseconds since the export device booted
96+
uint32_t sys_uptime;
97+
/// Current count of seconds since 0000 UTC 1970
98+
uint32_t unix_sec;
99+
/// Residual nanoseconds since 0000 UTC 1970
100+
uint32_t unix_nsec;
101+
/// Sequence counter of total flows seen
102+
uint32_t flow_seq;
103+
/// Type of flow-switching engine
104+
uint8_t engine_type;
105+
/// Slot number of the flow-switching engine
106+
uint8_t engine_id;
107+
/// First two bits hold the sampling mode. Remaining 14 bits hold value of sampling interval
108+
uint16_t sampling_interval;
109+
};
110+
111+
/** Version identification in NetFlow v5 header */
112+
#define NF5_HDR_VERSION (5)
113+
/** Length of NetFlow v5 header (in bytes) */
114+
#define NF5_HDR_LEN (sizeof(struct nf5_msg_hdr))
115+
116+
/**
117+
* @struct nf9_msg_hdr
118+
* @brief NetFlow v9 Packet Header structure
119+
* @warning All values are stored in Network Byte Order!
120+
*/
121+
struct __attribute__((__packed__)) nf9_msg_hdr {
122+
/// Version of Flow Record format exported in this packet
123+
uint16_t version;
124+
/// The total number of records in the Export Packet
125+
uint16_t count;
126+
/// Time in milliseconds since this device was first booted
127+
uint32_t sys_uptime;
128+
/// Time in seconds since 0000 UTC 1970, at which the Export Packet leaves the Exporter
129+
uint32_t unix_sec;
130+
/// Incremental sequence counter of all Export Packets by the Exporter
131+
uint32_t seq_number;
132+
/// Exporter Observation Domain
133+
uint32_t source_id;
134+
};
135+
136+
/** Version identification in NetFlow v9 header */
137+
#define NF9_HDR_VERSION (9)
138+
/** Length of NetFlow v9 header (in bytes) */
139+
#define NF9_HDR_LEN (sizeof(struct nf9_msg_hdr))
140+
141+
85142
/** Description of a UDP Transport Session */
86143
struct udp_source {
87144
/** Identification of local socket (on which the data came) */
@@ -850,16 +907,14 @@ process_timer(struct udp_data *instance, int fd)
850907
}
851908

852909
/**
853-
* \brief Get an IPFIX message from a socket and pass it
910+
* \brief Get an IPFIX/NetFlow message from a socket and pass it
854911
* \param[in] instance Instance data
855912
* \param[in] sd File descriptor of the socket
856913
*/
857914
static void
858915
process_socket(struct udp_data *instance, int sd)
859916
{
860917
const char *err_str;
861-
struct fds_ipfix_msg_hdr hdr;
862-
static_assert(sizeof(hdr) == FDS_IPFIX_MSG_HDR_LEN, "Invalid size of IPFIX Message header");
863918

864919
// Get size of the message
865920
int msg_size;
@@ -870,9 +925,10 @@ process_socket(struct udp_data *instance, int sd)
870925
return;
871926
}
872927

873-
if (msg_size < FDS_IPFIX_MSG_HDR_LEN || msg_size > UINT16_MAX) {
928+
if (msg_size < (int) sizeof(uint16_t) || msg_size > UINT16_MAX) {
874929
// Remove the malformed message from the buffer
875-
ssize_t ret = recvfrom(sd, &hdr, sizeof(hdr), 0, NULL, NULL);
930+
uint16_t mini_buffer;
931+
ssize_t ret = recvfrom(sd, &mini_buffer, sizeof(mini_buffer), 0, NULL, NULL);
876932
if (ret == -1) {
877933
ipx_strerror(errno, err_str);
878934
IPX_CTX_WARNING(instance->ctx, "An error has occurred during reading a malformed "
@@ -917,11 +973,44 @@ process_socket(struct udp_data *instance, int sd)
917973
return;
918974
}
919975

920-
// Check IPFIX header
921-
const struct fds_ipfix_msg_hdr *msg_hdr = (const struct fds_ipfix_msg_hdr *) buffer;
922-
if (ntohs(msg_hdr->version) != FDS_IPFIX_VERSION
923-
|| ntohs(msg_hdr->length) < FDS_IPFIX_MSG_HDR_LEN) {
924-
IPX_CTX_ERROR(instance->ctx, "Receiver an invalid IPFIX Message header from '%s'. "
976+
// Check NetFlow/IPFIX header length and extract ODID/Source ID
977+
const uint16_t msg_ver = ntohs(*(uint16_t *) buffer);
978+
uint32_t msg_odid = 0;
979+
bool is_len_ok = true;
980+
981+
switch (msg_ver) {
982+
case FDS_IPFIX_VERSION: // IPFIX
983+
if (msg_size < FDS_IPFIX_MSG_HDR_LEN) {
984+
is_len_ok = false;
985+
break;
986+
}
987+
988+
msg_odid = ntohl(((const struct fds_ipfix_msg_hdr *) buffer)->odid);
989+
break;
990+
case NF9_HDR_VERSION: // NetFlow v9
991+
if (msg_size < (int) NF9_HDR_LEN) {
992+
is_len_ok = false;
993+
break;
994+
}
995+
996+
msg_odid = ntohl(((const struct nf9_msg_hdr *) buffer)->source_id);
997+
break;
998+
case NF5_HDR_VERSION: // NetFlow v5
999+
if (msg_size < (int) NF5_HDR_LEN) {
1000+
is_len_ok = false;
1001+
break;
1002+
}
1003+
1004+
// Source ID is not available in NetFlow v5 -> always 0
1005+
msg_odid = 0;
1006+
break;
1007+
default:
1008+
is_len_ok = false;
1009+
break;
1010+
}
1011+
1012+
if (!is_len_ok) {
1013+
IPX_CTX_ERROR(instance->ctx, "Receiver an invalid NetFlow/IPFIX Message header from '%s'. "
9251014
"The message will be dropped!", source->session->ident);
9261015
free(buffer);
9271016
return;
@@ -943,7 +1032,7 @@ process_socket(struct udp_data *instance, int sd)
9431032
// Create a message wrapper and pass the message
9441033
struct ipx_msg_ctx msg_ctx;
9451034
msg_ctx.session = source->session;
946-
msg_ctx.odid = ntohl(msg_hdr->odid);
1035+
msg_ctx.odid = msg_odid;
9471036
msg_ctx.stream = 0; // Streams are not supported over UDP
9481037

9491038
ipx_msg_ipfix_t *msg = ipx_msg_ipfix_create(instance->ctx, &msg_ctx, buffer, (uint16_t) msg_size);

0 commit comments

Comments
 (0)