Skip to content

Commit 5b86270

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
Process plugins - Update ipfixprobe core
1 parent 037dd8c commit 5b86270

File tree

11 files changed

+229
-325
lines changed

11 files changed

+229
-325
lines changed

src/core/CMakeLists.txt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,28 @@ add_library(ipfixprobe-core STATIC
1313
inputPlugin.cpp
1414
pluginManager.cpp
1515
pluginManager.hpp
16+
fieldManager.cpp
17+
xxhash.c
1618
)
1719

18-
1920
target_include_directories(ipfixprobe-core PUBLIC
2021
${CMAKE_SOURCE_DIR}/include
22+
${CMAKE_SOURCE_DIR}/include/ipfixprobe/processPlugin
23+
${CMAKE_SOURCE_DIR}/include/ipfixprobe/outputPlugin
2124
${CMAKE_BINARY_DIR}/src
25+
${adaptmon_SOURCE_DIR}/lib/include/public/
2226
)
2327

2428
target_compile_options(ipfixprobe-core PRIVATE -fPIC)
2529

30+
#add_library(ipfixprobe-plugin-factory STATIC
31+
# pluginFactory.cpp
32+
#)
33+
34+
#target_include_directories(ipfixprobe-plugin-factory PUBLIC
35+
# ${CMAKE_SOURCE_DIR}/include
36+
#)
37+
2638
set(CORE_LIB -Wl,--whole-archive ipfixprobe-core -Wl,--no-whole-archive)
2739

2840
target_link_libraries(ipfixprobe-core

src/core/fieldManager.cpp

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/**
2+
* @file
3+
* @author Pavel Siska <[email protected]>
4+
* @brief Implementation of FieldManager methods for registering and accessing fields.
5+
*
6+
* @copyright Copyright (c) 2025 CESNET, z.s.p.o.
7+
*/
8+
9+
#include "fieldManager.hpp"
10+
11+
#include "fieldGroup.hpp"
12+
13+
namespace ipxp::process {
14+
15+
void FieldManager::addField(
16+
std::vector<FieldDescriptor>& container,
17+
std::string_view group,
18+
std::string_view name,
19+
std::size_t bitIndex,
20+
GenericValueGetter getter)
21+
{
22+
container.emplace_back(FieldDescriptor(
23+
FieldInfo {
24+
.group = std::string(group),
25+
.name = std::string(name),
26+
.bitIndex = bitIndex,
27+
.getter = std::move(getter),
28+
}));
29+
}
30+
31+
[[nodiscard]] FieldGroup FieldManager::createFieldGroup(std::string_view groupName)
32+
{
33+
return FieldGroup(groupName, *this);
34+
}
35+
36+
const std::vector<FieldDescriptor>& FieldManager::getBiflowFields() const noexcept
37+
{
38+
return m_biflowFields;
39+
}
40+
41+
const std::vector<FieldDescriptor>& FieldManager::getReverseBiflowFields() const noexcept
42+
{
43+
return m_reverseBiflowFields;
44+
}
45+
46+
const std::vector<FieldDescriptor>& FieldManager::getUniflowForwardFields() const noexcept
47+
{
48+
return m_uniflowForwardFields;
49+
}
50+
51+
const std::vector<FieldDescriptor>& FieldManager::getUniflowReverseFields() const noexcept
52+
{
53+
return m_uniflowReverseFields;
54+
}
55+
56+
[[nodiscard]] FieldHandler FieldManager::registerField(
57+
std::string_view groupName,
58+
std::string_view fieldName,
59+
GenericValueGetter getter)
60+
{
61+
const auto bitIndex = getNextBitIndex();
62+
const FieldHandler fieldHandler(bitIndex);
63+
64+
// biflow
65+
addField(m_biflowFields, groupName, fieldName, bitIndex, getter);
66+
67+
// reverse biflow
68+
addField(m_reverseBiflowFields, groupName, fieldName, bitIndex, getter);
69+
70+
// forward uniflow
71+
addField(m_uniflowForwardFields, groupName, fieldName, bitIndex, getter);
72+
73+
// reverse uniflow
74+
addField(m_uniflowReverseFields, groupName, fieldName, bitIndex, getter);
75+
76+
return fieldHandler;
77+
}
78+
79+
[[nodiscard]] std::pair<FieldHandler, FieldHandler> FieldManager::registerDirectionalPairFields(
80+
std::string_view groupName,
81+
std::string_view forwardFieldName,
82+
std::string_view reverseFieldName,
83+
GenericValueGetter forwardGetter,
84+
GenericValueGetter reverseGetter)
85+
{
86+
const auto forwardBitIndex = getNextBitIndex();
87+
const auto reverseBitIndex = getNextBitIndex();
88+
89+
const FieldHandler forwardFieldHandler(forwardBitIndex);
90+
const FieldHandler reverseFieldHandler(reverseBitIndex);
91+
92+
// biflow
93+
addField(m_biflowFields, groupName, forwardFieldName, forwardBitIndex, forwardGetter);
94+
addField(m_biflowFields, groupName, reverseFieldName, reverseBitIndex, reverseGetter);
95+
96+
// reverse biflow
97+
addField(m_reverseBiflowFields, groupName, forwardFieldName, reverseBitIndex, reverseGetter);
98+
addField(m_reverseBiflowFields, groupName, reverseFieldName, forwardBitIndex, forwardGetter);
99+
100+
// forward uniflow
101+
addField(m_uniflowForwardFields, groupName, forwardFieldName, forwardBitIndex, forwardGetter);
102+
103+
// reverse uniflow
104+
addField(m_uniflowReverseFields, groupName, forwardFieldName, reverseBitIndex, reverseGetter);
105+
106+
return {forwardFieldHandler, reverseFieldHandler};
107+
}
108+
109+
[[nodiscard]] std::pair<FieldHandler, FieldHandler> FieldManager::registerBiflowPairFields(
110+
std::string_view groupName,
111+
std::string_view aFieldName,
112+
std::string_view bFieldName,
113+
GenericValueGetter aGetter,
114+
GenericValueGetter bGetter)
115+
{
116+
const std::size_t aBitIndex = getNextBitIndex();
117+
const std::size_t bBitIndex = getNextBitIndex();
118+
119+
const FieldHandler aFieldHandler(aBitIndex);
120+
const FieldHandler bFieldHandler(bBitIndex);
121+
122+
// biflow
123+
addField(m_biflowFields, groupName, aFieldName, aBitIndex, aGetter);
124+
addField(m_biflowFields, groupName, bFieldName, bBitIndex, bGetter);
125+
126+
// reverse biflow
127+
addField(m_reverseBiflowFields, groupName, aFieldName, bBitIndex, bGetter);
128+
addField(m_reverseBiflowFields, groupName, bFieldName, aBitIndex, aGetter);
129+
130+
// forward uniflow
131+
addField(m_uniflowForwardFields, groupName, aFieldName, aBitIndex, aGetter);
132+
addField(m_uniflowForwardFields, groupName, bFieldName, bBitIndex, bGetter);
133+
134+
// reverse uniflow
135+
addField(m_uniflowReverseFields, groupName, aFieldName, bBitIndex, bGetter);
136+
addField(m_uniflowReverseFields, groupName, bFieldName, aBitIndex, aGetter);
137+
138+
return {aFieldHandler, bFieldHandler};
139+
}
140+
141+
} // namespace ipxp::process

src/core/ipfixprobe.cpp

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
#include <poll.h>
4646
#include <signal.h>
4747
#include <unistd.h>
48+
#include <sys/types.h>
49+
#include <sys/socket.h>
4850

4951
namespace ipxp {
5052

@@ -250,7 +252,7 @@ void set_thread_details(pthread_t thread, const std::string& name, const std::ve
250252

251253
bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
252254
{
253-
OutputPlugin::ProcessPlugins processPlugins;
255+
//OutputPlugin::ProcessPlugins processPlugins;
254256
std::string storage_name = "cache";
255257
std::string storage_params = "";
256258
std::string output_name = "ipfix";
@@ -276,7 +278,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
276278

277279
// Process
278280
for (auto& it : parser.m_process) {
279-
std::shared_ptr<ProcessPlugin> processPlugin;
281+
//std::shared_ptr<ProcessPlugin> processPlugin;
280282
std::string process_params;
281283
std::string process_name;
282284
std::vector<int> affinity;
@@ -287,8 +289,8 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
287289
"inside "
288290
"input threads)");
289291
}
290-
for (auto& it : processPlugins) {
291-
std::string plugin_name = it.first;
292+
for (auto& it : conf.manager.getEntries()) {
293+
std::string plugin_name = it.name;
292294
if (plugin_name == process_name) {
293295
throw IPXPError(process_name + " plugin was specified multiple times");
294296
}
@@ -298,14 +300,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
298300
}
299301

300302
try {
301-
auto& processPluginFactory = ProcessPluginFactory::getInstance();
302-
const int pluginID = ProcessPluginIDGenerator::instance().generatePluginID();
303-
processPlugin
304-
= processPluginFactory.createShared(process_name, process_params, pluginID);
305-
if (processPlugin == nullptr) {
306-
throw IPXPError("invalid process plugin " + process_name);
307-
}
308-
processPlugins.emplace_back(process_name, processPlugin);
303+
conf.manager.addProcessPlugin(process_name, process_params);
309304
} catch (PluginError& e) {
310305
throw IPXPError(process_name + std::string(": ") + e.what());
311306
} catch (PluginExit& e) {
@@ -334,7 +329,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
334329

335330
try {
336331
auto& outputPluginFactory = OutputPluginFactory::getInstance();
337-
outputPlugin = outputPluginFactory.createShared(output_name, output_params, processPlugins);
332+
outputPlugin = outputPluginFactory.createShared(output_name, output_params, conf.manager.getFieldManager(), conf.manager.getEntries());
338333
if (outputPlugin == nullptr) {
339334
throw IPXPError("invalid output plugin " + output_name);
340335
}
@@ -412,7 +407,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
412407
try {
413408
auto& storagePluginFactory = StoragePluginFactory::getInstance();
414409
storagePlugin
415-
= storagePluginFactory.createShared(storage_name, storage_params, output_queue);
410+
= storagePluginFactory.createShared(storage_name, storage_params, output_queue, conf.manager);
416411
if (storagePlugin == nullptr) {
417412
throw IPXPError("invalid storage plugin " + storage_name);
418413
}
@@ -427,11 +422,11 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
427422
}
428423

429424
std::vector<ProcessPlugin*> storage_process_plugins;
430-
for (auto& it : processPlugins) {
431-
ProcessPlugin* tmp = it.second->copy();
432-
storagePlugin->add_plugin(tmp);
425+
for (auto& it : conf.manager.getEntries()) {
426+
ProcessPlugin* tmp = it.plugin.get();
427+
//storagePlugin->add_plugin(tmp);
433428
conf.active.process.push_back(tmp);
434-
conf.active.all.push_back(tmp);
429+
//conf.active.all.push_back(tmp);
435430
storage_process_plugins.push_back(tmp);
436431
}
437432

@@ -479,7 +474,7 @@ void finish(ipxp_conf_t& conf)
479474
// Terminate all storages
480475
for (auto& it : conf.pipelines) {
481476
for (auto& itp : it.storage.plugins) {
482-
itp->close();
477+
//itp->close(); TODO
483478
}
484479
}
485480

@@ -596,7 +591,7 @@ void serve_stat_clients(ipxp_conf_t& conf, struct pollfd pfds[2])
596591
}
597592

598593
if (pfds[0].revents & POLL_IN) {
599-
int fd = accept(pfds[0].fd, NULL, NULL);
594+
int fd = accept(pfds[-1].fd, NULL, NULL);
600595
if (pfds[1].fd == -1) {
601596
pfds[1].fd = fd;
602597
} else if (fd != -1) {

src/core/ipfixprobe.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@
4343
#include <appFs.hpp>
4444
#include <ipfixprobe/inputPlugin.hpp>
4545
#include <ipfixprobe/options.hpp>
46-
#include <ipfixprobe/outputPlugin.hpp>
46+
#include <outputPlugin.hpp>
4747
#include <ipfixprobe/packet.hpp>
4848
#include <ipfixprobe/plugin.hpp>
49-
#include <ipfixprobe/processPlugin.hpp>
49+
#include <processPlugin.hpp>
5050
#include <ipfixprobe/ring.h>
5151
#include <ipfixprobe/storagePlugin.hpp>
5252
#include <ipfixprobe/utils.hpp>
@@ -314,7 +314,8 @@ struct ipxp_conf_t {
314314
std::vector<std::shared_ptr<InputPlugin>> inputPlugins;
315315
std::vector<std::shared_ptr<StoragePlugin>> storagePlugins;
316316
std::shared_ptr<OutputPlugin> outputPlugin;
317-
317+
FieldManager fieldManager;
318+
ProcessPluginManager manager;
318319
PluginManager pluginManager;
319320
struct Plugins {
320321
std::vector<InputPlugin*> input;
@@ -352,6 +353,7 @@ struct ipxp_conf_t {
352353
, worker_cnt(0)
353354
, fps(0)
354355
, max_pkts(0)
356+
, manager(fieldManager)
355357
, pluginManager(false)
356358
, pkt_bufsize(1600)
357359
, blocks_cnt(0)

src/core/workers.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -173,25 +173,27 @@ void output_worker(
173173
while (1) {
174174
gettimeofday(&end, nullptr);
175175

176-
Flow* flow = static_cast<Flow*>(ipx_ring_pop(queue));
177-
if (!flow) {
176+
FlowRecordUniquePtr* flowPtr = static_cast<FlowRecordUniquePtr*>(ipx_ring_pop(queue));
177+
if (!flowPtr) {
178178
if (end.tv_sec - last_flush.tv_sec > 1) {
179179
last_flush = end;
180-
outputPlugin->flush();
180+
//outputPlugin->flush(); TODO flush ?
181181
}
182182
if (terminate_export && !ipx_ring_cnt(queue)) {
183183
break;
184184
}
185185
continue;
186186
}
187187

188+
FlowRecordUniquePtr& flow = *flowPtr;
189+
188190
stats.biflows++;
189-
stats.bytes += flow->src_bytes + flow->dst_bytes;
190-
stats.packets += flow->src_packets + flow->dst_packets;
191-
stats.dropped = outputPlugin->m_flows_dropped;
191+
stats.bytes += flow->directionalData[Direction::Forward].bytes + flow->directionalData[Direction::Reverse].bytes;
192+
stats.packets += flow->directionalData[Direction::Forward].packets + flow->directionalData[Direction::Reverse].packets;
193+
stats.dropped = outputPlugin->getDroppedCount();
192194
out_stats->store(stats);
193195
try {
194-
outputPlugin->export_flow(*flow);
196+
outputPlugin->processRecord(flow);
195197
} catch (PluginError& e) {
196198
res.error = true;
197199
res.msg = e.what();
@@ -231,8 +233,8 @@ void output_worker(
231233
}
232234
}
233235

234-
outputPlugin->flush();
235-
stats.dropped = outputPlugin->m_flows_dropped;
236+
//outputPlugin->flush(); TODO flush ?
237+
stats.dropped = outputPlugin->getDroppedCount();
236238
out_stats->store(stats);
237239
out->set_value(res);
238240
}

src/core/workers.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@
3535
#include <future>
3636

3737
#include <ipfixprobe/inputPlugin.hpp>
38-
#include <ipfixprobe/outputPlugin.hpp>
38+
#include <outputPlugin.hpp>
3939
#include <ipfixprobe/packet.hpp>
40-
#include <ipfixprobe/processPlugin.hpp>
40+
#include <processPlugin.hpp>
4141
#include <ipfixprobe/ring.h>
4242
#include <ipfixprobe/storagePlugin.hpp>
4343

0 commit comments

Comments
 (0)