Skip to content

Commit 2c324ab

Browse files
Pavel SiskaPavel Šiška
authored andcommitted
ipfixprobe - refactor ipfixprobe and workers
1 parent a640de1 commit 2c324ab

File tree

4 files changed

+114
-114
lines changed

4 files changed

+114
-114
lines changed

src/core/ipfixprobe.cpp

Lines changed: 79 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@
2626
*
2727
*/
2828

29-
#include "buildConfig.hpp"
3029
#include "ipfixprobe.hpp"
3130

31+
#include "buildConfig.hpp"
32+
3233
#include <fstream>
3334
#include <future>
3435
#include <iomanip>
@@ -65,6 +66,7 @@ const uint32_t DEFAULT_FPS = 0; // unlimited
6566
*/
6667
void signal_handler(int sig)
6768
{
69+
(void) sig;
6870
#ifdef WITH_LIBUNWIND
6971
if (sig == SIGSEGV) {
7072
st_dump(STDERR_FILENO, sig);
@@ -100,7 +102,31 @@ static void printPluginsUsage(const std::vector<PluginManifest>& pluginsManifest
100102
}
101103
}
102104

103-
void print_help(ipxp_conf_t& conf, const std::string& arg)
105+
void printRegisteredPlugins(
106+
const std::string& pluginType,
107+
const std::vector<PluginManifest>& pluginsManifest)
108+
{
109+
std::cout << "Registered " << pluginType << " plugins:" << std::endl;
110+
for (const auto& pluginManifest : pluginsManifest) {
111+
std::cout << " " << pluginManifest.name << std::endl;
112+
}
113+
std::cout << "#####################\n";
114+
}
115+
116+
void printPlugins()
117+
{
118+
auto& inputPluginFactory = InputPluginFactory::getInstance();
119+
auto& storagePluginFactory = StoragePluginFactory::getInstance();
120+
auto& processPluginFactory = ProcessPluginFactory::getInstance();
121+
auto& outputPluginFactory = OutputPluginFactory::getInstance();
122+
123+
printRegisteredPlugins("input", inputPluginFactory.getRegisteredPlugins());
124+
printRegisteredPlugins("storage", storagePluginFactory.getRegisteredPlugins());
125+
printRegisteredPlugins("process", processPluginFactory.getRegisteredPlugins());
126+
printRegisteredPlugins("output", outputPluginFactory.getRegisteredPlugins());
127+
}
128+
129+
void print_help(const std::string& arg)
104130
{
105131
if (arg == "input") {
106132
auto& inputPluginFactory = InputPluginFactory::getInstance();
@@ -192,15 +218,7 @@ void set_thread_details(pthread_t thread, const std::string& name, const std::ve
192218

193219
bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
194220
{
195-
auto deleter = [&](OutputPlugin::Plugins* p) {
196-
for (auto& it : *p) {
197-
delete it.second;
198-
}
199-
delete p;
200-
};
201-
auto process_plugins = std::unique_ptr<OutputPlugin::Plugins, decltype(deleter)>(
202-
new OutputPlugin::Plugins(),
203-
deleter);
221+
OutputPlugin::ProcessPlugins processPlugins;
204222
std::string storage_name = "cache";
205223
std::string storage_params = "";
206224
std::string output_name = "ipfix";
@@ -226,7 +244,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
226244

227245
// Process
228246
for (auto& it : parser.m_process) {
229-
ProcessPlugin* process_plugin = nullptr;
247+
std::shared_ptr<ProcessPlugin> processPlugin;
230248
std::string process_params;
231249
std::string process_name;
232250
std::vector<int> affinity;
@@ -236,7 +254,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
236254
"cannot set CPU affinity for process plugin (process plugins are invoked inside "
237255
"input threads)");
238256
}
239-
for (auto& it : *process_plugins) {
257+
for (auto& it : processPlugins) {
240258
std::string plugin_name = it.first;
241259
if (plugin_name == process_name) {
242260
throw IPXPError(process_name + " plugin was specified multiple times");
@@ -245,25 +263,21 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
245263
if (process_name == BASIC_PLUGIN_NAME) {
246264
continue;
247265
}
248-
/*
266+
249267
try {
250-
process_plugin = dynamic_cast<ProcessPlugin*>(conf.mgr.get(process_name));
251-
if (process_plugin == nullptr) {
252-
throw IPXPError("invalid processing plugin " + process_name);
268+
auto& processPluginFactory = ProcessPluginFactory::getInstance();
269+
processPlugin = processPluginFactory.createShared(process_name, process_params);
270+
if (processPlugin == nullptr) {
271+
throw IPXPError("invalid process plugin " + process_name);
253272
}
254-
255-
process_plugin->init(process_params.c_str());
256-
process_plugins->push_back(std::make_pair(process_name, process_plugin));
273+
processPlugins.emplace_back(process_name, processPlugin);
257274
} catch (PluginError& e) {
258-
delete process_plugin;
259275
throw IPXPError(process_name + std::string(": ") + e.what());
260276
} catch (PluginExit& e) {
261-
delete process_plugin;
262277
return true;
263-
} catch (PluginManagerError& e) {
264-
throw IPXPError(process_name + std::string(": ") + e.what());
278+
} catch (std::runtime_error& ex) {
279+
throw IPXPError(process_name + std::string(": ") + ex.what());
265280
}
266-
*/
267281
}
268282

269283
// telemetry
@@ -281,40 +295,32 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
281295
auto statsFile = ipxRingTelemetryDir->addFile("stats", statsOps);
282296
conf.holder.add(statsFile);
283297

284-
OutputPlugin* output_plugin = nullptr;
285-
/*
298+
std::shared_ptr<OutputPlugin> outputPlugin;
299+
286300
try {
287-
output_plugin = dynamic_cast<OutputPlugin*>(conf.mgr.get(output_name));
288-
if (output_plugin == nullptr) {
289-
ipx_ring_destroy(output_queue);
301+
auto& outputPluginFactory = OutputPluginFactory::getInstance();
302+
outputPlugin = outputPluginFactory.createShared(output_name, output_params, processPlugins);
303+
if (outputPlugin == nullptr) {
290304
throw IPXPError("invalid output plugin " + output_name);
291305
}
292-
293-
output_plugin->init(output_params.c_str(), *process_plugins);
294-
conf.active.output.push_back(output_plugin);
295-
conf.active.all.push_back(output_plugin);
306+
conf.outputPlugin = outputPlugin;
296307
} catch (PluginError& e) {
297-
ipx_ring_destroy(output_queue);
298-
delete output_plugin;
299308
throw IPXPError(output_name + std::string(": ") + e.what());
300309
} catch (PluginExit& e) {
301-
ipx_ring_destroy(output_queue);
302-
delete output_plugin;
303310
return true;
304-
} catch (PluginManagerError& e) {
305-
throw IPXPError(output_name + std::string(": ") + e.what());
311+
} catch (std::runtime_error& ex) {
312+
throw IPXPError(output_name + std::string(": ") + ex.what());
306313
}
307-
*/
308314

309315
{
310316
std::promise<WorkerResult>* output_res = new std::promise<WorkerResult>();
311317
auto output_stats = new std::atomic<OutputStats>();
312318
conf.output_stats.push_back(output_stats);
313319
OutputWorker tmp
314-
= {output_plugin,
320+
= {outputPlugin,
315321
new std::thread(
316322
output_worker,
317-
output_plugin,
323+
outputPlugin,
318324
output_queue,
319325
output_res,
320326
output_stats,
@@ -336,8 +342,8 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
336342
auto flowcache_dir = conf.telemetry_root_node->addDir("flowcache");
337343
size_t pipeline_idx = 0;
338344
for (auto& it : parser.m_input) {
339-
InputPlugin* input_plugin = nullptr;
340-
StoragePlugin* storage_plugin = nullptr;
345+
std::shared_ptr<InputPlugin> inputPlugin;
346+
std::shared_ptr<StoragePlugin> storagePlugin;
341347
std::string input_params;
342348
std::string input_name;
343349
std::vector<int> affinity;
@@ -347,51 +353,43 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
347353
auto pipeline_queue_dir
348354
= pipeline_dir->addDir("queues")->addDir(std::to_string(pipeline_idx));
349355

350-
/*
351356
try {
352-
input_plugin = dynamic_cast<InputPlugin*>(conf.mgr.get(input_name));
353-
if (input_plugin == nullptr) {
357+
auto& inputPluginFactory = InputPluginFactory::getInstance();
358+
inputPlugin = inputPluginFactory.createShared(input_name, input_params);
359+
if (inputPlugin == nullptr) {
354360
throw IPXPError("invalid input plugin " + input_name);
355361
}
356-
input_plugin->init(input_params.c_str());
357-
input_plugin->set_telemetry_dirs(input_plugin_dir, pipeline_queue_dir);
358-
conf.active.input.push_back(input_plugin);
359-
conf.active.all.push_back(input_plugin);
362+
inputPlugin->set_telemetry_dirs(input_plugin_dir, pipeline_queue_dir);
363+
conf.inputPlugins.emplace_back(inputPlugin);
360364
} catch (PluginError& e) {
361-
delete input_plugin;
362365
throw IPXPError(input_name + std::string(": ") + e.what());
363366
} catch (PluginExit& e) {
364-
delete input_plugin;
365367
return true;
366-
} catch (PluginManagerError& e) {
367-
throw IPXPError(input_name + std::string(": ") + e.what());
368+
} catch (std::runtime_error& ex) {
369+
throw IPXPError(input_name + std::string(": ") + ex.what());
368370
}
369371

370372
try {
371-
storage_plugin = dynamic_cast<StoragePlugin*>(conf.mgr.get(storage_name));
372-
if (storage_plugin == nullptr) {
373+
auto& storagePluginFactory = StoragePluginFactory::getInstance();
374+
storagePlugin
375+
= storagePluginFactory.createShared(storage_name, storage_params, output_queue);
376+
if (storagePlugin == nullptr) {
373377
throw IPXPError("invalid storage plugin " + storage_name);
374378
}
375-
storage_plugin->set_queue(output_queue);
376-
storage_plugin->init(storage_params.c_str());
377-
storage_plugin->set_telemetry_dir(pipeline_queue_dir);
378-
conf.active.storage.push_back(storage_plugin);
379-
conf.active.all.push_back(storage_plugin);
379+
storagePlugin->set_telemetry_dir(pipeline_queue_dir);
380+
conf.storagePlugins.emplace_back(storagePlugin);
380381
} catch (PluginError& e) {
381-
delete storage_plugin;
382382
throw IPXPError(storage_name + std::string(": ") + e.what());
383383
} catch (PluginExit& e) {
384-
delete storage_plugin;
385384
return true;
386-
} catch (PluginManagerError& e) {
387-
throw IPXPError(storage_name + std::string(": ") + e.what());
385+
} catch (std::runtime_error& ex) {
386+
throw IPXPError(storage_name + std::string(": ") + ex.what());
388387
}
389-
*/
390388

391389
std::vector<ProcessPlugin*> storage_process_plugins;
392-
for (auto& it : *process_plugins) {
390+
for (auto& it : processPlugins) {
393391
ProcessPlugin* tmp = it.second->copy();
394-
storage_plugin->add_plugin(tmp);
392+
storagePlugin->add_plugin(tmp);
395393
conf.active.process.push_back(tmp);
396394
conf.active.all.push_back(tmp);
397395
storage_process_plugins.push_back(tmp);
@@ -404,18 +402,18 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
404402
conf.input_stats.push_back(input_stats);
405403

406404
WorkPipeline tmp
407-
= {{input_plugin,
405+
= {{inputPlugin,
408406
new std::thread(
409407
input_storage_worker,
410-
input_plugin,
411-
storage_plugin,
408+
inputPlugin,
409+
storagePlugin,
412410
conf.iqueue_size,
413411
conf.max_pkts,
414412
input_res,
415413
input_stats),
416414
input_res,
417415
input_stats},
418-
{storage_plugin, storage_process_plugins}};
416+
{storagePlugin, storage_process_plugins}};
419417
set_thread_details(
420418
tmp.input.thread->native_handle(),
421419
"in_" + std::to_string(pipeline_idx) + "_" + input_name,
@@ -435,7 +433,7 @@ void finish(ipxp_conf_t& conf)
435433
terminate_input = 1;
436434
for (auto& it : conf.pipelines) {
437435
it.input.thread->join();
438-
it.input.plugin->close();
436+
it.input.inputPlugin->close();
439437
}
440438

441439
// Terminate all storages
@@ -452,7 +450,7 @@ void finish(ipxp_conf_t& conf)
452450
}
453451

454452
for (auto& it : conf.pipelines) {
455-
it.storage.plugin->close();
453+
it.storage.storagePlugin->close();
456454
}
457455

458456
std::cout << "Input stats:" << std::endl
@@ -576,8 +574,8 @@ void main_loop(ipxp_conf_t& conf)
576574
}
577575

578576
struct pollfd pfds[2] = {
579-
{.fd = -1, .events = POLL_IN}, // Server
580-
{.fd = -1, .events = POLL_IN} // Client
577+
{.fd = -1, .events = POLL_IN, .revents = 0}, // Server
578+
{.fd = -1, .events = POLL_IN, .revents = 0} // Client
581579
};
582580

583581
std::string sock_path = create_sockpath(std::to_string(getpid()).c_str());
@@ -641,11 +639,13 @@ int run(int argc, char* argv[])
641639

642640
conf.pluginManager.loadPlugins("/usr/local/lib64/ipfixprobe/", loadPluginsRecursive);
643641

642+
printPlugins();
643+
644644
if (parser.m_help) {
645645
if (parser.m_help_str.empty()) {
646646
parser.usage(std::cout, 0, IPXP_APP_NAME);
647647
} else {
648-
print_help(conf, parser.m_help_str);
648+
print_help(parser.m_help_str);
649649
}
650650
goto EXIT;
651651
}

src/core/ipfixprobe.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ class IpfixprobeOptParser : public OptionsParser {
244244
"",
245245
"Run as a standalone process",
246246
[this](const char* arg) {
247+
(void) arg;
247248
m_daemon = true;
248249
return true;
249250
},
@@ -265,6 +266,7 @@ class IpfixprobeOptParser : public OptionsParser {
265266
"",
266267
"Show version and exit",
267268
[this](const char* arg) {
269+
(void) arg;
268270
m_version = true;
269271
return true;
270272
},
@@ -296,6 +298,10 @@ struct ipxp_conf_t {
296298
uint32_t fps;
297299
uint32_t max_pkts;
298300

301+
std::vector<std::shared_ptr<InputPlugin>> inputPlugins;
302+
std::vector<std::shared_ptr<StoragePlugin>> storagePlugins;
303+
std::shared_ptr<OutputPlugin> outputPlugin;
304+
299305
PluginManager pluginManager;
300306
struct Plugins {
301307
std::vector<InputPlugin*> input;
@@ -351,15 +357,10 @@ struct ipxp_conf_t {
351357
if (it.input.thread->joinable()) {
352358
it.input.thread->join();
353359
}
354-
delete it.input.plugin;
355360
delete it.input.thread;
356361
delete it.input.promise;
357362
}
358363

359-
for (auto& it : pipelines) {
360-
delete it.storage.plugin;
361-
}
362-
363364
for (auto& it : pipelines) {
364365
for (auto& itp : it.storage.plugins) {
365366
delete itp;
@@ -373,7 +374,6 @@ struct ipxp_conf_t {
373374
}
374375
delete it.thread;
375376
delete it.promise;
376-
delete it.plugin;
377377
ipx_ring_destroy(it.queue);
378378
}
379379

0 commit comments

Comments
 (0)