Skip to content

Commit 4cd508a

Browse files
authored
Merge pull request #31 from CESNET/hutak-ipfix-file
Add IPFIX File input plugin
2 parents 8ca71e4 + e7aca24 commit 4cd508a

File tree

11 files changed

+875
-11
lines changed

11 files changed

+875
-11
lines changed

README.rst

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,25 @@ No problem, pick any combination of plugins.
2929
Available plugins
3030
-----------------
3131

32-
**Input plugins** - receive IPFIX data. Each can be configured to to listen on a specific
32+
**Input plugins** - receive NetFlow/IPFIX data. Each can be configured to listen on a specific
3333
network interface and a port. Multiple instances of these plugins can run concurrently.
3434

35-
- `UDP <src/plugins/input/udp>`_ - receives NetFlow v5/v9 and IPFIX over UDP
36-
- `TCP <src/plugins/input/tcp>`_ - receives IPFIX over TCP
35+
- `UDP <src/plugins/input/udp>`_ - receive NetFlow v5/v9 and IPFIX over UDP
36+
- `TCP <src/plugins/input/tcp>`_ - receive IPFIX over TCP
37+
- `IPFIX File <src/plugins/input/ipfix>`_ - read flow data from IPFIX File
3738

3839
**Intermediate plugins** - modify, enrich and filter flow records.
3940

40-
- `anonymization <src/plugins/intermediate/anonymization/>`_ - anonymize IP addresses
41+
- `Anonymization <src/plugins/intermediate/anonymization/>`_ - anonymize IP addresses
4142
(in flow records) with Crypto-PAn algorithm
4243

4344
**Output plugins** - store or forward your flows.
4445

45-
- `FDS file <src/plugins/output/fds>`_ - store all flows in FDS file format (efficient long-term storage)
46+
- `FDS File <src/plugins/output/fds>`_ - store all flows in FDS file format (efficient long-term storage)
4647
- `JSON <src/plugins/output/json>`_ - convert flow records to JSON and send/store them
4748
- `Viewer <src/plugins/output/viewer>`_ - convert IPFIX into plain text and print
4849
it on standard output
49-
- `IPFIX file <src/plugins/output/ipfix>`_ - store all flows in IPFIX File format
50+
- `IPFIX File <src/plugins/output/ipfix>`_ - store all flows in IPFIX File format
5051
- `Time Check <src/plugins/output/timecheck>`_ - flow timestamp check
5152
- `Dummy <src/plugins/output/dummy>`_ - simple output module example
5253
- `lnfstore <extra_plugins/output/lnfstore>`_ (*) - store all flows in nfdump compatible

src/core/configurator/configurator.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -373,8 +373,8 @@ ipx_configurator::termination_handle(const struct ipx_cpipe_req &req, ipx_contro
373373
// First of all, check if termination process has been completed
374374
if (req.type == IPX_CPIPE_TYPE_TERM_DONE) {
375375
if (m_state != STATUS::STOP_SLOW && m_state != STATUS::STOP_FAST) {
376-
IPX_ERROR(comp_str, "Got a termination done notification, but the termination process "
377-
"is not in progress!", '\0');
376+
IPX_ERROR(comp_str, "[internal] Got a termination done notification, but the "
377+
"termination process is not in progress!", '\0');
378378
return false;
379379
}
380380

@@ -473,6 +473,7 @@ ipx_configurator::termination_stop_all()
473473
{
474474
for (auto &it : m_running_inputs) {
475475
it->set_processing(false);
476+
it->set_parser_processing(false);
476477
};
477478
for (auto &it : m_running_inter) {
478479
it->set_processing(false);
@@ -508,8 +509,16 @@ ipx_configurator::termination_stop_partly(const ipx_ctx_t *ctx)
508509
it->set_processing(false);
509510
}
510511

511-
if (ctx_info->type == IPX_PT_INPUT || ctx_info == &ipx_plugin_parser_info) {
512-
// The termination has been invoked by an input plugin or its message parser
512+
if (ctx_info->type == IPX_PT_INPUT) {
513+
return;
514+
}
515+
516+
// Stop all NetFlow/IPFIX message parsers
517+
for (auto &it : m_running_inputs) {
518+
it->set_parser_processing(false);
519+
}
520+
521+
if (ctx_info == &ipx_plugin_parser_info) {
513522
return;
514523
}
515524

src/core/configurator/instance_input.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,5 +183,11 @@ void
183183
ipx_instance_input::set_processing(bool en)
184184
{
185185
ipx_ctx_processing_set(_ctx, en);
186+
187+
}
188+
189+
void
190+
ipx_instance_input::set_parser_processing(bool en)
191+
{
186192
ipx_ctx_processing_set(_parser_ctx, en);
187193
}

src/core/configurator/instance_input.hpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,26 @@ class ipx_instance_input : public ipx_instance {
162162
extensions_resolve(ipx_cfg_extensions *ext_mgr) override;
163163

164164
/**
165-
* \brief Enable/disable processing of data messages (IPFIX and Transport Session)
165+
* \brief Enable/disable processing of data messages by the plugin (IPFIX and Transport Session)
166+
*
167+
* \warning This doesn't affect NetFlow/IPFIX Message parser, see set_parser_processing()
166168
* \note By default, data processing is enabled.
167169
* \see ipx_ctx_processing() for more details
168170
* \param[in] en Enable/disable processing
169171
*/
170172
void
171173
set_processing(bool en) override;
174+
175+
/**
176+
* \brief Enable/disable processing of data messages by the parser (IPFIX and Transport Session)
177+
*
178+
* \warning This doesn't affect the input plugin, see set_processing()
179+
* \note By default, data processing is enabled.
180+
* \see ipx_ctx_processing() for more details
181+
* \param[in] en Enable/disable processing
182+
*/
183+
void
184+
set_parser_processing(bool en);
172185
};
173186

174187
#endif //IPFIXCOL_INSTANCE_INPUT_HPP

src/plugins/input/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# List of input plugins to build and install
22
add_subdirectory(dummy)
3+
add_subdirectory(ipfix)
34
add_subdirectory(tcp)
45
add_subdirectory(udp)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Create a linkable module
2+
add_library(ipfix-input MODULE
3+
ipfix.c
4+
config.c
5+
config.h
6+
)
7+
8+
install(
9+
TARGETS ipfix-input
10+
LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/"
11+
)
12+
13+
if (ENABLE_DOC_MANPAGE)
14+
# Build a manual page
15+
set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-ipfix-input.7.rst")
16+
set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-ipfix-input.7")
17+
18+
add_custom_command(TARGET ipfix-input PRE_BUILD
19+
COMMAND ${RST2MAN_EXECUTABLE} --syntax-highlight=none ${SRC_FILE} ${DST_FILE}
20+
DEPENDS ${SRC_FILE}
21+
VERBATIM
22+
)
23+
24+
install(
25+
FILES "${DST_FILE}"
26+
DESTINATION "${INSTALL_DIR_MAN}/man7"
27+
)
28+
endif()

src/plugins/input/ipfix/README.rst

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
IPFIX File (input plugin)
2+
=========================
3+
4+
The plugin reads flow data from one or more files in IPFIX File format. It is possible to
5+
use it to load flow records previously stored using IPFIX output plugin.
6+
7+
Unlike UDP and TCP input plugins which infinitely waits for data from NetFlow/IPFIX
8+
exporters, the plugin will terminate the collector after all files are processed.
9+
10+
Example configuration
11+
---------------------
12+
13+
.. code-block:: xml
14+
15+
<input>
16+
<name>IPFIX File</name>
17+
<plugin>ipfix</plugin>
18+
<params>
19+
<path>/tmp/flow/file.ipfix</path>
20+
</params>
21+
</input>
22+
23+
Parameters
24+
----------
25+
26+
:``path``:
27+
Path to file(s) in IPFIX File format. It is possible to use asterisk instead of
28+
a filename/directory, tilde character (i.e. "~") instead of the home directory of
29+
the user, and brace expressions (i.e. "/tmp/{source1,source2}/file.ipfix").
30+
Directories and non-IPFIX Files that match the file pattern are skipped/ignored.

src/plugins/input/ipfix/config.c

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/**
2+
* \file src/plugins/input/tcp/config.c
3+
* \author Lukas Hutak <[email protected]>
4+
* \brief Configuration parser of TCP input plugin (source file)
5+
* \date 2018
6+
*/
7+
8+
/* Copyright (C) 2018 CESNET, z.s.p.o.
9+
*
10+
* Redistribution and use in source and binary forms, with or without
11+
* modification, are permitted provided that the following conditions
12+
* are met:
13+
* 1. Redistributions of source code must retain the above copyright
14+
* notice, this list of conditions and the following disclaimer.
15+
* 2. Redistributions in binary form must reproduce the above copyright
16+
* notice, this list of conditions and the following disclaimer in
17+
* the documentation and/or other materials provided with the
18+
* distribution.
19+
* 3. Neither the name of the Company nor the names of its contributors
20+
* may be used to endorse or promote products derived from this
21+
* software without specific prior written permission.
22+
*
23+
* ALTERNATIVELY, provided that this notice is retained in full, this
24+
* product may be distributed under the terms of the GNU General Public
25+
* License (GPL) version 2 or later, in which case the provisions
26+
* of the GPL apply INSTEAD OF those given above.
27+
*
28+
* This software is provided ``as is'', and any express or implied
29+
* warranties, including, but not limited to, the implied warranties of
30+
* merchantability and fitness for a particular purpose are disclaimed.
31+
* In no event shall the company or contributors be liable for any
32+
* direct, indirect, incidental, special, exemplary, or consequential
33+
* damages (including, but not limited to, procurement of substitute
34+
* goods or services; loss of use, data, or profits; or business
35+
* interruption) however caused and on any theory of liability, whether
36+
* in contract, strict liability, or tort (including negligence or
37+
* otherwise) arising in any way out of the use of this software, even
38+
* if advised of the possibility of such damage.
39+
*
40+
*/
41+
42+
#include <stdlib.h>
43+
#include <limits.h>
44+
#include <string.h>
45+
#include "config.h"
46+
47+
/*
48+
* <params>
49+
* <path>...</path> // required, exactly once
50+
* </params>
51+
*/
52+
53+
/** XML nodes */
54+
enum params_xml_nodes {
55+
NODE_PATH = 1
56+
};
57+
58+
/** Definition of the \<params\> node */
59+
static const struct fds_xml_args args_params[] = {
60+
FDS_OPTS_ROOT("params"),
61+
FDS_OPTS_ELEM(NODE_PATH, "path", FDS_OPTS_T_STRING, 0),
62+
FDS_OPTS_END
63+
};
64+
65+
/**
66+
* \brief Process \<params\> node
67+
* \param[in] ctx Plugin context
68+
* \param[in] root XML context to process
69+
* \param[in] cfg Parsed configuration
70+
* \return #IPX_OK on success
71+
* \return #IPX_ERR_FORMAT in case of failure
72+
*/
73+
static int
74+
config_parser_root(ipx_ctx_t *ctx, fds_xml_ctx_t *root, struct ipfix_config *cfg)
75+
{
76+
const struct fds_xml_cont *content;
77+
while (fds_xml_next(root, &content) != FDS_EOC) {
78+
switch (content->id) {
79+
case NODE_PATH:
80+
// File(s) path
81+
assert(content->type == FDS_OPTS_T_STRING);
82+
cfg->path = strdup(content->ptr_string);
83+
break;
84+
default:
85+
// Internal error
86+
assert(false);
87+
}
88+
}
89+
90+
if (!cfg->path) {
91+
IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__);
92+
}
93+
94+
return IPX_OK;
95+
}
96+
97+
/**
98+
* \brief Set default parameters of the configuration
99+
* \param[in] cfg Configuration
100+
*/
101+
static void
102+
config_default_set(struct ipfix_config *cfg)
103+
{
104+
cfg->path = NULL;
105+
}
106+
107+
struct ipfix_config *
108+
config_parse(ipx_ctx_t *ctx, const char *params)
109+
{
110+
struct ipfix_config *cfg = calloc(1, sizeof(*cfg));
111+
if (!cfg) {
112+
IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__);
113+
return NULL;
114+
}
115+
116+
// Set default parameters
117+
config_default_set(cfg);
118+
119+
// Create an XML parser
120+
fds_xml_t *parser = fds_xml_create();
121+
if (!parser) {
122+
IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__);
123+
config_destroy(cfg);
124+
return NULL;
125+
}
126+
127+
if (fds_xml_set_args(parser, args_params) != IPX_OK) {
128+
IPX_CTX_ERROR(ctx, "Failed to parse the description of an XML document!", '\0');
129+
fds_xml_destroy(parser);
130+
config_destroy(cfg);
131+
return NULL;
132+
}
133+
134+
fds_xml_ctx_t *params_ctx = fds_xml_parse_mem(parser, params, true);
135+
if (params_ctx == NULL) {
136+
IPX_CTX_ERROR(ctx, "Failed to parse the configuration: %s", fds_xml_last_err(parser));
137+
fds_xml_destroy(parser);
138+
config_destroy(cfg);
139+
return NULL;
140+
}
141+
142+
// Parse parameters
143+
int rc = config_parser_root(ctx, params_ctx, cfg);
144+
fds_xml_destroy(parser);
145+
if (rc != IPX_OK) {
146+
config_destroy(cfg);
147+
return NULL;
148+
}
149+
150+
return cfg;
151+
}
152+
153+
void
154+
config_destroy(struct ipfix_config *cfg)
155+
{
156+
free(cfg->path);
157+
free(cfg);
158+
}

0 commit comments

Comments
 (0)