Skip to content

Commit 2a28efd

Browse files
committed
NetFlow: integration of NetFlow v5/v9 converters to the collector parser
1 parent edeee93 commit 2a28efd

File tree

2 files changed

+183
-10
lines changed

2 files changed

+183
-10
lines changed

src/core/parser.c

Lines changed: 182 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
* \file src/core/parser.c
33
* \author Lukas Hutak <[email protected]>
44
* \brief IPFIX Message parser (source file)
5-
* \date 2018
5+
* \date 2018-2019
66
*/
77

8-
/* Copyright (C) 2018 CESNET, z.s.p.o.
8+
/* Copyright (C) 2018-2019 CESNET, z.s.p.o.
99
*
1010
* Redistribution and use in source and binary forms, with or without
1111
* modification, are permitted provided that the following conditions
@@ -49,6 +49,8 @@
4949
#include "parser.h"
5050
#include "verbose.h"
5151
#include "fpipe.h"
52+
#include "netflow2ipfix/netflow2ipfix.h"
53+
#include "netflow2ipfix/netflow_structs.h"
5254

5355
/** Default record of the parser structure */
5456
#define PARSER_DEF_RECS 8
@@ -77,6 +79,18 @@ enum stream_ctx_flags {
7779
SCF_BLOCK = (1 << 0),
7880
};
7981

82+
/** Type of source data */
83+
enum source_type {
84+
/// Unknown type of messages
85+
ST_UNKNOWN,
86+
/// IPFIX Messages
87+
ST_IPFIX,
88+
/// NetFlow v5 Messages
89+
ST_NETFLOW5,
90+
/// NetFlow v9 Messages
91+
ST_NETFLOW9
92+
};
93+
8094
/**
8195
* \brief Stream context
8296
* \note Represents parameters common to all streams within the same combination of Transport
@@ -87,6 +101,15 @@ struct stream_ctx {
87101
fds_tmgr_t *mgr;
88102
/** Connection flags (see #stream_ctx_flags) */
89103
uint16_t flags;
104+
/** Type of source messages (IPFIX/NetFlow) */
105+
enum source_type type;
106+
/** Messages converters to IPFIX (based on source type) */
107+
union {
108+
/** Converter from NetFlow v5 to IPFIX */
109+
ipx_nf5_conv_t *nf5;
110+
/** Converter from NetFlow v9 to IPFIX */
111+
ipx_nf9_conv_t *nf9;
112+
} converter;
90113

91114
/** Number of pre-allocated stream records */
92115
size_t infos_alloc;
@@ -207,6 +230,7 @@ stream_ctx_create(const struct ipx_parser *parser, const struct ipx_session *ses
207230
ctx->infos_alloc = STREAM_DEF_RECS;
208231
ctx->infos_valid = 0;
209232
ctx->flags = 0;
233+
ctx->type = ST_UNKNOWN; // type of flows is unknown
210234

211235
// Initialize a new template manager
212236
ctx->mgr = fds_tmgr_create(session->type);
@@ -241,6 +265,15 @@ static void
241265
stream_ctx_destroy(struct stream_ctx *ctx)
242266
{
243267
fds_tmgr_destroy(ctx->mgr);
268+
269+
// Destroy converters
270+
if (ctx->type == ST_NETFLOW5 && ctx->converter.nf5 != NULL) {
271+
ipx_nf5_conv_destroy(ctx->converter.nf5);
272+
}
273+
if (ctx->type == ST_NETFLOW9 && ctx->converter.nf9 != NULL) {
274+
ipx_nf9_conv_destroy(ctx->converter.nf9);
275+
}
276+
244277
free(ctx);
245278
}
246279

@@ -988,6 +1021,127 @@ parser_session_block_all(ipx_parser_t *parser)
9881021
}
9891022
}
9901023

1024+
/**
1025+
* \brief Convert a message with flow records to IPFIX Message format
1026+
*
1027+
* If the message is already in IPFIX Message format, no conversion is performed.
1028+
* The function also check if the message type matches previously seen type of messages
1029+
* in the stream.
1030+
*
1031+
* @param[in] parser Parser (for logging information)
1032+
* @param[in] rec Parser record of the current stream
1033+
* @param[in,out] msg Message wrapper with a message to convert
1034+
*
1035+
* @return #IPX_OK on success
1036+
* @return #IPX_ERR_DENIED if the message cannot be converted (i.e. not NetFlow or IPFIX)
1037+
* @return #IPX_ERR_FORMAT if the message is malformed and stream must be closed/reset
1038+
* @return #IPX_ERR_NOMEM in case of a memory allocation error
1039+
*/
1040+
static int
1041+
parser_convert(ipx_parser_t *parser, struct parser_rec *rec, ipx_msg_ipfix_t *msg)
1042+
{
1043+
const struct ipx_msg_ctx *msg_ctx = &msg->ctx;
1044+
const uint8_t *msg_data = msg->raw_pkt;
1045+
const uint16_t msg_size = msg->raw_size;
1046+
1047+
if (msg_size < sizeof(uint16_t)) {
1048+
return FDS_ERR_FORMAT;
1049+
}
1050+
1051+
// Determine the version of flow message
1052+
const uint16_t version = ntohs(*(uint16_t *) msg_data);
1053+
1054+
enum source_type type = rec->ctx->type;
1055+
if (type == ST_UNKNOWN) {
1056+
// This is the first message that we received for processing
1057+
switch (version) {
1058+
case FDS_IPFIX_VERSION:
1059+
// IPFIX
1060+
rec->ctx->type = ST_IPFIX;
1061+
break;
1062+
case IPX_NF9_VERSION:
1063+
// NetFlow v9 (+ initialize converter)
1064+
rec->ctx->type = ST_NETFLOW9;
1065+
rec->ctx->converter.nf9 = ipx_nf9_conv_init(parser->ident, parser->vlevel);
1066+
if (!rec->ctx->converter.nf9) {
1067+
PARSER_ERROR(parser, msg_ctx, "Failed to initialize NetFlow v9 converter!", '\0');
1068+
return IPX_ERR_NOMEM;
1069+
}
1070+
break;
1071+
case IPX_NF5_VERSION: {
1072+
// NetFlow v5 (+ initialize converter)
1073+
rec->ctx->type = ST_NETFLOW5;
1074+
1075+
// Determine suitable Template refresh interval
1076+
uint32_t tmplt_refresh = 0; // disabled
1077+
if (rec->session->type == FDS_SESSION_UDP) {
1078+
// Lifetime should be at least 3x higher than refresh interval
1079+
tmplt_refresh = rec->session->udp.lifetime.tmplts / 3U;
1080+
}
1081+
1082+
rec->ctx->converter.nf5 = ipx_nf5_conv_init(parser->ident, parser->vlevel,
1083+
tmplt_refresh, rec->odid);
1084+
if (!rec->ctx->converter.nf5) {
1085+
PARSER_ERROR(parser, msg_ctx, "Failed to initialize NetFlow v5 converter!", '\0');
1086+
return IPX_ERR_NOMEM;
1087+
}
1088+
}
1089+
break;
1090+
default:
1091+
PARSER_ERROR(parser, msg_ctx, "Unexpected NetFlow/IPFIX message version (expected: "
1092+
"5,9 or 10, got: %" PRIu16 ")", version);
1093+
return IPX_ERR_DENIED;
1094+
}
1095+
}
1096+
1097+
type = rec->ctx->type;
1098+
assert(type != ST_UNKNOWN);
1099+
1100+
// Perform convertion based on the stream type, if necessary
1101+
int conv_status = IPX_OK;
1102+
switch (type) {
1103+
case ST_IPFIX:
1104+
// IPFIX
1105+
if (version != FDS_IPFIX_VERSION) {
1106+
PARSER_ERROR(parser, msg_ctx, "Expected an IPFIX Message but non-IPFIX data has been "
1107+
"received (expected version: 10, got: %" PRIu16 ")", version);
1108+
return IPX_ERR_FORMAT;
1109+
}
1110+
1111+
// Nothing to convert...
1112+
break;
1113+
case ST_NETFLOW9:
1114+
// NetFlow v9 -> convert to IPFIX
1115+
if (version != IPX_NF9_VERSION) {
1116+
PARSER_ERROR(parser, msg_ctx, "Expected NetFlow v9 Message but non-NetFlow data has "
1117+
"been received (expected version: 9, got: %" PRIu16 ")", version);
1118+
return IPX_ERR_FORMAT;
1119+
}
1120+
1121+
conv_status = ipx_nf9_conv_process(rec->ctx->converter.nf9, msg);
1122+
break;
1123+
case ST_NETFLOW5:
1124+
// NetFlow v5 -> convert to IPFIX
1125+
if (version != IPX_NF5_VERSION) {
1126+
PARSER_ERROR(parser, msg_ctx, "Expected NetFlow v5 Message but non-NetFlow data has "
1127+
"been received (expected version: 5, got: %" PRIu16 ")", version);
1128+
return IPX_ERR_FORMAT;
1129+
}
1130+
1131+
conv_status = ipx_nf5_conv_process(rec->ctx->converter.nf5, msg);
1132+
break;
1133+
default:
1134+
PARSER_ERROR(parser, msg_ctx, "Unimplemented support for message format conversion!", '\0');
1135+
return IPX_ERR_DENIED;
1136+
}
1137+
1138+
if (conv_status != IPX_OK) {
1139+
return (conv_status == IPX_ERR_NOMEM) ? IPX_ERR_NOMEM : IPX_ERR_FORMAT;
1140+
}
1141+
1142+
return IPX_OK;
1143+
}
1144+
9911145
ipx_parser_t *
9921146
ipx_parser_create(const char *ident, enum ipx_verb_level vlevel)
9931147
{
@@ -1038,6 +1192,18 @@ ipx_parser_verb(ipx_parser_t *parser, enum ipx_verb_level *v_new, enum ipx_verb_
10381192

10391193
if (v_new != NULL) {
10401194
parser->vlevel = *v_new;
1195+
1196+
// Change verbosity of all converters too
1197+
for (size_t i = 0; i < parser->recs_valid; ++i) {
1198+
struct stream_ctx *ctx = parser->recs[i].ctx;
1199+
1200+
if (ctx->type == ST_NETFLOW5 && ctx->converter.nf5 != NULL) {
1201+
ipx_nf5_conv_verb(ctx->converter.nf5, *v_new);
1202+
}
1203+
if (ctx->type == ST_NETFLOW9 && ctx->converter.nf9 != NULL) {
1204+
ipx_nf9_conv_verb(ctx->converter.nf9, *v_new);
1205+
}
1206+
}
10411207
}
10421208
}
10431209

@@ -1046,7 +1212,6 @@ ipx_parser_process(ipx_parser_t *parser, ipx_msg_ipfix_t **ipfix, ipx_msg_garbag
10461212
{
10471213
*garbage = NULL;
10481214
const struct ipx_msg_ctx *msg_ctx = &(*ipfix)->ctx;
1049-
const struct fds_ipfix_msg_hdr *msg_data = (struct fds_ipfix_msg_hdr *)(*ipfix)->raw_pkt;
10501215

10511216
// Find a Stream Info
10521217
struct parser_rec *rec; // Combination of Transport Session, ODID
@@ -1069,14 +1234,21 @@ ipx_parser_process(ipx_parser_t *parser, ipx_msg_ipfix_t **ipfix, ipx_msg_garbag
10691234
assert(rec->odid == msg_ctx->odid);
10701235
assert(info->id == msg_ctx->stream);
10711236

1072-
// Check IPFIX Message header and its sequence number
1073-
if (ntohs(msg_data->version) != FDS_IPFIX_VERSION) {
1074-
// TODO: convert NetFlow to IPFIX
1075-
PARSER_ERROR(parser, msg_ctx, "IPFIX Message version doesn't match (expected: "
1076-
"%" PRIu16 ", got: %" PRIu16 ")", FDS_IPFIX_VERSION, ntohs(msg_data->version));
1077-
return IPX_ERR_FORMAT;
1237+
// Check if the message must be converted to IPFIX
1238+
int conv_status = parser_convert(parser, rec, *ipfix);
1239+
if (conv_status != IPX_OK) {
1240+
// Note: An appropriate error message has been printed in the converter
1241+
if (conv_status != FDS_ERR_NOMEM) {
1242+
return FDS_ERR_FORMAT;
1243+
}
1244+
1245+
return FDS_ERR_NOMEM;
10781246
}
10791247

1248+
// Check IPFIX Message header and its sequence number
1249+
const struct fds_ipfix_msg_hdr *msg_data = (struct fds_ipfix_msg_hdr *)(*ipfix)->raw_pkt;
1250+
assert(ntohs(msg_data->version) == FDS_IPFIX_VERSION && "Message to parse must be IPFIX!");
1251+
10801252
if (ntohs(msg_data->length) < FDS_IPFIX_MSG_HDR_LEN) {
10811253
PARSER_ERROR(parser, msg_ctx, "IPFIX Message Header size (%" PRIu16 ") is invalid "
10821254
"(total length of the message is smaller than the IPFIX Message Header structure).",
@@ -1106,6 +1278,7 @@ ipx_parser_process(ipx_parser_t *parser, ipx_msg_ipfix_t **ipfix, ipx_msg_garbag
11061278
}
11071279
}
11081280
}
1281+
info->flags |= SIF_SEEN;
11091282

11101283
// Configure a template manager
11111284
fds_tmgr_t *tmgr = rec->ctx->mgr;

src/core/parser.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ ipx_parser_verb(ipx_parser_t *parser, enum ipx_verb_level *v_new, enum ipx_verb_
107107
* the Message which holds a Template manager and expected sequence number of the Message.
108108
* If this is the first record from the TS and ODID, new info is created.
109109
* Second, if the message is in the form of NetFlow, it is converted to IPFIX.
110-
* Finally, parse and check validity of all (Data/Template Options Template) Sets in the
110+
* Finally, it parses and checks validity of all (Data/Template Options Template) Sets in the
111111
* IPFIX Message.
112112
*
113113
* \note

0 commit comments

Comments
 (0)