@@ -68,6 +68,18 @@ def process_input_dpdk_plugin(settings):
6868 if eal_opts :
6969 eal += f" { eal_opts } "
7070
71+ workers_cpu_list = settings .get ("workers_cpu_list" )
72+ if workers_cpu_list :
73+ if isinstance (workers_cpu_list , str ):
74+ workers_cpu_list = [cpu .strip () for cpu in workers_cpu_list .split ("," )]
75+ elif not isinstance (workers_cpu_list , list ):
76+ raise ValueError ("workers_cpu_list must be a list or a comma-separated string" )
77+
78+ if len (workers_cpu_list ) != rx_queues :
79+ raise ValueError ("The number of CPUs in workers_cpu_list must match the number of RX queues" )
80+ else :
81+ workers_cpu_list = [None ] * rx_queues
82+
7183 # Main parameter for DPDK with $eal_opts
7284 primary_param = f"-i \" dpdk;p={ ',' .join (str (i ) for i in range (nic_count ))} ;"
7385 burst_size = settings .get ("burst_size" , 64 )
@@ -83,7 +95,19 @@ def process_input_dpdk_plugin(settings):
8395 primary_param += f"mtu={ mtu } ;"
8496 primary_param += f"eal={ eal } \" "
8597
86- params = [primary_param ] + [f"-i dpdk" for _ in range (rx_queues - 1 )]
98+ params = []
99+ first_cpu = workers_cpu_list [0 ]
100+ if first_cpu is not None :
101+ params .append (f"{ primary_param } @{ first_cpu } " )
102+ else :
103+ params .append (primary_param )
104+
105+ for i in range (1 , rx_queues ):
106+ cpu = workers_cpu_list [i ]
107+ if cpu is not None :
108+ params .append (f"-i dpdk@{ cpu } " )
109+ else :
110+ params .append (f"-i dpdk" )
87111
88112 return " " .join (params )
89113
@@ -285,11 +309,21 @@ def process_output_ipfix_plugin(settings):
285309 return f'{ ";" .join (params )} "'
286310
287311 # Main parameters
288- host = settings .get ("collector" , {}).get ("host" )
289- port = settings .get ("collector" , {}).get ("port" )
290- mtu = settings .get ("mtu" )
291- exporter_id = settings .get ("exporter" , {}).get ("id" )
292- exporter_dir = settings .get ("exporter" , {}).get ("dir" )
312+ collector = settings .get ("collector" )
313+ if collector is None :
314+ raise ValueError ("collector must be specified in the ipfix plugin configuration." )
315+
316+ host = collector .get ("host" )
317+ if host is None :
318+ raise ValueError ("host must be specified in the ipfix (collector) configuration. " )
319+
320+ port = collector .get ("port" )
321+ if port is None :
322+ raise ValueError ("port must be specified in the ipfix (collector) configuration. " )
323+
324+ mtu = settings .get ("mtu" , 1518 )
325+ exporter_id = settings .get ("exporter" , {}).get ("id" , 1 )
326+ exporter_dir = settings .get ("exporter" , {}).get ("dir" , 0 )
293327
294328 if host is not None :
295329 params .append (f"host={ host } " )
@@ -311,24 +345,36 @@ def process_output_ipfix_plugin(settings):
311345 is_tcp = False
312346 if "udp" in protocol :
313347 params .append ("udp" )
314- template_refresh = protocol ["udp" ].get ("template_refresh" )
315- if template_refresh is not None :
348+ udp = protocol .get ("udp" )
349+ if udp is not None :
350+ template_refresh = udp .get ("template_refresh" , 600 )
351+ if template_refresh is not None :
352+ params .append (f"template={ template_refresh } " )
353+ else :
354+ template_refresh = 600
316355 params .append (f"template={ template_refresh } " )
317356 elif "tcp" in protocol :
318357 is_tcp = True
319- non_blocking = protocol ["tcp" ].get ("non_blocking" )
320- if non_blocking is not None :
321- params .append ("non-blocking-tcp" )
358+ tcp = protocol .get ("tcp" )
359+ if tcp is not None :
360+ non_blocking = tcp .get ("non_blocking" , {})
361+ if non_blocking is not None :
362+ params .append ("non-blocking-tcp" )
363+ else :
364+ raise ValueError ("Invalid options for ipfix protocol. Must be either 'udp' or 'tcp'." )
322365
323366 # LZ4 compression (only valid with TCP)
324- compression = settings .get ("compression" , {}).get ("lz4" , {})
325- if compression .get ("enabled" ):
326- if not is_tcp :
327- raise ValueError ("LZ4 compression can only be used with TCP." )
328- params .append ("lz4-compression" )
329- buffer_size = compression .get ("buffer_size" )
330- if buffer_size is not None :
331- params .append (f"lz4-buffer-size={ buffer_size } " )
367+ compression = settings .get ("compression" , {})
368+ if compression is not None :
369+ lz4 = compression .get ("lz4" , {})
370+ if lz4 is not None :
371+ if lz4 .get ("enabled" ):
372+ if not is_tcp :
373+ raise ValueError ("LZ4 compression can only be used with TCP." )
374+ params .append ("lz4-compression" )
375+ buffer_size = lz4 .get ("buffer_size" )
376+ if buffer_size is not None :
377+ params .append (f"lz4-buffer-size={ buffer_size } " )
332378
333379 return f'{ ";" .join (params )} "'
334380
0 commit comments