2626 *
2727 */
2828
29- #include " buildConfig.hpp"
3029#include " ipfixprobe.hpp"
3130
31+ #include " buildConfig.hpp"
32+
3233#include < fstream>
3334#include < future>
3435#include < iomanip>
@@ -65,6 +66,7 @@ const uint32_t DEFAULT_FPS = 0; // unlimited
6566 */
6667void signal_handler (int sig)
6768{
69+ (void ) sig;
6870#ifdef WITH_LIBUNWIND
6971 if (sig == SIGSEGV) {
7072 st_dump (STDERR_FILENO, sig);
@@ -100,7 +102,31 @@ static void printPluginsUsage(const std::vector<PluginManifest>& pluginsManifest
100102 }
101103}
102104
103- void print_help (ipxp_conf_t & conf, const std::string& arg)
105+ void printRegisteredPlugins (
106+ const std::string& pluginType,
107+ const std::vector<PluginManifest>& pluginsManifest)
108+ {
109+ std::cout << " Registered " << pluginType << " plugins:" << std::endl;
110+ for (const auto & pluginManifest : pluginsManifest) {
111+ std::cout << " " << pluginManifest.name << std::endl;
112+ }
113+ std::cout << " #####################\n " ;
114+ }
115+
116+ void printPlugins ()
117+ {
118+ auto & inputPluginFactory = InputPluginFactory::getInstance ();
119+ auto & storagePluginFactory = StoragePluginFactory::getInstance ();
120+ auto & processPluginFactory = ProcessPluginFactory::getInstance ();
121+ auto & outputPluginFactory = OutputPluginFactory::getInstance ();
122+
123+ printRegisteredPlugins (" input" , inputPluginFactory.getRegisteredPlugins ());
124+ printRegisteredPlugins (" storage" , storagePluginFactory.getRegisteredPlugins ());
125+ printRegisteredPlugins (" process" , processPluginFactory.getRegisteredPlugins ());
126+ printRegisteredPlugins (" output" , outputPluginFactory.getRegisteredPlugins ());
127+ }
128+
129+ void print_help (const std::string& arg)
104130{
105131 if (arg == " input" ) {
106132 auto & inputPluginFactory = InputPluginFactory::getInstance ();
@@ -192,15 +218,7 @@ void set_thread_details(pthread_t thread, const std::string& name, const std::ve
192218
193219bool process_plugin_args (ipxp_conf_t & conf, IpfixprobeOptParser& parser)
194220{
195- auto deleter = [&](OutputPlugin::Plugins* p) {
196- for (auto & it : *p) {
197- delete it.second ;
198- }
199- delete p;
200- };
201- auto process_plugins = std::unique_ptr<OutputPlugin::Plugins, decltype (deleter)>(
202- new OutputPlugin::Plugins (),
203- deleter);
221+ OutputPlugin::ProcessPlugins processPlugins;
204222 std::string storage_name = " cache" ;
205223 std::string storage_params = " " ;
206224 std::string output_name = " ipfix" ;
@@ -226,7 +244,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
226244
227245 // Process
228246 for (auto & it : parser.m_process ) {
229- ProcessPlugin* process_plugin = nullptr ;
247+ std::shared_ptr< ProcessPlugin> processPlugin ;
230248 std::string process_params;
231249 std::string process_name;
232250 std::vector<int > affinity;
@@ -236,7 +254,7 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
236254 " cannot set CPU affinity for process plugin (process plugins are invoked inside "
237255 " input threads)" );
238256 }
239- for (auto & it : *process_plugins ) {
257+ for (auto & it : processPlugins ) {
240258 std::string plugin_name = it.first ;
241259 if (plugin_name == process_name) {
242260 throw IPXPError (process_name + " plugin was specified multiple times" );
@@ -245,25 +263,21 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
245263 if (process_name == BASIC_PLUGIN_NAME) {
246264 continue ;
247265 }
248- /*
266+
249267 try {
250- process_plugin = dynamic_cast<ProcessPlugin*>(conf.mgr.get(process_name));
251- if (process_plugin == nullptr) {
252- throw IPXPError("invalid processing plugin " + process_name);
268+ auto & processPluginFactory = ProcessPluginFactory::getInstance ();
269+ processPlugin = processPluginFactory.createShared (process_name, process_params);
270+ if (processPlugin == nullptr ) {
271+ throw IPXPError (" invalid process plugin " + process_name);
253272 }
254-
255- process_plugin->init(process_params.c_str());
256- process_plugins->push_back(std::make_pair(process_name, process_plugin));
273+ processPlugins.emplace_back (process_name, processPlugin);
257274 } catch (PluginError& e) {
258- delete process_plugin;
259275 throw IPXPError (process_name + std::string (" : " ) + e.what ());
260276 } catch (PluginExit& e) {
261- delete process_plugin;
262277 return true ;
263- } catch (PluginManagerError& e ) {
264- throw IPXPError(process_name + std::string(": ") + e .what());
278+ } catch (std::runtime_error& ex ) {
279+ throw IPXPError (process_name + std::string (" : " ) + ex .what ());
265280 }
266- */
267281 }
268282
269283 // telemetry
@@ -281,40 +295,32 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
281295 auto statsFile = ipxRingTelemetryDir->addFile (" stats" , statsOps);
282296 conf.holder .add (statsFile);
283297
284- OutputPlugin* output_plugin = nullptr ;
285- /*
298+ std::shared_ptr< OutputPlugin> outputPlugin ;
299+
286300 try {
287- output_plugin = dynamic_cast<OutputPlugin*>(conf.mgr.get(output_name) );
288- if (output_plugin == nullptr) {
289- ipx_ring_destroy(output_queue);
301+ auto & outputPluginFactory = OutputPluginFactory::getInstance ( );
302+ outputPlugin = outputPluginFactory. createShared (output_name, output_params, processPlugins);
303+ if (outputPlugin == nullptr ) {
290304 throw IPXPError (" invalid output plugin " + output_name);
291305 }
292-
293- output_plugin->init(output_params.c_str(), *process_plugins);
294- conf.active.output.push_back(output_plugin);
295- conf.active.all.push_back(output_plugin);
306+ conf.outputPlugin = outputPlugin;
296307 } catch (PluginError& e) {
297- ipx_ring_destroy(output_queue);
298- delete output_plugin;
299308 throw IPXPError (output_name + std::string (" : " ) + e.what ());
300309 } catch (PluginExit& e) {
301- ipx_ring_destroy(output_queue);
302- delete output_plugin;
303310 return true ;
304- } catch (PluginManagerError& e ) {
305- throw IPXPError(output_name + std::string(": ") + e .what());
311+ } catch (std::runtime_error& ex ) {
312+ throw IPXPError (output_name + std::string (" : " ) + ex .what ());
306313 }
307- */
308314
309315 {
310316 std::promise<WorkerResult>* output_res = new std::promise<WorkerResult>();
311317 auto output_stats = new std::atomic<OutputStats>();
312318 conf.output_stats .push_back (output_stats);
313319 OutputWorker tmp
314- = {output_plugin ,
320+ = {outputPlugin ,
315321 new std::thread (
316322 output_worker,
317- output_plugin ,
323+ outputPlugin ,
318324 output_queue,
319325 output_res,
320326 output_stats,
@@ -336,8 +342,8 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
336342 auto flowcache_dir = conf.telemetry_root_node ->addDir (" flowcache" );
337343 size_t pipeline_idx = 0 ;
338344 for (auto & it : parser.m_input ) {
339- InputPlugin* input_plugin = nullptr ;
340- StoragePlugin* storage_plugin = nullptr ;
345+ std::shared_ptr< InputPlugin> inputPlugin ;
346+ std::shared_ptr< StoragePlugin> storagePlugin ;
341347 std::string input_params;
342348 std::string input_name;
343349 std::vector<int > affinity;
@@ -347,51 +353,43 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
347353 auto pipeline_queue_dir
348354 = pipeline_dir->addDir (" queues" )->addDir (std::to_string (pipeline_idx));
349355
350- /*
351356 try {
352- input_plugin = dynamic_cast<InputPlugin*>(conf.mgr.get(input_name));
353- if (input_plugin == nullptr) {
357+ auto & inputPluginFactory = InputPluginFactory::getInstance ();
358+ inputPlugin = inputPluginFactory.createShared (input_name, input_params);
359+ if (inputPlugin == nullptr ) {
354360 throw IPXPError (" invalid input plugin " + input_name);
355361 }
356- input_plugin->init(input_params.c_str());
357- input_plugin->set_telemetry_dirs(input_plugin_dir, pipeline_queue_dir);
358- conf.active.input.push_back(input_plugin);
359- conf.active.all.push_back(input_plugin);
362+ inputPlugin->set_telemetry_dirs (input_plugin_dir, pipeline_queue_dir);
363+ conf.inputPlugins .emplace_back (inputPlugin);
360364 } catch (PluginError& e) {
361- delete input_plugin;
362365 throw IPXPError (input_name + std::string (" : " ) + e.what ());
363366 } catch (PluginExit& e) {
364- delete input_plugin;
365367 return true ;
366- } catch (PluginManagerError& e ) {
367- throw IPXPError(input_name + std::string(": ") + e .what());
368+ } catch (std::runtime_error& ex ) {
369+ throw IPXPError (input_name + std::string (" : " ) + ex .what ());
368370 }
369371
370372 try {
371- storage_plugin = dynamic_cast<StoragePlugin*>(conf.mgr.get(storage_name));
372- if (storage_plugin == nullptr) {
373+ auto & storagePluginFactory = StoragePluginFactory::getInstance ();
374+ storagePlugin
375+ = storagePluginFactory.createShared (storage_name, storage_params, output_queue);
376+ if (storagePlugin == nullptr ) {
373377 throw IPXPError (" invalid storage plugin " + storage_name);
374378 }
375- storage_plugin->set_queue(output_queue);
376- storage_plugin->init(storage_params.c_str());
377- storage_plugin->set_telemetry_dir(pipeline_queue_dir);
378- conf.active.storage.push_back(storage_plugin);
379- conf.active.all.push_back(storage_plugin);
379+ storagePlugin->set_telemetry_dir (pipeline_queue_dir);
380+ conf.storagePlugins .emplace_back (storagePlugin);
380381 } catch (PluginError& e) {
381- delete storage_plugin;
382382 throw IPXPError (storage_name + std::string (" : " ) + e.what ());
383383 } catch (PluginExit& e) {
384- delete storage_plugin;
385384 return true ;
386- } catch (PluginManagerError& e ) {
387- throw IPXPError(storage_name + std::string(": ") + e .what());
385+ } catch (std::runtime_error& ex ) {
386+ throw IPXPError (storage_name + std::string (" : " ) + ex .what ());
388387 }
389- */
390388
391389 std::vector<ProcessPlugin*> storage_process_plugins;
392- for (auto & it : *process_plugins ) {
390+ for (auto & it : processPlugins ) {
393391 ProcessPlugin* tmp = it.second ->copy ();
394- storage_plugin ->add_plugin (tmp);
392+ storagePlugin ->add_plugin (tmp);
395393 conf.active .process .push_back (tmp);
396394 conf.active .all .push_back (tmp);
397395 storage_process_plugins.push_back (tmp);
@@ -404,18 +402,18 @@ bool process_plugin_args(ipxp_conf_t& conf, IpfixprobeOptParser& parser)
404402 conf.input_stats .push_back (input_stats);
405403
406404 WorkPipeline tmp
407- = {{input_plugin ,
405+ = {{inputPlugin ,
408406 new std::thread (
409407 input_storage_worker,
410- input_plugin ,
411- storage_plugin ,
408+ inputPlugin ,
409+ storagePlugin ,
412410 conf.iqueue_size ,
413411 conf.max_pkts ,
414412 input_res,
415413 input_stats),
416414 input_res,
417415 input_stats},
418- {storage_plugin , storage_process_plugins}};
416+ {storagePlugin , storage_process_plugins}};
419417 set_thread_details (
420418 tmp.input .thread ->native_handle (),
421419 " in_" + std::to_string (pipeline_idx) + " _" + input_name,
@@ -435,7 +433,7 @@ void finish(ipxp_conf_t& conf)
435433 terminate_input = 1 ;
436434 for (auto & it : conf.pipelines ) {
437435 it.input .thread ->join ();
438- it.input .plugin ->close ();
436+ it.input .inputPlugin ->close ();
439437 }
440438
441439 // Terminate all storages
@@ -452,7 +450,7 @@ void finish(ipxp_conf_t& conf)
452450 }
453451
454452 for (auto & it : conf.pipelines ) {
455- it.storage .plugin ->close ();
453+ it.storage .storagePlugin ->close ();
456454 }
457455
458456 std::cout << " Input stats:" << std::endl
@@ -576,8 +574,8 @@ void main_loop(ipxp_conf_t& conf)
576574 }
577575
578576 struct pollfd pfds[2 ] = {
579- {.fd = -1 , .events = POLL_IN}, // Server
580- {.fd = -1 , .events = POLL_IN} // Client
577+ {.fd = -1 , .events = POLL_IN, . revents = 0 }, // Server
578+ {.fd = -1 , .events = POLL_IN, . revents = 0 } // Client
581579 };
582580
583581 std::string sock_path = create_sockpath (std::to_string (getpid ()).c_str ());
@@ -641,11 +639,13 @@ int run(int argc, char* argv[])
641639
642640 conf.pluginManager .loadPlugins (" /usr/local/lib64/ipfixprobe/" , loadPluginsRecursive);
643641
642+ printPlugins ();
643+
644644 if (parser.m_help ) {
645645 if (parser.m_help_str .empty ()) {
646646 parser.usage (std::cout, 0 , IPXP_APP_NAME);
647647 } else {
648- print_help (conf, parser.m_help_str );
648+ print_help (parser.m_help_str );
649649 }
650650 goto EXIT;
651651 }
0 commit comments