Skip to content

Commit 5faae61

Browse files
committed
ipfixprobe - refactor PluginManager
1 parent 00a7336 commit 5faae61

File tree

7 files changed

+250
-291
lines changed

7 files changed

+250
-291
lines changed

src/core/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ add_executable(ipfixprobe
33
ipfixprobe.cpp
44
ipfixprobe.hpp
55
options.cpp
6-
pluginmgr.cpp
7-
pluginmgr.hpp
86
ring.c
97
stacktrace.cpp
108
stacktrace.hpp
@@ -14,6 +12,8 @@ add_executable(ipfixprobe
1412
workers.cpp
1513
workers.hpp
1614
inputPlugin.cpp
15+
pluginManager.cpp
16+
pluginManager.hpp
1717
)
1818

1919

src/core/ipfixprobe.cpp

Lines changed: 35 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@
2626
*
2727
*/
2828

29-
#include "ipfixprobe.hpp"
30-
3129
#include "buildConfig.hpp"
30+
#include "ipfixprobe.hpp"
3231

3332
#include <fstream>
3433
#include <future>
@@ -47,6 +46,8 @@
4746
#endif
4847
#include "stats.hpp"
4948

49+
#include <ipfixprobe/pluginFactory/pluginFactory.hpp>
50+
5051
namespace ipxp {
5152

5253
volatile sig_atomic_t stop = 0;
@@ -90,56 +91,38 @@ void error(std::string msg)
9091
std::cerr << "Error: " << msg << std::endl;
9192
}
9293

93-
template<typename T>
94-
static void print_plugins_help(std::vector<Plugin*>& plugins)
94+
static void printPluginsUsage(const std::vector<PluginManifest>& pluginsManifest)
9595
{
96-
for (auto& it : plugins) {
97-
if (dynamic_cast<T*>(it)) {
98-
OptionsParser* parser = it->get_parser();
99-
parser->usage(std::cout);
100-
std::cout << std::endl;
101-
delete parser;
96+
for (const auto& pluginManifest : pluginsManifest) {
97+
if (pluginManifest.usage) {
98+
pluginManifest.usage();
10299
}
103100
}
104101
}
105102

106103
void print_help(ipxp_conf_t& conf, const std::string& arg)
107104
{
108-
auto deleter = [&](std::vector<Plugin*>* p) {
109-
for (auto& it : *p) {
110-
delete it;
111-
}
112-
delete p;
113-
};
114-
auto plugins = std::unique_ptr<std::vector<Plugin*>, decltype(deleter)>(
115-
new std::vector<Plugin*>(conf.mgr.get()),
116-
deleter);
117-
118105
if (arg == "input") {
119-
print_plugins_help<InputPlugin>(*plugins);
120-
} else if (arg == "storage") {
121-
print_plugins_help<StoragePlugin>(*plugins);
122-
} else if (arg == "output") {
123-
print_plugins_help<OutputPlugin>(*plugins);
124-
} else if (arg == "process") {
125-
print_plugins_help<ProcessPlugin>(*plugins);
126-
} else {
127-
Plugin* p;
128-
try {
129-
p = conf.mgr.get(arg);
130-
if (p == nullptr) {
131-
std::cout << "No help available for " << arg << std::endl;
132-
return;
133-
}
134-
} catch (PluginManagerError& e) {
135-
error(std::string("when loading plugin: ") + e.what());
136-
return;
137-
}
138-
OptionsParser* parser = p->get_parser();
139-
parser->usage(std::cout);
140-
delete parser;
141-
delete p;
106+
auto& inputPluginFactory = InputPluginFactory::getInstance();
107+
return printPluginsUsage(inputPluginFactory.getRegisteredPlugins());
108+
}
109+
110+
if (arg == "storage") {
111+
auto& storagePluginFactory = StoragePluginFactory::getInstance();
112+
return printPluginsUsage(storagePluginFactory.getRegisteredPlugins());
142113
}
114+
115+
if (arg == "output") {
116+
auto& outputPluginFactory = OutputPluginFactory::getInstance();
117+
return printPluginsUsage(outputPluginFactory.getRegisteredPlugins());
118+
}
119+
120+
if (arg == "process") {
121+
auto& processPluginFactory = ProcessPluginFactory::getInstance();
122+
return printPluginsUsage(processPluginFactory.getRegisteredPlugins());
123+
}
124+
125+
std::cerr << "No help available for " << arg << std::endl;
143126
}
144127

145128
void process_plugin_argline(
@@ -262,6 +245,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
262245
if (process_name == BASIC_PLUGIN_NAME) {
263246
continue;
264247
}
248+
/*
265249
try {
266250
process_plugin = dynamic_cast<ProcessPlugin*>(conf.mgr.get(process_name));
267251
if (process_plugin == nullptr) {
@@ -279,6 +263,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
279263
} catch (PluginManagerError& e) {
280264
throw IPXPError(process_name + std::string(": ") + e.what());
281265
}
266+
*/
282267
}
283268

284269
// telemetry
@@ -297,6 +282,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
297282
conf.holder.add(statsFile);
298283

299284
OutputPlugin* output_plugin = nullptr;
285+
/*
300286
try {
301287
output_plugin = dynamic_cast<OutputPlugin*>(conf.mgr.get(output_name));
302288
if (output_plugin == nullptr) {
@@ -318,6 +304,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
318304
} catch (PluginManagerError& e) {
319305
throw IPXPError(output_name + std::string(": ") + e.what());
320306
}
307+
*/
321308

322309
{
323310
std::promise<WorkerResult>* output_res = new std::promise<WorkerResult>();
@@ -360,6 +347,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
360347
auto pipeline_queue_dir
361348
= pipeline_dir->addDir("queues")->addDir(std::to_string(pipeline_idx));
362349

350+
/*
363351
try {
364352
input_plugin = dynamic_cast<InputPlugin*>(conf.mgr.get(input_name));
365353
if (input_plugin == nullptr) {
@@ -398,6 +386,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
398386
} catch (PluginManagerError& e) {
399387
throw IPXPError(storage_name + std::string(": ") + e.what());
400388
}
389+
*/
401390

402391
std::vector<ProcessPlugin*> storage_process_plugins;
403392
for (auto& it : *process_plugins) {
@@ -638,6 +627,7 @@ int run(int argc, char* argv[])
638627
IpfixprobeOptParser parser;
639628
ipxp_conf_t conf;
640629
int status = EXIT_SUCCESS;
630+
const bool loadPluginsRecursive = true;
641631

642632
register_handlers();
643633

@@ -649,6 +639,8 @@ int run(int argc, char* argv[])
649639
goto EXIT;
650640
}
651641

642+
conf.pluginManager.loadPlugins("/usr/local/lib64/ipfixprobe/", loadPluginsRecursive);
643+
652644
if (parser.m_help) {
653645
if (parser.m_help_str.empty()) {
654646
parser.usage(std::cout, 0, IPXP_APP_NAME);

src/core/ipfixprobe.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
#ifndef IPXP_IPFIXPROBE_HPP
3030
#define IPXP_IPFIXPROBE_HPP
3131

32-
#include "pluginmgr.hpp"
32+
#include "pluginManager.hpp"
3333
#include "workers.hpp"
3434

3535
#include <atomic>
@@ -296,7 +296,7 @@ struct ipxp_conf_t {
296296
uint32_t fps;
297297
uint32_t max_pkts;
298298

299-
PluginManager mgr;
299+
PluginManager pluginManager;
300300
struct Plugins {
301301
std::vector<InputPlugin*> input;
302302
std::vector<StoragePlugin*> storage;
@@ -333,6 +333,7 @@ struct ipxp_conf_t {
333333
, worker_cnt(0)
334334
, fps(0)
335335
, max_pkts(0)
336+
, pluginManager(false)
336337
, pkt_bufsize(1600)
337338
, blocks_cnt(0)
338339
, pkts_cnt(0)

src/core/pluginManager.cpp

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/**
2+
* @file
3+
* @brief Implementation of PluginManager class for loading and unloading plugins.
4+
* @author Pavel Siska <[email protected]>
5+
* @date 2025
6+
*
7+
* Copyright (c) 2025 CESNET
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*/
11+
12+
#include "pluginManager.hpp"
13+
14+
#include <filesystem>
15+
#include <iostream>
16+
#include <stdexcept>
17+
#include <variant>
18+
19+
#include <dlfcn.h>
20+
21+
namespace ipxp {
22+
23+
namespace fs = std::filesystem;
24+
25+
static void clearErrorMessage()
26+
{
27+
dlerror();
28+
}
29+
30+
static void* openSharedObject(const std::string& path)
31+
{
32+
const int dlFlags = RTLD_LAZY | RTLD_LOCAL;
33+
return dlopen(path.c_str(), dlFlags);
34+
}
35+
36+
template<typename Iterator>
37+
static bool isValidPlugin(const Iterator& entry)
38+
{
39+
return entry.is_regular_file() && entry.path().extension() == ".so";
40+
}
41+
42+
static void loadPluginsRecursive(const std::string& dirPath, PluginManager& pluginManager)
43+
{
44+
for (const auto& entry : fs::recursive_directory_iterator(dirPath)) {
45+
if (isValidPlugin(entry)) {
46+
pluginManager.loadPlugin(entry.path().string());
47+
}
48+
}
49+
}
50+
51+
static void loadPluginsNonRecursive(const std::string& dirPath, PluginManager& pluginManager)
52+
{
53+
for (const auto& entry : fs::directory_iterator(dirPath)) {
54+
if (isValidPlugin(entry)) {
55+
pluginManager.loadPlugin(entry.path().string());
56+
}
57+
}
58+
}
59+
60+
PluginManager::PluginManager(bool unloadAtExit)
61+
: m_unloadAtExit(unloadAtExit)
62+
{
63+
}
64+
65+
void PluginManager::loadPlugins(const std::string& dirPath, bool recursive)
66+
{
67+
const auto loadPluginsFunction = recursive ? loadPluginsRecursive : loadPluginsNonRecursive;
68+
69+
try {
70+
loadPluginsFunction(dirPath, *this);
71+
} catch (const fs::filesystem_error& ex) {
72+
std::cerr << "Error accessing directory '" << dirPath << "': " << ex.what() << std::endl;
73+
// m_logger->error("Error accessing directory '{}': {}", dirPath, ex.what());
74+
throw std::runtime_error("PluginManager::loadPlugins() has failed.");
75+
}
76+
}
77+
78+
void PluginManager::loadPlugin(const std::string& pluginPath)
79+
{
80+
clearErrorMessage();
81+
void* pluginHandle = openSharedObject(pluginPath);
82+
83+
if (!pluginHandle) {
84+
std::cerr << "Error loading plugin '" << pluginPath << "': " << dlerror() << std::endl;
85+
// m_logger->error(std::string(dlerror()));
86+
throw std::runtime_error("PluginManager::loadPlugin() has failed.");
87+
}
88+
89+
std::cerr << "Plugin '" << pluginPath << "' loaded." << std::endl;
90+
// m_logger->info("Plugin '{}' loaded.", pluginPath);
91+
92+
m_pluginHandles.emplace_back(pluginHandle);
93+
}
94+
95+
PluginManager::~PluginManager()
96+
{
97+
if (m_unloadAtExit) {
98+
unloadPlugins();
99+
}
100+
}
101+
102+
void PluginManager::unloadPlugins()
103+
{
104+
for (auto handle : m_pluginHandles) {
105+
dlclose(handle);
106+
}
107+
m_pluginHandles.clear();
108+
}
109+
110+
} // namespace ipxp

0 commit comments

Comments
 (0)