Skip to content

Commit 0499d27

Browse files
author
Pavel Siska
committed
ipfixprobe - refactor ipfixprobe and workers
1 parent e1af865 commit 0499d27

File tree

4 files changed

+113
-115
lines changed

4 files changed

+113
-115
lines changed

src/core/ipfixprobe.cpp

Lines changed: 77 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ const uint32_t DEFAULT_FPS = 0; // unlimited
6565
*/
6666
void signal_handler(int sig)
6767
{
68+
(void) sig;
6869
#ifdef WITH_LIBUNWIND
6970
if (sig == SIGSEGV) {
7071
st_dump(STDERR_FILENO, sig);
@@ -100,7 +101,31 @@ static void printPluginsUsage(const std::vector<PluginManifest>& pluginsManifest
100101
}
101102
}
102103

103-
void print_help(ipxp_conf_t& conf, const std::string& arg)
104+
void printRegisteredPlugins(
105+
const std::string& pluginType,
106+
const std::vector<PluginManifest>& pluginsManifest)
107+
{
108+
std::cout << "Registered " << pluginType << " plugins:" << std::endl;
109+
for (const auto& pluginManifest : pluginsManifest) {
110+
std::cout << " " << pluginManifest.name << std::endl;
111+
}
112+
std::cout << "#####################\n";
113+
}
114+
115+
void printPlugins()
116+
{
117+
auto& inputPluginFactory = InputPluginFactory::getInstance();
118+
auto& storagePluginFactory = StoragePluginFactory::getInstance();
119+
auto& processPluginFactory = ProcessPluginFactory::getInstance();
120+
auto& outputPluginFactory = OutputPluginFactory::getInstance();
121+
122+
printRegisteredPlugins("input", inputPluginFactory.getRegisteredPlugins());
123+
printRegisteredPlugins("storage", storagePluginFactory.getRegisteredPlugins());
124+
printRegisteredPlugins("process", processPluginFactory.getRegisteredPlugins());
125+
printRegisteredPlugins("output", outputPluginFactory.getRegisteredPlugins());
126+
}
127+
128+
void print_help(const std::string& arg)
104129
{
105130
if (arg == "input") {
106131
auto& inputPluginFactory = InputPluginFactory::getInstance();
@@ -192,15 +217,7 @@ void set_thread_details(pthread_t thread, const std::string& name, const std::ve
192217

193218
bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
194219
{
195-
auto deleter = [&](OutputPlugin::Plugins* p) {
196-
for (auto& it : *p) {
197-
delete it.second;
198-
}
199-
delete p;
200-
};
201-
auto process_plugins = std::unique_ptr<OutputPlugin::Plugins, decltype(deleter)>(
202-
new OutputPlugin::Plugins(),
203-
deleter);
220+
OutputPlugin::ProcessPlugins processPlugins;
204221
std::string storage_name = "cache";
205222
std::string storage_params = "";
206223
std::string output_name = "ipfix";
@@ -226,7 +243,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
226243

227244
// Process
228245
for (auto& it : parser.m_process) {
229-
ProcessPlugin* process_plugin = nullptr;
246+
std::shared_ptr<ProcessPlugin> processPlugin;
230247
std::string process_params;
231248
std::string process_name;
232249
std::vector<int> affinity;
@@ -236,7 +253,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
236253
"cannot set CPU affinity for process plugin (process plugins are invoked inside "
237254
"input threads)");
238255
}
239-
for (auto& it : *process_plugins) {
256+
for (auto& it : processPlugins) {
240257
std::string plugin_name = it.first;
241258
if (plugin_name == process_name) {
242259
throw IPXPError(process_name + " plugin was specified multiple times");
@@ -245,25 +262,21 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
245262
if (process_name == BASIC_PLUGIN_NAME) {
246263
continue;
247264
}
248-
/*
265+
249266
try {
250-
process_plugin = dynamic_cast<ProcessPlugin*>(conf.mgr.get(process_name));
251-
if (process_plugin == nullptr) {
252-
throw IPXPError("invalid processing plugin " + process_name);
267+
auto& processPluginFactory = ProcessPluginFactory::getInstance();
268+
processPlugin = processPluginFactory.createShared(process_name, process_params);
269+
if (processPlugin == nullptr) {
270+
throw IPXPError("invalid process plugin " + process_name);
253271
}
254-
255-
process_plugin->init(process_params.c_str());
256-
process_plugins->push_back(std::make_pair(process_name, process_plugin));
272+
processPlugins.emplace_back(process_name, processPlugin);
257273
} catch (PluginError& e) {
258-
delete process_plugin;
259274
throw IPXPError(process_name + std::string(": ") + e.what());
260275
} catch (PluginExit& e) {
261-
delete process_plugin;
262276
return true;
263-
} catch (PluginManagerError& e) {
264-
throw IPXPError(process_name + std::string(": ") + e.what());
277+
} catch (std::runtime_error& ex) {
278+
throw IPXPError(process_name + std::string(": ") + ex.what());
265279
}
266-
*/
267280
}
268281

269282
// telemetry
@@ -281,40 +294,32 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
281294
auto statsFile = ipxRingTelemetryDir->addFile("stats", statsOps);
282295
conf.holder.add(statsFile);
283296

284-
OutputPlugin* output_plugin = nullptr;
285-
/*
297+
std::shared_ptr<OutputPlugin> outputPlugin;
298+
286299
try {
287-
output_plugin = dynamic_cast<OutputPlugin*>(conf.mgr.get(output_name));
288-
if (output_plugin == nullptr) {
289-
ipx_ring_destroy(output_queue);
300+
auto& outputPluginFactory = OutputPluginFactory::getInstance();
301+
outputPlugin = outputPluginFactory.createShared(output_name, output_params, processPlugins);
302+
if (outputPlugin == nullptr) {
290303
throw IPXPError("invalid output plugin " + output_name);
291304
}
292-
293-
output_plugin->init(output_params.c_str(), *process_plugins);
294-
conf.active.output.push_back(output_plugin);
295-
conf.active.all.push_back(output_plugin);
305+
conf.outputPlugin = outputPlugin;
296306
} catch (PluginError& e) {
297-
ipx_ring_destroy(output_queue);
298-
delete output_plugin;
299307
throw IPXPError(output_name + std::string(": ") + e.what());
300308
} catch (PluginExit& e) {
301-
ipx_ring_destroy(output_queue);
302-
delete output_plugin;
303309
return true;
304-
} catch (PluginManagerError& e) {
305-
throw IPXPError(output_name + std::string(": ") + e.what());
310+
} catch (std::runtime_error& ex) {
311+
throw IPXPError(output_name + std::string(": ") + ex.what());
306312
}
307-
*/
308313

309314
{
310315
std::promise<WorkerResult>* output_res = new std::promise<WorkerResult>();
311316
auto output_stats = new std::atomic<OutputStats>();
312317
conf.output_stats.push_back(output_stats);
313318
OutputWorker tmp
314-
= {output_plugin,
319+
= {outputPlugin,
315320
new std::thread(
316321
output_worker,
317-
output_plugin,
322+
outputPlugin,
318323
output_queue,
319324
output_res,
320325
output_stats,
@@ -336,8 +341,8 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
336341
auto flowcache_dir = conf.telemetry_root_node->addDir("flowcache");
337342
size_t pipeline_idx = 0;
338343
for (auto& it : parser.m_input) {
339-
InputPlugin* input_plugin = nullptr;
340-
StoragePlugin* storage_plugin = nullptr;
344+
std::shared_ptr<InputPlugin> inputPlugin;
345+
std::shared_ptr<StoragePlugin> storagePlugin;
341346
std::string input_params;
342347
std::string input_name;
343348
std::vector<int> affinity;
@@ -347,51 +352,43 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
347352
auto pipeline_queue_dir
348353
= pipeline_dir->addDir("queues")->addDir(std::to_string(pipeline_idx));
349354

350-
/*
351355
try {
352-
input_plugin = dynamic_cast<InputPlugin*>(conf.mgr.get(input_name));
353-
if (input_plugin == nullptr) {
356+
auto& inputPluginFactory = InputPluginFactory::getInstance();
357+
inputPlugin = inputPluginFactory.createShared(input_name, input_params);
358+
if (inputPlugin == nullptr) {
354359
throw IPXPError("invalid input plugin " + input_name);
355360
}
356-
input_plugin->init(input_params.c_str());
357-
input_plugin->set_telemetry_dirs(input_plugin_dir, pipeline_queue_dir);
358-
conf.active.input.push_back(input_plugin);
359-
conf.active.all.push_back(input_plugin);
361+
inputPlugin->set_telemetry_dirs(input_plugin_dir, pipeline_queue_dir);
362+
conf.inputPlugins.emplace_back(inputPlugin);
360363
} catch (PluginError& e) {
361-
delete input_plugin;
362364
throw IPXPError(input_name + std::string(": ") + e.what());
363365
} catch (PluginExit& e) {
364-
delete input_plugin;
365366
return true;
366-
} catch (PluginManagerError& e) {
367-
throw IPXPError(input_name + std::string(": ") + e.what());
367+
} catch (std::runtime_error& ex) {
368+
throw IPXPError(input_name + std::string(": ") + ex.what());
368369
}
369370

370371
try {
371-
storage_plugin = dynamic_cast<StoragePlugin*>(conf.mgr.get(storage_name));
372-
if (storage_plugin == nullptr) {
372+
auto& storagePluginFactory = StoragePluginFactory::getInstance();
373+
storagePlugin
374+
= storagePluginFactory.createShared(storage_name, storage_params, output_queue);
375+
if (storagePlugin == nullptr) {
373376
throw IPXPError("invalid storage plugin " + storage_name);
374377
}
375-
storage_plugin->set_queue(output_queue);
376-
storage_plugin->init(storage_params.c_str());
377-
storage_plugin->set_telemetry_dir(pipeline_queue_dir);
378-
conf.active.storage.push_back(storage_plugin);
379-
conf.active.all.push_back(storage_plugin);
378+
storagePlugin->set_telemetry_dir(pipeline_queue_dir);
379+
conf.storagePlugins.emplace_back(storagePlugin);
380380
} catch (PluginError& e) {
381-
delete storage_plugin;
382381
throw IPXPError(storage_name + std::string(": ") + e.what());
383382
} catch (PluginExit& e) {
384-
delete storage_plugin;
385383
return true;
386-
} catch (PluginManagerError& e) {
387-
throw IPXPError(storage_name + std::string(": ") + e.what());
384+
} catch (std::runtime_error& ex) {
385+
throw IPXPError(storage_name + std::string(": ") + ex.what());
388386
}
389-
*/
390387

391388
std::vector<ProcessPlugin*> storage_process_plugins;
392-
for (auto& it : *process_plugins) {
389+
for (auto& it : processPlugins) {
393390
ProcessPlugin* tmp = it.second->copy();
394-
storage_plugin->add_plugin(tmp);
391+
storagePlugin->add_plugin(tmp);
395392
conf.active.process.push_back(tmp);
396393
conf.active.all.push_back(tmp);
397394
storage_process_plugins.push_back(tmp);
@@ -404,18 +401,18 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
404401
conf.input_stats.push_back(input_stats);
405402

406403
WorkPipeline tmp
407-
= {{input_plugin,
404+
= {{inputPlugin,
408405
new std::thread(
409406
input_storage_worker,
410-
input_plugin,
411-
storage_plugin,
407+
inputPlugin,
408+
storagePlugin,
412409
conf.iqueue_size,
413410
conf.max_pkts,
414411
input_res,
415412
input_stats),
416413
input_res,
417414
input_stats},
418-
{storage_plugin, storage_process_plugins}};
415+
{storagePlugin, storage_process_plugins}};
419416
set_thread_details(
420417
tmp.input.thread->native_handle(),
421418
"in_" + std::to_string(pipeline_idx) + "_" + input_name,
@@ -435,7 +432,7 @@ void finish(ipxp_conf_t& conf)
435432
terminate_input = 1;
436433
for (auto& it : conf.pipelines) {
437434
it.input.thread->join();
438-
it.input.plugin->close();
435+
it.input.inputPlugin->close();
439436
}
440437

441438
// Terminate all storages
@@ -452,7 +449,7 @@ void finish(ipxp_conf_t& conf)
452449
}
453450

454451
for (auto& it : conf.pipelines) {
455-
it.storage.plugin->close();
452+
it.storage.storagePlugin->close();
456453
}
457454

458455
std::cout << "Input stats:" << std::endl
@@ -576,8 +573,8 @@ void main_loop(ipxp_conf_t& conf)
576573
}
577574

578575
struct pollfd pfds[2] = {
579-
{.fd = -1, .events = POLL_IN}, // Server
580-
{.fd = -1, .events = POLL_IN} // Client
576+
{.fd = -1, .events = POLL_IN, .revents = 0}, // Server
577+
{.fd = -1, .events = POLL_IN, .revents = 0} // Client
581578
};
582579

583580
std::string sock_path = create_sockpath(std::to_string(getpid()).c_str());
@@ -641,11 +638,13 @@ int run(int argc, char* argv[])
641638

642639
conf.pluginManager.loadPlugins("/usr/local/lib64/ipfixprobe/", loadPluginsRecursive);
643640

641+
printPlugins();
642+
644643
if (parser.m_help) {
645644
if (parser.m_help_str.empty()) {
646645
parser.usage(std::cout, 0, IPXP_APP_NAME);
647646
} else {
648-
print_help(conf, parser.m_help_str);
647+
print_help(parser.m_help_str);
649648
}
650649
goto EXIT;
651650
}

src/core/ipfixprobe.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ class IpfixprobeOptParser : public OptionsParser {
244244
"",
245245
"Run as a standalone process",
246246
[this](const char* arg) {
247+
(void) arg;
247248
m_daemon = true;
248249
return true;
249250
},
@@ -265,6 +266,7 @@ class IpfixprobeOptParser : public OptionsParser {
265266
"",
266267
"Show version and exit",
267268
[this](const char* arg) {
269+
(void) arg;
268270
m_version = true;
269271
return true;
270272
},
@@ -296,6 +298,10 @@ struct ipxp_conf_t {
296298
uint32_t fps;
297299
uint32_t max_pkts;
298300

301+
std::vector<std::shared_ptr<InputPlugin>> inputPlugins;
302+
std::vector<std::shared_ptr<StoragePlugin>> storagePlugins;
303+
std::shared_ptr<OutputPlugin> outputPlugin;
304+
299305
PluginManager pluginManager;
300306
struct Plugins {
301307
std::vector<InputPlugin*> input;
@@ -351,15 +357,10 @@ struct ipxp_conf_t {
351357
if (it.input.thread->joinable()) {
352358
it.input.thread->join();
353359
}
354-
delete it.input.plugin;
355360
delete it.input.thread;
356361
delete it.input.promise;
357362
}
358363

359-
for (auto& it : pipelines) {
360-
delete it.storage.plugin;
361-
}
362-
363364
for (auto& it : pipelines) {
364365
for (auto& itp : it.storage.plugins) {
365366
delete itp;
@@ -373,7 +374,6 @@ struct ipxp_conf_t {
373374
}
374375
delete it.thread;
375376
delete it.promise;
376-
delete it.plugin;
377377
ipx_ring_destroy(it.queue);
378378
}
379379

0 commit comments

Comments
 (0)