Skip to content

Commit 2d8cd68

Browse files
committed
Core: support for Data Record extensions [WIP]
1 parent 41ea771 commit 2d8cd68

File tree

10 files changed

+280
-0
lines changed

10 files changed

+280
-0
lines changed

src/core/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ set(CORE_SOURCE
44
configurator/config_file.hpp
55
configurator/configurator.cpp
66
configurator/configurator.hpp
7+
configurator/extensions.cpp
8+
configurator/extensions.hpp
79
configurator/instance.hpp
810
configurator/instance_input.cpp
911
configurator/instance_input.hpp
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
2+
3+
#include <algorithm>
4+
#include "extensions.hpp"
5+
6+
/** Identification of this component (for log) */
7+
static const char *comp_str = "Extensions";
8+
9+
void
10+
ipx_cfg_extensions::add_extension(ipx_instance *inst, unsigned int idx, struct ipx_ctx_ext *ext)
11+
{
12+
auto ext_info = &m_extensions[ext->data_type][ext->data_name];
13+
std::vector<struct plugin_rec> &vec_plugins = (ext->etype == IPX_EXTENSION_PRODUCER)
14+
? ext_info->producers
15+
: ext_info->consumers;
16+
vec_plugins.push_back({inst->get_name(), idx, ext});
17+
}
18+
19+
void
20+
ipx_cfg_extensions::check_dependencies(const std::string &ident, const struct ext_rec &rec)
21+
{
22+
std::string name_producers;
23+
std::string name_consumers;
24+
25+
// Prepare list of produces and consumers for log
26+
for (const auto &producer : rec.producers) {
27+
if (!name_producers.empty()) {
28+
name_producers.append(", ");
29+
}
30+
name_producers.append("'" + producer.name + "'");
31+
}
32+
for (const auto &consumer : rec.consumers) {
33+
if (!name_consumers.empty()) {
34+
name_consumers.append(", ");
35+
}
36+
name_consumers.append("'" + consumer.name + "'");
37+
}
38+
39+
// No producers?
40+
if (rec.producers.empty()) {
41+
throw std::runtime_error("No provider of Data Record extension " + ident + "found. "
42+
"The extension is required by " + name_consumers);
43+
}
44+
45+
// Multiple producers?
46+
if (rec.producers.size() > 1) {
47+
throw std::runtime_error("Data Record extension " + ident + " is provided by "
48+
"multiple instances (" + name_producers + ")");
49+
}
50+
51+
// No consumer
52+
if (rec.consumers.empty()) {
53+
IPX_WARNING(comp_str, "Extension %s is provided by %s, but no other plugins use it. "
54+
"The provider can be probably removed.", ident.c_str(), name_producers.c_str());
55+
return;
56+
}
57+
58+
// Check if the producer is placed before all consumers
59+
const struct plugin_rec &producer_rec = rec.producers.front();
60+
const auto min_cons = std::min_element(rec.consumers.cbegin(), rec.consumers.cend());
61+
if (producer_rec.plugin_idx > min_cons->plugin_idx) {
62+
throw std::runtime_error("Instance '" + producer_rec.name + "', which is a provider "
63+
"of Data Record extension " + ident + ", is placed in the collector pipeline "
64+
"after '" + min_cons->name + "' instance, which depends on the extension. "
65+
"Please, swap the order of the plugin instances");
66+
}
67+
}
68+
69+
70+
void
71+
ipx_cfg_extensions::resolve(std::vector<ipx_instance *> &plugins)
72+
{
73+
// Add extensions and dependencies from all plugins
74+
unsigned int pos = 0;
75+
for (auto &it : plugins) {
76+
if (dynamic_cast<ipx_instance_input *>(it)) {
77+
// Input plugins cannot register extensions
78+
continue;
79+
}
80+
81+
struct ipx_ctx_ext *arr_ptr = nullptr;
82+
size_t arr_size = 0;
83+
84+
std::tie(arr_ptr, arr_size) = it->get_extensions();
85+
for (size_t i = 0; i < arr_size; ++i) {
86+
add_extension(it, pos, &arr_ptr[i]);
87+
}
88+
89+
if (!dynamic_cast<ipx_instance_output *>(it)) {
90+
// Increment position for all plugins except outputs
91+
pos++;
92+
}
93+
};
94+
95+
if (m_extensions.empty()) {
96+
// No extensions, no dependencies
97+
return;
98+
}
99+
100+
size_t offset = 0;
101+
uint64_t mask = 1U;
102+
103+
for (auto &ext_type : m_extensions) {
104+
for (auto &ext_name : ext_type.second) {
105+
if (!mask) {
106+
// No more bits in the mask!
107+
throw std::runtime_error("Maximum number of Data Record extensions has been reached!");
108+
}
109+
110+
struct ext_rec &ext = ext_name.second;
111+
assert(ext.producers.size() == 1 && "Exactly one producer");
112+
113+
// Check the extension
114+
std::string ident = "'" + ext_type.first + "/" + ext_name.first + "'";
115+
check_dependencies(ident, ext_name.second);
116+
117+
// Determine size, offset and bitset mask
118+
ext.size = ext.producers[0].rec->size;
119+
ext.offset = offset;
120+
ext.mask = mask;
121+
122+
// Align the offset to multiple of 8
123+
ext.offset += (ext.size % 8U == 0) ? ext.size : (((ext.size / 8U) + 1U) * 8U);
124+
ext.mask <<= 1U;
125+
126+
IPX_DEBUG(comp_str, "Extension %s registered (size: %zu, offset: %zu)",
127+
ext.size, ext.offset);
128+
}
129+
}
130+
131+
132+
133+
// TODO: save total size of all extensions
134+
135+
136+
137+
}
138+
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
2+
#ifndef IPX_CFG_EXTENSIONS_H
3+
#define IPX_CFG_EXTENSIONS_H
4+
5+
#include <map>
6+
#include <memory>
7+
#include <string>
8+
#include <vector>
9+
10+
#include "instance.hpp"
11+
#include "instance_intermediate.hpp"
12+
#include "instance_output.hpp"
13+
14+
extern "C" {
15+
#include "../extension.h"
16+
#include "../verbose.h"
17+
}
18+
19+
class ipx_cfg_extensions {
20+
private:
21+
struct plugin_rec {
22+
std::string name;
23+
unsigned int plugin_idx;
24+
const struct ipx_ctx_ext *rec;
25+
26+
bool operator<(const plugin_rec &other) const {
27+
return plugin_idx < other.plugin_idx;
28+
}
29+
};
30+
31+
struct ext_rec {
32+
std::vector<plugin_rec> producers;
33+
std::vector<plugin_rec> consumers;
34+
35+
size_t size;
36+
size_t offset;
37+
uint64_t mask;
38+
};
39+
40+
// Extensions [extension type][extension name]
41+
std::map<std::string, std::map<std::string, struct ext_rec>> m_extensions;
42+
43+
void
44+
add_extension(ipx_instance *inst, unsigned int idx, struct ipx_ctx_ext *ext);
45+
void
46+
check_dependencies(const std::string &ident, const struct ext_rec &rec);
47+
48+
public:
49+
ipx_cfg_extensions() = default;
50+
~ipx_cfg_extensions() = default;
51+
52+
void
53+
resolve(std::vector<ipx_instance *> &plugins);
54+
};
55+
56+
#endif // IPX_CFG_EXTENSIONS_H

src/core/configurator/instance.hpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,25 @@ class ipx_instance {
116116

117117
/** \brief Start a thread of the instance */
118118
virtual void start() = 0;
119+
120+
/**
121+
* \brief Get name of the instance
122+
* \return Name
123+
*/
124+
const std::string &
125+
get_name() {
126+
return _name;
127+
}
128+
129+
/**
130+
* \brief Get registered extensions and dependencies
131+
* \return An array of extensions and dependencies and the size of the array
132+
* \throw runtime_error if extension are not supported by the instance type
133+
*/
134+
virtual std::tuple<struct ipx_ctx_ext *, size_t>
135+
get_extensions() {
136+
throw std::runtime_error("Extensions are not supported the plugin");
137+
}
119138
};
120139

121140
#endif //IPFIXCOL_INSTANCE_H

src/core/configurator/instance_intermediate.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,13 @@ ipx_instance_intermediate::connect_to(ipx_instance_intermediate &intermediate)
134134
{
135135
assert(_state == state::NEW); // Only configuration of an uninitialized instance can be changed!
136136
ipx_ctx_ring_dst_set(_ctx, intermediate.get_input());
137+
}
138+
139+
std::tuple<struct ipx_ctx_ext *, size_t>
140+
ipx_instance_intermediate::get_extensions()
141+
{
142+
struct ipx_ctx_ext *ext_arr = nullptr;
143+
size_t ext_size = 0;
144+
ipx_ctx_ext_defs(_ctx, &ext_arr, &ext_size);
145+
return std::make_tuple(ext_arr, ext_size);
137146
}

src/core/configurator/instance_intermediate.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ class ipx_instance_intermediate : public ipx_instance {
152152
* \param[in] intermediate Intermediate plugin to receive our messages
153153
*/
154154
virtual void connect_to(ipx_instance_intermediate &intermediate);
155+
156+
/**
157+
* \brief Get registered extensions and dependencies
158+
* \return An array of extensions and dependencies and the size of the array
159+
* \throw runtime_error if extension are not supported by the instance type
160+
*/
161+
virtual std::tuple<struct ipx_ctx_ext *, size_t>
162+
get_extensions() override;
155163
};
156164

157165
#endif //IPFIXCOL_INSTANCE_INTERMEDIATE_HPP

src/core/configurator/instance_output.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,12 @@ ipx_instance_output::get_input()
141141
{
142142
return std::make_tuple(_instance_buffer, _type, _filter);
143143
}
144+
145+
std::tuple<struct ipx_ctx_ext *, size_t>
146+
ipx_instance_output::get_extensions()
147+
{
148+
struct ipx_ctx_ext *ext_arr = nullptr;
149+
size_t ext_size = 0;
150+
ipx_ctx_ext_defs(_ctx, &ext_arr, &ext_size);
151+
return std::make_tuple(ext_arr, ext_size);
152+
}

src/core/configurator/instance_output.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,16 @@ class ipx_instance_output : public ipx_instance {
137137
*/
138138
std::tuple<ipx_ring_t *, enum ipx_odid_filter_type, const ipx_orange_t *>
139139
get_input();
140+
141+
/**
142+
* \brief Get registered extension dependencies
143+
*
144+
* \note Since output instances cannot produce extensions, the array contains only registered
145+
* dependencies.
146+
* \return An array of dependencies and the size of the array
147+
*/
148+
std::tuple<struct ipx_ctx_ext *, size_t>
149+
get_extensions() override;
140150
};
141151

142152
#endif //IPFIXCOL_INSTANCE_OUTPUT_HPP

src/core/context.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,18 @@ ipx_ctx_ring_dst_set(ipx_ctx_t *ctx, ipx_ring_t *ring)
375375
ctx->pipeline.dst = ring;
376376
}
377377

378+
void
379+
ipx_ctx_ext_defs(ipx_ctx_t *ctx, struct ipx_ctx_ext **arr, size_t *arr_size)
380+
{
381+
if (ctx->cfg_extension.items_cnt == 0) {
382+
*arr = NULL;
383+
*arr_size = 0;
384+
return;
385+
}
386+
387+
*arr = ctx->cfg_extension.items;
388+
*arr_size = ctx->cfg_extension.items_cnt;
389+
}
378390

379391
// -------------------------------------------------------------------------------------------------
380392

@@ -448,6 +460,7 @@ ipx_ctx_ext_producer(ipx_ctx_t *ctx, const char *type, const char *name, size_t
448460
return rc;
449461
}
450462

463+
IPX_CTX_DEBUG(ctx, "Data Record extension '%s/%s' has been registered.", type, name);
451464
*ext = rec;
452465
return IPX_OK;
453466
}
@@ -474,6 +487,7 @@ ipx_ctx_ext_consumer(ipx_ctx_t *ctx, const char *type, const char *name, ipx_ctx
474487
return rc;
475488
}
476489

490+
IPX_CTX_DEBUG(ctx, "Dependency on Data Record extension '%s/%s' has been added.", type, name);
477491
*ext = rec;
478492
return IPX_OK;
479493
}

src/core/context.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,4 +264,19 @@ ipx_ctx_verb_set(ipx_ctx_t *ctx, enum ipx_verb_level verb);
264264
IPX_API int
265265
ipx_ctx_term_cnt_set(ipx_ctx_t *ctx, unsigned int cnt);
266266

267+
/**
268+
* \brief Get registered extensions and dependencies
269+
*
270+
* \note
271+
* Keep on mind that the array is filled only after plugin initialization. Moreover,
272+
* the most plugins don't use extension at all, so the array is usually empty.
273+
* \warning
274+
* Don't change the size and offset of extensions if the plugin is already running!
275+
* \param[in] ctx Plugin context
276+
* \param[out] arr Array with extensions and dependencies
277+
* \param[out] arr_size Size of the array
278+
*/
279+
IPX_API void
280+
ipx_ctx_ext_defs(ipx_ctx_t *ctx, struct ipx_ctx_ext **arr, size_t *arr_size);
281+
267282
#endif // IPFIXCOL_CONTEXT_INTERNAL_H

0 commit comments

Comments
 (0)