Skip to content

Commit 19ffa43

Browse files
author
Pavel Siska
committed
ipfixprobed: Introduce new version of init script
1 parent c1b1a88 commit 19ffa43

File tree

1 file changed

+326
-0
lines changed

1 file changed

+326
-0
lines changed

init/ipfixprobed.py

Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
import yaml
2+
import argparse
3+
import subprocess
4+
5+
def load_config(file_path):
6+
try:
7+
with open(file_path, 'r') as file:
8+
return yaml.safe_load(file)
9+
except FileNotFoundError:
10+
raise FileNotFoundError(f"Configuration file '{file_path}' not found.")
11+
except yaml.YAMLError as e:
12+
raise ValueError(f"Error parsing YAML file: {e}")
13+
14+
def parse_args():
15+
parser = argparse.ArgumentParser(description="Process input plugin parameters.")
16+
parser.add_argument("--config", required=True, help="Path to YAML configuration file")
17+
return parser.parse_args()
18+
19+
def process_input_plugin(config):
20+
input_plugin = config.get("input_plugin", {})
21+
22+
if not isinstance(input_plugin, dict):
23+
raise ValueError("Invalid input plugin configuration format.")
24+
25+
if len(input_plugin) != 1:
26+
raise ValueError("Exactly one input plugin must be specified in the configuration.")
27+
28+
plugin, settings = next(iter(input_plugin.items()))
29+
30+
if plugin == "dpdk":
31+
return process_dpdk_plugin(settings)
32+
if plugin == "raw":
33+
return process_raw_plugin(settings)
34+
35+
params = [f"--{plugin}"]
36+
for key, value in settings.items():
37+
if value is not None:
38+
params.append(f"--{plugin}-{key}={value}")
39+
40+
return " ".join(params)
41+
42+
def process_dpdk_plugin(settings):
43+
if "rx_queues" not in settings:
44+
raise ValueError("Missing required setting: rx_queues")
45+
rx_queues = settings.get("rx_queues")
46+
try:
47+
rx_queues = int(rx_queues)
48+
except ValueError:
49+
raise ValueError("rx_queues must be an integer")
50+
51+
if "allowed_nics" not in settings:
52+
raise ValueError("Missing required setting: allowed_nics")
53+
54+
allowed_nics = settings.get("allowed_nics")
55+
nic_list = allowed_nics.split(",")
56+
nic_count = len(nic_list)
57+
58+
params_list = [f"p={','.join(str(i) for i in range(nic_count))}"]
59+
60+
for key, param_flag in {"burst_size": "b", "mempool_size": "m", "mtu": "mtu"}.items():
61+
value = settings.get(key)
62+
if value is not None:
63+
params_list.append(f"{param_flag}={value}")
64+
65+
# Generating EAL parameters
66+
eal_params = [f"-a {nic}" for nic in nic_list]
67+
eal_opts = settings.get("eal_opts", "")
68+
69+
eal = " ".join(eal_params)
70+
if eal_opts:
71+
eal += f" {eal_opts}"
72+
73+
# Main parameter for DPDK with $eal_opts
74+
primary_param = f"-i \"dpdk;p={','.join(str(i) for i in range(nic_count))},"
75+
primary_param += f"m={settings.get('mempool_size', 8192)},eal={eal}\""
76+
77+
params = [primary_param] + [f"-i dpdk" for _ in range(rx_queues - 1)]
78+
79+
return " ".join(params)
80+
81+
82+
def process_raw_plugin(settings):
83+
interfaces = settings.get("interface")
84+
if not interfaces:
85+
raise ValueError("At least one interface must be specified in the raw plugin configuration.")
86+
87+
interfaces_list = interfaces.split(",")
88+
89+
blocks_count = settings.get("blocks_count")
90+
packets_in_block = settings.get("packets_in_block")
91+
92+
params = []
93+
for interface in interfaces_list:
94+
param = f"-i \"raw;ifc={interface}\""
95+
96+
# Add blocks_count and packets_in_block only if they have a value
97+
if blocks_count:
98+
param += f";blocks={blocks_count}"
99+
if packets_in_block:
100+
param += f";pkts={packets_in_block}"
101+
102+
params.append(param)
103+
104+
return " ".join(params)
105+
106+
107+
def process_process_plugins(config):
108+
process_plugins = config.get("process_plugins", [])
109+
110+
if not isinstance(process_plugins, list):
111+
raise ValueError("Invalid process plugins configuration format.")
112+
113+
params = []
114+
for plugin_config in process_plugins:
115+
if isinstance(plugin_config, dict):
116+
for plugin, settings in plugin_config.items():
117+
plugin_param_str = f'-p "{plugin}'
118+
119+
if isinstance(settings, dict):
120+
# Add plugin parameters if they exist
121+
plugin_params = [f"{key}={value}" for key, value in settings.items() if value is not None]
122+
if plugin_params:
123+
plugin_param_str += ";" + ";".join(plugin_params)
124+
125+
params.append(f'{plugin_param_str}"')
126+
else:
127+
# If there's no specific plugin parameters, just output the plugin
128+
params.append(f"-p {plugin_config}")
129+
130+
return " ".join(params)
131+
132+
def process_storage(config):
133+
storage = config.get("storage", {})
134+
if not isinstance(storage, dict):
135+
raise ValueError("Invalid storage configuration format.")
136+
137+
params = ['-s "cache'] # Start with "-s cache" for the storage section
138+
139+
# Cache settings
140+
cache = storage.get("cache", {})
141+
if isinstance(cache, dict):
142+
cache_params = []
143+
if "size_exponent" in cache:
144+
cache_params.append(f"s={cache['size_exponent']}")
145+
if "line_size_exponent" in cache:
146+
cache_params.append(f"l={cache['line_size_exponent']}")
147+
if cache_params:
148+
params.append(f"{';'.join(cache_params)}")
149+
150+
# Timeouts settings
151+
timeouts = storage.get("timeouts", {})
152+
if isinstance(timeouts, dict):
153+
timeout_params = []
154+
if "active" in timeouts:
155+
timeout_params.append(f"a={timeouts['active']}")
156+
if "inactive" in timeouts:
157+
timeout_params.append(f"i={timeouts['inactive']}")
158+
if timeout_params:
159+
params.append(f"{';'.join(timeout_params)}")
160+
161+
# Split biflow (flag if true)
162+
split_biflow = storage.get("split_biflow", None)
163+
if split_biflow:
164+
params.append("S")
165+
166+
# Fragmentation cache settings
167+
fragmentation_cache = storage.get("fragmentation_cache", {})
168+
if isinstance(fragmentation_cache, dict):
169+
if fragmentation_cache.get("enabled"):
170+
frag_cache_params = []
171+
if "enabled" in fragmentation_cache:
172+
frag_cache_params.append(f"fe=true")
173+
if "size" in fragmentation_cache:
174+
frag_cache_params.append(f"fs={fragmentation_cache['size']}")
175+
if "timeout" in fragmentation_cache:
176+
frag_cache_params.append(f"ft={fragmentation_cache['timeout']}")
177+
if frag_cache_params:
178+
params.append(f"{';'.join(frag_cache_params)}")
179+
180+
# Return the properly joined parameters with semicolons separating all values
181+
return f'{";".join(params)}"'
182+
183+
def process_output_plugin(config):
184+
output_plugin = config.get("output_plugin", {})
185+
if not isinstance(output_plugin, dict):
186+
raise ValueError("Invalid output plugin configuration format.")
187+
188+
if len(output_plugin) != 1:
189+
raise ValueError("Exactly one output plugin must be specified in the configuration.")
190+
191+
plugin, settings = next(iter(output_plugin.items()))
192+
193+
if plugin == "ipfix":
194+
return process_ipfix_plugin(settings)
195+
196+
raise ValueError(f"Unsupported output plugin: {plugin}")
197+
198+
def process_ipfix_plugin(settings):
199+
params = ['-o "ipfix']
200+
201+
# Main parameters
202+
host = settings.get("collector", {}).get("host")
203+
port = settings.get("collector", {}).get("port")
204+
mtu = settings.get("mtu")
205+
exporter_id = settings.get("exporter", {}).get("id")
206+
exporter_dir = settings.get("exporter", {}).get("dir")
207+
208+
if host is not None:
209+
params.append(f"host={host}")
210+
if port is not None:
211+
params.append(f"port={port}")
212+
if mtu is not None:
213+
params.append(f"mtu={mtu}")
214+
if exporter_id is not None:
215+
params.append(f"id={exporter_id}")
216+
if exporter_dir is not None:
217+
params.append(f"dir={exporter_dir}")
218+
219+
# Validate that only one protocol is specified
220+
protocol = settings.get("protocol", {})
221+
if "udp" in protocol and "tcp" in protocol:
222+
raise ValueError("Only one protocol (udp or tcp) can be specified, not both.")
223+
224+
# Process protocol
225+
is_tcp = False
226+
if "udp" in protocol:
227+
params.append("udp")
228+
template_refresh = protocol["udp"].get("template_refresh")
229+
if template_refresh is not None:
230+
params.append(f"template={template_refresh}")
231+
elif "tcp" in protocol:
232+
is_tcp = True
233+
non_blocking = protocol["tcp"].get("non_blocking")
234+
if non_blocking is not None:
235+
params.append("non-blocking-tcp")
236+
237+
# LZ4 compression (only valid with TCP)
238+
compression = settings.get("compression", {}).get("lz4", {})
239+
if compression.get("enabled"):
240+
if not is_tcp:
241+
raise ValueError("LZ4 compression can only be used with TCP.")
242+
params.append("lz4-compression")
243+
buffer_size = compression.get("buffer_size")
244+
if buffer_size is not None:
245+
params.append(f"lz4-buffer-size={buffer_size}")
246+
247+
return f'{";".join(params)}"'
248+
249+
def process_telemetry(config):
250+
telemetry = config.get("telemetry", {})
251+
if not isinstance(telemetry, dict):
252+
raise ValueError("Invalid telemetry configuration format.")
253+
254+
if "appfs" in telemetry:
255+
return process_appfs_telemetry(telemetry["appfs"])
256+
257+
return "" # No telemetry specified
258+
259+
def process_appfs_telemetry(settings):
260+
if not isinstance(settings, dict):
261+
raise ValueError("Invalid appfs telemetry configuration format.")
262+
263+
enabled = settings.get("enabled", False)
264+
if not enabled:
265+
return "" # Telemetry is disabled, return empty string
266+
267+
mount_point = settings.get("mount_point")
268+
if not mount_point:
269+
raise ValueError("Mount point must be specified when AppFS telemetry is enabled.")
270+
271+
return f'"--telemetry={mount_point}"'
272+
273+
def process_general(config):
274+
general = config.get("general", {})
275+
if not isinstance(general, dict):
276+
raise ValueError("Invalid general configuration format.")
277+
278+
params = []
279+
280+
# Queue sizes
281+
queues_size = general.get("queues_size", {})
282+
if isinstance(queues_size, dict):
283+
if "input" in queues_size:
284+
params.append(f'"--iqueue={queues_size["input"]}"')
285+
if "output" in queues_size:
286+
params.append(f'"--oqueue={queues_size["output"]}"')
287+
288+
# CPU mask
289+
cpu_mask = general.get("cpu_mask")
290+
if cpu_mask is not None:
291+
params.append(f'"--cpus={cpu_mask}"')
292+
293+
return " ".join(params)
294+
295+
296+
def main():
297+
try:
298+
args = parse_args()
299+
config = load_config(args.config)
300+
input_params = process_input_plugin(config)
301+
process_plugin_params = process_process_plugins(config)
302+
storage_params = process_storage(config)
303+
output_params = process_output_plugin(config)
304+
telemetry_params = process_telemetry(config)
305+
general_params = process_general(config)
306+
307+
# Output both input plugin and process plugin parameters
308+
print(input_params)
309+
print(process_plugin_params)
310+
print(storage_params)
311+
print(output_params)
312+
print(telemetry_params)
313+
print(general_params)
314+
315+
command = f"/usr/bin/ipfixprobe {input_params} {process_plugin_params} {storage_params} {output_params} {telemetry_params} {general_params}"
316+
print(f"Executing: {command}")
317+
318+
subprocess.run(command, shell=True, check=True)
319+
320+
except Exception as e:
321+
print(f"Error: {e}", file=sys.stderr)
322+
exit(1)
323+
324+
if __name__ == "__main__":
325+
import sys
326+
main()

0 commit comments

Comments
 (0)