Skip to content

Commit c6c9718

Browse files
committed
Core: add support for Data Record extension [WIP]
1 parent 80b5a31 commit c6c9718

File tree

13 files changed

+410
-110
lines changed

13 files changed

+410
-110
lines changed

include/ipfixcol2/message_ipfix.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,12 @@ struct ipx_ipfix_set {
9898
* \brief Data record (record + extensions)
9999
*/
100100
struct ipx_ipfix_record {
101-
/** Data record information */
101+
/** Data record information */
102102
struct fds_drec rec;
103-
/** Start of reserved space for registered extensions (filled by plugins) */
103+
104+
/** Bit mask of filled extensions (set by producers) */
105+
uint64_t ext_mask;
106+
/** Start of reserved space for registered extensions (filled by producers) */
104107
uint8_t ext[1];
105108
};
106109

src/core/configurator/configurator.cpp

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include <signal.h>
4747

4848
#include "configurator.hpp"
49+
#include "extensions.hpp"
4950

5051
extern "C" {
5152
#include "../message_terminate.h"
@@ -249,16 +250,40 @@ ipx_configurator::start(const ipx_config_model &model)
249250

250251
IPX_DEBUG(comp_str, "All instances have been successfully initialized.", '\0');
251252

252-
// Phase 4. Start threads of all plugins
253+
// Phase 4. Register and resolved Data Record extensions and dependencies
254+
ipx_cfg_extensions ext_mgr;
255+
size_t pos = 0; // Position of an instance in the collector pipeline
256+
257+
for (auto &input : inputs) {
258+
input->extensions_register(&ext_mgr, pos);
259+
}
260+
261+
pos++;
262+
for (auto &inter : inters) {
263+
inter->extensions_register(&ext_mgr, pos);
264+
pos++;
265+
}
266+
267+
for (auto &output : outputs) {
268+
output->extensions_register(&ext_mgr, pos);
269+
}
270+
271+
ext_mgr.resolve();
272+
ext_mgr.list_extensions();
273+
274+
// Phase 5. Start threads of all plugins and update definitions of extensions
253275
for (auto &output : outputs) {
276+
output->extensions_resolve(&ext_mgr);
254277
output->start();
255278
}
256279

257280
for (auto &inter : inters) {
281+
inter->extensions_resolve(&ext_mgr);
258282
inter->start();
259283
}
260284

261285
for (auto &input : inputs) {
286+
input->extensions_resolve(&ext_mgr);
262287
input->start();
263288
}
264289

Lines changed: 142 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,81 @@
1-
1+
/**
2+
* @file src/code/configurator/extensions.cpp
3+
* @author Lukas Hutak ([email protected])
4+
* @brief Manager of Data Record extensions
5+
* @date February 2020
6+
*/
7+
8+
/* Copyright (C) 2020 CESNET, z.s.p.o.
9+
*
10+
* Redistribution and use in source and binary forms, with or without
11+
* modification, are permitted provided that the following conditions
12+
* are met:
13+
* 1. Redistributions of source code must retain the above copyright
14+
* notice, this list of conditions and the following disclaimer.
15+
* 2. Redistributions in binary form must reproduce the above copyright
16+
* notice, this list of conditions and the following disclaimer in
17+
* the documentation and/or other materials provided with the
18+
* distribution.
19+
* 3. Neither the name of the Company nor the names of its contributors
20+
* may be used to endorse or promote products derived from this
21+
* software without specific prior written permission.
22+
*
23+
* ALTERNATIVELY, provided that this notice is retained in full, this
24+
* product may be distributed under the terms of the GNU General Public
25+
* License (GPL) version 2 or later, in which case the provisions
26+
* of the GPL apply INSTEAD OF those given above.
27+
*
28+
* This software is provided ``as is'', and any express or implied
29+
* warranties, including, but not limited to, the implied warranties of
30+
* merchantability and fitness for a particular purpose are disclaimed.
31+
* In no event shall the company or contributors be liable for any
32+
* direct, indirect, incidental, special, exemplary, or consequential
33+
* damages (including, but not limited to, procurement of substitute
34+
* goods or services; loss of use, data, or profits; or business
35+
* interruption) however caused and on any theory of liability, whether
36+
* in contract, strict liability, or tort (including negligence or
37+
* otherwise) arising in any way out of the use of this software, even
38+
* if advised of the possibility of such damage.
39+
*
40+
*/
241

342
#include <algorithm>
443
#include "extensions.hpp"
544

6-
/** Identification of this component (for log) */
7-
static const char *comp_str = "Extensions";
45+
extern "C" {
46+
#include "../message_ipfix.h"
47+
#include "../verbose.h"
48+
}
849

50+
/** Identification of this component (for log) */
51+
static const char *comp_str = "Configurator (extensions)";
52+
53+
/**
54+
* \brief Register extension producer or dependency
55+
*
56+
* \param[in] name Name of the plugin instance
57+
* \param[in] pos Position of the instance in the pipeline
58+
* \param[in] ext Reference to extension definition of the plugin (read only)
59+
*/
960
void
10-
ipx_cfg_extensions::add_extension(ipx_instance *inst, unsigned int idx, struct ipx_ctx_ext *ext)
61+
ipx_cfg_extensions::add_extension(const char *name, unsigned int pos, const struct ipx_ctx_ext *ext)
1162
{
1263
auto ext_info = &m_extensions[ext->data_type][ext->data_name];
1364
std::vector<struct plugin_rec> &vec_plugins = (ext->etype == IPX_EXTENSION_PRODUCER)
1465
? ext_info->producers
1566
: ext_info->consumers;
16-
vec_plugins.push_back({inst->get_name(), idx, ext});
67+
vec_plugins.push_back({name, pos, ext});
1768
}
1869

70+
/**
71+
* @brief Check extension definition
72+
*
73+
* The function checks if there is exactly one producer and that the producer is placed
74+
* before all consumers in the collector pipeline.
75+
* @param[in] ident Identification of the extension (for log only)
76+
* @param[in] rec Internal extension record
77+
* @throw runtime_error if any condition is broken
78+
*/
1979
void
2080
ipx_cfg_extensions::check_dependencies(const std::string &ident, const struct ext_rec &rec)
2181
{
@@ -58,81 +118,122 @@ ipx_cfg_extensions::check_dependencies(const std::string &ident, const struct ex
58118
// Check if the producer is placed before all consumers
59119
const struct plugin_rec &producer_rec = rec.producers.front();
60120
const auto min_cons = std::min_element(rec.consumers.cbegin(), rec.consumers.cend());
61-
if (producer_rec.plugin_idx > min_cons->plugin_idx) {
121+
if (producer_rec.inst_pos > min_cons->inst_pos) {
62122
throw std::runtime_error("Instance '" + producer_rec.name + "', which is a provider "
63123
"of Data Record extension " + ident + ", is placed in the collector pipeline "
64124
"after '" + min_cons->name + "' instance, which depends on the extension. "
65125
"Please, swap the order of the plugin instances");
66126
}
67127
}
68128

69-
70129
void
71-
ipx_cfg_extensions::resolve(std::vector<ipx_instance *> &plugins)
130+
ipx_cfg_extensions::register_instance(ipx_ctx_t *ctx, size_t pos)
72131
{
73-
// Add extensions and dependencies from all plugins
74-
unsigned int pos = 0;
75-
for (auto &it : plugins) {
76-
if (dynamic_cast<ipx_instance_input *>(it)) {
77-
// Input plugins cannot register extensions
78-
continue;
79-
}
80-
81-
struct ipx_ctx_ext *arr_ptr = nullptr;
82-
size_t arr_size = 0;
132+
const char *inst_name = ipx_ctx_name_get(ctx);
133+
struct ipx_ctx_ext *arr_ptr = nullptr;
134+
size_t arr_size = 0;
83135

84-
std::tie(arr_ptr, arr_size) = it->get_extensions();
85-
for (size_t i = 0; i < arr_size; ++i) {
86-
add_extension(it, pos, &arr_ptr[i]);
87-
}
88-
89-
if (!dynamic_cast<ipx_instance_output *>(it)) {
90-
// Increment position for all plugins except outputs
91-
pos++;
92-
}
93-
};
136+
if (m_resolved) {
137+
throw std::runtime_error("(internal) Instance extensions " + std::string(inst_name)
138+
+ " cannot be registered anymore as extension depencies have been resolved!");
139+
}
94140

95-
if (m_extensions.empty()) {
96-
// No extensions, no dependencies
97-
return;
141+
// Register all extensions
142+
ipx_ctx_ext_defs(ctx, &arr_ptr, &arr_size);
143+
for (size_t i = 0; i < arr_size; ++i) {
144+
add_extension(inst_name, pos, &arr_ptr[i]);
98145
}
146+
}
99147

148+
void
149+
ipx_cfg_extensions::resolve()
150+
{
100151
size_t offset = 0;
101152
uint64_t mask = 1U;
102153

154+
if (m_resolved) {
155+
return;
156+
}
157+
103158
for (auto &ext_type : m_extensions) {
104159
for (auto &ext_name : ext_type.second) {
160+
// Check extension dependencies and update its description
161+
struct ext_rec &ext = ext_name.second;
162+
std::string ident = "'" + ext_type.first + "/" + ext_name.first + "'";
163+
105164
if (!mask) {
106165
// No more bits in the mask!
107166
throw std::runtime_error("Maximum number of Data Record extensions has been reached!");
108167
}
109168

110-
struct ext_rec &ext = ext_name.second;
111-
assert(ext.producers.size() == 1 && "Exactly one producer");
112-
113169
// Check the extension
114-
std::string ident = "'" + ext_type.first + "/" + ext_name.first + "'";
115-
check_dependencies(ident, ext_name.second);
170+
check_dependencies(ident, ext);
171+
assert(ext.producers.size() == 1 && "Exactly one producer");
116172

117173
// Determine size, offset and bitset mask
118174
ext.size = ext.producers[0].rec->size;
119175
ext.offset = offset;
120176
ext.mask = mask;
121177

122178
// Align the offset to multiple of 8
123-
ext.offset += (ext.size % 8U == 0) ? ext.size : (((ext.size / 8U) + 1U) * 8U);
179+
ext.offset += (ext.size % 8U == 0) ? (ext.size) : (((ext.size / 8U) + 1U) * 8U);
124180
ext.mask <<= 1U;
125-
126-
IPX_DEBUG(comp_str, "Extension %s registered (size: %zu, offset: %zu)",
127-
ext.size, ext.offset);
128181
}
129182
}
130183

184+
m_size_total = offset;
185+
m_resolved = true;
186+
}
187+
188+
void
189+
ipx_cfg_extensions::update_instance(ipx_ctx_t *ctx)
190+
{
191+
struct ipx_ctx_ext *arr_ptr = nullptr;
192+
size_t arr_size = 0;
131193

194+
if (!m_resolved) {
195+
throw std::runtime_error("(internal) Extensions hasn't been resolved yet!");
196+
}
132197

133-
// TODO: save total size of all extensions
198+
// For all instance extensions and dependencies
199+
ipx_ctx_ext_defs(ctx, &arr_ptr, &arr_size);
200+
for (size_t i = 0; i < arr_size; ++i) {
201+
// Find the extension definition and update instance parameters
202+
struct ipx_ctx_ext *ext = &arr_ptr[i];
134203

204+
const auto &exts_by_type = m_extensions.at(ext->data_type);
205+
const auto &ext_def = exts_by_type.at(ext->data_name);
135206

207+
// In case of the producer, the extension size must be still the same
208+
assert(ext->etype != IPX_EXTENSION_PRODUCER || ext->size == ext_def.size);
209+
ext->mask = ext_def.mask;
210+
ext->offset = ext_def.offset;
211+
ext->size = ext_def.size;
212+
}
136213

214+
// Update size of the Data Record in the plugin context
215+
ipx_ctx_recsize_set(ctx, IPX_MSG_IPFIX_BASE_REC_SIZE + m_size_total);
137216
}
138217

218+
void
219+
ipx_cfg_extensions::list_extensions()
220+
{
221+
if (!m_resolved) {
222+
throw std::runtime_error("(internal) Extensions hasn't been resolved yet!");
223+
}
224+
225+
if (m_extensions.empty()) {
226+
IPX_DEBUG(comp_str, "No Data Record extensions!");
227+
return;
228+
}
229+
230+
for (auto &ext_type : m_extensions) {
231+
for (auto &ext_name : ext_type.second) {
232+
struct ext_rec &ext = ext_name.second;
233+
std::string ident = "'" + ext_type.first + "/" + ext_name.first + "'";
234+
235+
IPX_DEBUG(comp_str, "Data Record extension %s (size: %zu, offset: %zu, consumers: %zu)",
236+
ident.c_str(), ext.size, ext.offset, ext.consumers.size());
237+
}
238+
}
239+
}

0 commit comments

Comments
 (0)