Skip to content

Commit cee8e48

Browse files
committed
FDS output: initial version of the new plugin
1 parent e5e994a commit cee8e48

File tree

9 files changed

+927
-0
lines changed

9 files changed

+927
-0
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Create a linkable module
2+
add_library(fds-output MODULE
3+
src/Config.cpp
4+
src/Config.hpp
5+
src/Exception.hpp
6+
src/fds.cpp
7+
src/Storage.cpp
8+
src/Storage.hpp
9+
)
10+
11+
install(
12+
TARGETS fds-output
13+
LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/"
14+
)
15+
16+
if (ENABLE_DOC_MANPAGE)
17+
# Build a manual page
18+
set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-fds-output.7.rst")
19+
set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-fds-output.7")
20+
21+
add_custom_command(TARGET fds-output PRE_BUILD
22+
COMMAND ${RST2MAN_EXECUTABLE} --syntax-highlight=none ${SRC_FILE} ${DST_FILE}
23+
DEPENDS ${SRC_FILE}
24+
VERBATIM
25+
)
26+
27+
install(
28+
FILES "${DST_FILE}"
29+
DESTINATION "${INSTALL_DIR_MAN}/man7"
30+
)
31+
endif()

src/plugins/output/fds/README.rst

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
Flow Data Storage (output plugin)
2+
=================================
3+
4+
The plugin converts and stores IPFIX Data Records into FDS file format. The file
5+
is based on IPFIX, therefore, it provides highly-effective way for long-term
6+
storage and stores complete flow records (including all Enterprise-specific
7+
fields, biflow, etc.) together with identification of the flow exporters who
8+
exported these records.
9+
10+
All data are stored into flat files, which are automatically rotated and renamed
11+
every N minutes (by default 5 minutes).
12+
13+
Example configuration
14+
---------------------
15+
16+
.. code-block:: xml
17+
18+
<output>
19+
<name>FDS output</name>
20+
<plugin>fds</plugin>
21+
<params>
22+
<storagePath>/tmp/ipfixcol2/fds/</storagePath>
23+
<compression>none</compression>
24+
<dumpInterval>
25+
<timeWindow>300</timeWindow>
26+
<align>yes</align>
27+
</dumpInterval>
28+
</params>
29+
</output>
30+
31+
Parameters
32+
----------
33+
34+
:``storagePath``:
35+
The path element specifies the storage directory for data files. Keep on
36+
mind that the path must exist in your system. Otherwise, no files are stored.
37+
All files will be stored based on the configuration using the following
38+
template: ``<storagePath>/YYYY/MM/DD/flows.<ts>.fds`` where ``YYYY/MM/DD``
39+
means year/month/day and ``<ts>`` represents a UTC timestamp in
40+
format ``YYMMDDhhmmss``.
41+
42+
:``compression``:
43+
Data compression helps to significantly reduce size of output files.
44+
Following compression algorithms are available:
45+
46+
:``none``: Compression disabled [default]
47+
:``lz4``: LZ4 compression (very fast, slightly worse compression ration)
48+
:``zstd``: ZSTD compression (slightly slower, good compression ration)
49+
50+
:``dumpInterval``:
51+
Configuration of output files rotation.
52+
53+
:``timeWindow``:
54+
Specifies time interval in seconds to rotate files i.e. close the current
55+
file and create a new one. [default: 300]
56+
57+
:``align``:
58+
Align file rotation with next N minute interval. For example, if enabled
59+
and window size is 5 minutes long, files will be created at 0, 5, 10, etc.
60+
[values: yes/no, default: yes]
61+
62+
:``asyncIO``:
63+
Allows to use asynchronous I/O for writing to the file. Usually when parts
64+
of the file are being written, the process is blocked on synchronous I/O
65+
and waits for the operation to complete. However, asynchronous I/O allows
66+
the plugin to simultaneously write to file and process flow records, which
67+
significantly improves overall performance. (Note: a pool of service
68+
threads shared among instances of FDS plugin might be created).
69+
[values: true/false, default: true]
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
======================
2+
ipfixcol2-fds-output
3+
======================
4+
5+
---------------------------------
6+
Flow Data Storage (output plugin)
7+
---------------------------------
8+
9+
:Author: Lukáš Huták ([email protected])
10+
:Date: 2019-07-01
11+
:Copyright: Copyright © 2019 CESNET, z.s.p.o.
12+
:Version: 2.0
13+
:Manual section: 7
14+
:Manual group: IPFIXcol collector
15+
16+
Description
17+
-----------
18+
19+
.. include:: ../README.rst
20+
:start-line: 3
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/**
2+
* \file src/plugins/output/fds/src/Config.cpp
3+
* \author Lukas Hutak <[email protected]>
4+
* \brief Parser of XML configuration (source file)
5+
* \date 2019
6+
*
7+
* Copyright(c) 2019 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#include "Config.hpp"
12+
#include <memory>
13+
#include <stdexcept>
14+
15+
/*
16+
* <params>
17+
* <storagePath>...</storagePath>
18+
* <compression>...</compression> <!-- optional -->
19+
* <dumpInterval> <!-- optional -->
20+
* <timeWindow>...</timeWindow> <!-- optional -->
21+
* <align>...</align> <!-- optional -->
22+
* </dumpInterval>
23+
* <asyncIO>...</asyncIO> <!-- optional -->
24+
* </params>
25+
*/
26+
27+
/// XML nodes
28+
enum params_xml_nodes {
29+
NODE_STORAGE = 1,
30+
NODE_COMPRESS,
31+
NODE_DUMP,
32+
NODE_ASYNCIO,
33+
34+
DUMP_WINDOW,
35+
DUMP_ALIGN
36+
};
37+
38+
/// Definition of the \<dumpInterval\> node
39+
static const struct fds_xml_args args_dump[] = {
40+
FDS_OPTS_ELEM(DUMP_WINDOW, "timeWindow", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
41+
FDS_OPTS_ELEM(DUMP_ALIGN, "align", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
42+
FDS_OPTS_END
43+
};
44+
45+
/// Definition of the \<params\> node
46+
static const struct fds_xml_args args_params[] = {
47+
FDS_OPTS_ROOT("params"),
48+
FDS_OPTS_ELEM(NODE_STORAGE, "storagePath", FDS_OPTS_T_STRING, 0),
49+
FDS_OPTS_ELEM(NODE_COMPRESS, "compression", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
50+
FDS_OPTS_NESTED(NODE_DUMP, "dumpInterval", args_dump, FDS_OPTS_P_OPT),
51+
FDS_OPTS_ELEM(NODE_ASYNCIO, "asyncIO", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
52+
FDS_OPTS_END
53+
};
54+
55+
Config::Config(const char *params)
56+
{
57+
set_default();
58+
59+
// Create XML parser
60+
std::unique_ptr<fds_xml_t, decltype(&fds_xml_destroy)> xml(fds_xml_create(), &fds_xml_destroy);
61+
if (!xml) {
62+
throw std::runtime_error("Failed to create an XML parser!");
63+
}
64+
65+
if (fds_xml_set_args(xml.get(), args_params) != FDS_OK) {
66+
throw std::runtime_error("Failed to parse the description of an XML document!");
67+
}
68+
69+
fds_xml_ctx_t *params_ctx = fds_xml_parse_mem(xml.get(), params, true);
70+
if (!params_ctx) {
71+
std::string err = fds_xml_last_err(xml.get());
72+
throw std::runtime_error("Failed to parse the configuration: " + err);
73+
}
74+
75+
// Parse parameters and check configuration
76+
try {
77+
parse_root(params_ctx);
78+
validate();
79+
} catch (std::exception &ex) {
80+
throw std::runtime_error("Failed to parse the configuration: " + std::string(ex.what()));
81+
}
82+
}
83+
84+
/**
85+
* @brief Set default parameters
86+
*/
87+
void
88+
Config::set_default()
89+
{
90+
m_path.clear();
91+
m_calg = calg::NONE;
92+
m_async = true;
93+
94+
m_window.align = true;
95+
m_window.size = WINDOW_SIZE;
96+
}
97+
98+
/**
99+
* @brief Check if the configuration is valid
100+
* @throw runtime_error if the configuration breaks some rules
101+
*/
102+
void
103+
Config::validate()
104+
{
105+
if (m_path.empty()) {
106+
throw std::runtime_error("Storage path cannot be empty!");
107+
}
108+
109+
if (m_window.size == 0) {
110+
throw std::runtime_error("Window size cannot be zero!");
111+
}
112+
}
113+
114+
/**
115+
* @brief Process \<params\> node
116+
* @param[in] ctx XML context to process
117+
* @throw runtime_error if the parser fails
118+
*/
119+
void
120+
Config::parse_root(fds_xml_ctx_t *ctx)
121+
{
122+
const struct fds_xml_cont *content;
123+
while (fds_xml_next(ctx, &content) != FDS_EOC) {
124+
switch (content->id) {
125+
case NODE_STORAGE:
126+
// Storage path
127+
assert(content->type == FDS_OPTS_T_STRING);
128+
m_path = content->ptr_string;
129+
break;
130+
case NODE_COMPRESS:
131+
// Compression method
132+
assert(content->type == FDS_OPTS_T_STRING);
133+
if (strcasecmp(content->ptr_string, "none") == 0) {
134+
m_calg = calg::NONE;
135+
} else if (strcasecmp(content->ptr_string, "lz4") == 0) {
136+
m_calg = calg::LZ4;
137+
} else if (strcasecmp(content->ptr_string, "zstd") == 0) {
138+
m_calg = calg::ZSTD;
139+
} else {
140+
const std::string inv_str = content->ptr_string;
141+
throw std::runtime_error("Unknown compression algorithm '" + inv_str + "'");
142+
}
143+
break;
144+
case NODE_ASYNCIO:
145+
// Asynchronous I/O
146+
assert(content->type == FDS_OPTS_T_BOOL);
147+
m_async = content->val_bool;
148+
break;
149+
case NODE_DUMP:
150+
// Dump window
151+
assert(content->type == FDS_OPTS_T_CONTEXT);
152+
parse_dump(content->ptr_ctx);
153+
break;
154+
default:
155+
// Internal error
156+
throw std::runtime_error("Unknown XML node");
157+
}
158+
}
159+
}
160+
161+
/**
162+
* @brief Auxiliary function for parsing \<dumpInterval\> options
163+
* @param[in] ctx XML context to process
164+
* @throw runtime_error if the parser fails
165+
*/
166+
void
167+
Config::parse_dump(fds_xml_ctx_t *ctx)
168+
{
169+
const struct fds_xml_cont *content;
170+
while(fds_xml_next(ctx, &content) != FDS_EOC) {
171+
switch (content->id) {
172+
case DUMP_WINDOW:
173+
// Window size
174+
assert(content->type == FDS_OPTS_T_UINT);
175+
if (content->val_uint > UINT32_MAX) {
176+
throw std::runtime_error("Window size is too long!");
177+
}
178+
m_window.size = static_cast<uint32_t>(content->val_uint);
179+
break;
180+
case DUMP_ALIGN:
181+
// Window alignment
182+
assert(content->type == FDS_OPTS_T_BOOL);
183+
m_window.align = content->val_bool;
184+
break;
185+
default:
186+
// Internal error
187+
throw std::runtime_error("Unknown XML node");
188+
}
189+
}
190+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* \file src/plugins/output/fds/src/Config.hpp
3+
* \author Lukas Hutak <[email protected]>
4+
* \brief Parser of XML configuration (header file)
5+
* \date 2019
6+
*
7+
* Copyright(c) 2019 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#ifndef IPFIXCOL2_FDS_CONFIG_HPP
12+
#define IPFIXCOL2_FDS_CONFIG_HPP
13+
14+
#include <string>
15+
#include <libfds.h>
16+
17+
/**
18+
* @brief Plugin configuration parser
19+
*/
20+
class Config {
21+
public:
22+
/**
23+
* @brief Parse configuration of the plugin
24+
* @param[in] params XML parameters to parse
25+
* @throw runtime_exception on error
26+
*/
27+
Config(const char *params);
28+
~Config() = default;
29+
30+
enum class calg {
31+
NONE, ///< Do not use compression
32+
LZ4, ///< LZ4 compression
33+
ZSTD ///< ZSTD compression
34+
};
35+
36+
/// Storage path
37+
std::string m_path;
38+
/// Compression algorithm
39+
calg m_calg;
40+
/// Asynchronous I/O enabled
41+
bool m_async;
42+
43+
struct {
44+
bool align; ///< Enable/disable window alignment
45+
uint32_t size; ///< Time window size
46+
} m_window; ///< Window alignment
47+
48+
private:
49+
/// Default window size
50+
static const uint32_t WINDOW_SIZE = 300U;
51+
52+
void
53+
set_default();
54+
void
55+
validate();
56+
57+
void
58+
parse_root(fds_xml_ctx_t *ctx);
59+
void
60+
parse_dump(fds_xml_ctx_t *ctx);
61+
};
62+
63+
64+
#endif // IPFIXCOL2_FDS_CONFIG_HPP

0 commit comments

Comments
 (0)