diff --git a/Cargo.toml b/Cargo.toml index fcbc74d6..492a016f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,8 @@ members = [ "examples/basic", "examples/basic_file", "examples/log_ssh", - "examples/streaming", + "examples/streaming", + "examples/ip_subs", ] resolver = "2" diff --git a/examples/ip_subs/Cargo.toml b/examples/ip_subs/Cargo.toml new file mode 100644 index 00000000..aa13ff7b --- /dev/null +++ b/examples/ip_subs/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "ip_subs" +version = "0.1.0" +edition = "2024" + +[dependencies] +clap = { version = "3.2.23", features = ["derive"] } +env_logger = "0.8.4" +retina-core = { path = "../../core" } +retina-filtergen = { path = "../../filtergen" } +retina-datatypes = { path = "../../datatypes" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.96" +lazy_static = "1.4.0" +regex = "1.7.3" diff --git a/examples/ip_subs/src/main.rs b/examples/ip_subs/src/main.rs new file mode 100644 index 00000000..bb0b5cae --- /dev/null +++ b/examples/ip_subs/src/main.rs @@ -0,0 +1,49 @@ +use retina_core::{Runtime, config::load_config}; +use retina_datatypes::ConnRecord; +use retina_filtergen::subscription; + +use clap::Parser; +use lazy_static::lazy_static; +use std::fs::File; +use std::io::{BufWriter, Write}; +use std::path::PathBuf; +use std::sync::Mutex; + +lazy_static! 
{ + static ref file: Mutex<BufWriter<File>> = + Mutex::new(BufWriter::new(File::create("ip_subs.jsonl").unwrap())); +} + +#[derive(Parser, Debug)] +struct Args { + #[clap(short, long, parse(from_os_str), value_name = "FILE")] + config: PathBuf, + #[clap( + short, + long, + parse(from_os_str), + value_name = "FILE", + default_value = "ip_subs.jsonl" + )] + outfile: PathBuf, +} + +fn ip_cb(conn_record: &ConnRecord) { + if let Ok(serialized) = serde_json::to_string(&conn_record) { + let mut wtr = file.lock().unwrap(); + wtr.write_all(serialized.as_bytes()).unwrap(); + wtr.write_all(b"\n").unwrap(); + } +} + +#[subscription("./spec.toml")] +fn main() { + let args = Args::parse(); + let config = load_config(&args.config); + + let mut runtime: Runtime<SubscribedWrapper> = Runtime::new(config, filter).unwrap(); + runtime.run(); + + let mut wtr = file.lock().unwrap(); + wtr.flush().unwrap(); +} diff --git a/tests/perf/README.md b/tests/perf/README.md new file mode 100644 index 00000000..ea391fcf --- /dev/null +++ b/tests/perf/README.md @@ -0,0 +1,31 @@ +# Performance Testing + +## Dependencies +To use bcc and eBPF, you will need to [install bcc](https://github.com/iovisor/bcc/blob/master/INSTALL.md). If using Ubuntu, we recommend following the instructions to [build from source](https://github.com/iovisor/bcc/blob/master/INSTALL.md#ubuntu---source). + +If you want to run the scripts in a Python virtual environment, you can run: +``` +python3 -m venv env +source env/bin/activate +python3 -m pip install -U matplotlib +pip install pandas +pip install hdrhistogram +pip install tomli-w +source env/bin/activate +``` + +## Number of Subscriptions vs. Function Latency +`generate_ip_subs.py` shards the IPv4 address space into `n` subnets to generate `n` Retina subscriptions, where `n` is passed in by the user. The subscriptions are written to `spec.toml`. 
+ `func_latency.py` uses bcc to profile function latency when running an application by attaching eBPF programs to uprobes at the entry and exit points of functions. Latency is measured in nanoseconds by default. The code for profiling function latency was based on the [example provided by bcc](https://github.com/iovisor/bcc/blob/master/tools/funclatency.py). + +`run_app.py` runs the `ip_subs` application and measures how the latency of a function changes as the number of subscriptions changes. It generates subscriptions using `generate_ip_subs.py`, then runs `ip_subs` with these subscriptions and measures latency using `func_latency.py`. The latencies are written to `stats/ip_subs_latency_stats.csv` and plots of the number of subscriptions vs. latency for different stats (e.g. average, 99th percentile) can be found in the `figs` directory. The `stats` and `figs` directories get created by the script if they don't already exist. + +When running `run_app.py`, you can specify which function to profile, the number of subscriptions, and the config file path. For example, to measure the latency of the `process_packet` function in online mode when the number of subscriptions is 64 and 256, you can run: +``` +sudo -E env PATH=$PATH LD_LIBRARY_PATH=$LD_LIBRARY_PATH python3 tests/perf/run_app.py -n 64,256 -f process_packet -c ./configs/online.toml +``` +Note that you must use `sudo` since bcc requires root privileges to attach eBPF programs to uprobes. + +### Rust Inlining +`func_latency.py` looks for a function in an application's binary to determine where to attach uprobes. The Rust compiler may inline function names, which can prevent the function from being found in the binary. You can add a `#[inline(never)]` tag to a function to prevent it from being inlined. 
diff --git a/tests/perf/func_latency.py b/tests/perf/func_latency.py new file mode 100644 index 00000000..7572f829 --- /dev/null +++ b/tests/perf/func_latency.py @@ -0,0 +1,206 @@ +# code for profiling function latency with bcc based on https://github.com/iovisor/bcc/blob/master/tools/funclatency.py + +import argparse +import subprocess +import sys +import os +from bcc import BPF +from hdrh.histogram import HdrHistogram +import ctypes +import csv + +CWD = os.getcwd() +LD_LIB_PATH = os.environ.get("LD_LIBRARY_PATH") + +PERF_DIR = f"{CWD}/tests/perf" +PERF_STATS_DIR = f"{PERF_DIR}/stats" + +STATS = ["func", "unit", "cnt", "avg", "min", "p05", "p25", "p50", "p75", "p95", "p99", "p999", "max", "std"] + +class Data(ctypes.Structure): + _fields_ = [ + ("pid", ctypes.c_uint32), + ("func_id", ctypes.c_uint64), + ("latency", ctypes.c_ulonglong), + ] + +def profile_latency(args): + setup_code = """ + #include <uapi/linux/ptrace.h> + + struct key_t { + u64 pid; + u64 func_id; + }; + + struct data_t { + u32 pid; + u64 func_id; + u64 latency; + }; + + struct hist_key_t { + u64 func_id; + u64 slot; + }; + + BPF_HISTOGRAM(dist, struct hist_key_t); + BPF_HASH(start, struct key_t, u64); + BPF_PERF_OUTPUT(latencies); + """ + + entry_exit_code = """ + int trace_func_{id}_entry(struct pt_regs *ctx) + {{ + struct key_t key = {{}}; + key.pid = bpf_get_current_pid_tgid(); + key.func_id = {id}; + + u64 ts = bpf_ktime_get_ns(); + + start.update(&key, &ts); + return 0; + }} + + int trace_func_{id}_exit(struct pt_regs *ctx) + {{ + struct key_t key = {{}}; + key.pid = bpf_get_current_pid_tgid(); + key.func_id = {id}; + + u64 *tsp = start.lookup(&key); + if (tsp == 0) return 0; + + u64 delta = bpf_ktime_get_ns() - *tsp; + TIMING_UNIT + + struct data_t data = {{}}; + data.pid = key.pid; + data.func_id = key.func_id; + data.latency = delta; + latencies.perf_submit(ctx, &data, sizeof(data)); + + struct hist_key_t hkey = {{}}; + hkey.func_id = key.func_id; + hkey.slot = bpf_log2l(delta); + dist.increment(hkey); + + 
start.delete(&key); + return 0; + }} + """ + probing_code = "" + FUNC_ID_MAPPINGS = {} + for i, func in enumerate(args.function): + func_id = i + 1 + FUNC_ID_MAPPINGS[func_id] = func + probing_code += entry_exit_code.format(id=func_id) + bpf_program = setup_code + probing_code + + if args.microseconds: + bpf_program = bpf_program.replace('TIMING_UNIT', 'delta /= 1000;') + unit = "usecs" + else: + bpf_program = bpf_program.replace('TIMING_UNIT', '') + unit = "nsecs" + + funcs = [] + for func in args.function: + get_mangled_name_cmd = f"nm {args.binary} | grep {func} | awk '{{print $3}}'" + p1 = subprocess.run(get_mangled_name_cmd, shell=True, capture_output=True, text=True) + mangled_name = p1.stdout.strip() + + if not mangled_name: + print(f"{func} is never called.") + continue + + funcs.append(mangled_name) + + if not funcs: + return + + b = BPF(text=bpf_program) + + for i, func_mangled_name in enumerate(funcs): + try: + func_id = i + 1 + entry_func = f"trace_func_{func_id}_entry" + exit_func = f"trace_func_{func_id}_exit" + b.attach_uprobe(name=args.binary, sym=func_mangled_name, fn_name=entry_func, pid=-1) + b.attach_uretprobe(name=args.binary, sym=func_mangled_name, fn_name=exit_func, pid=-1) + except Exception as e: + print(f"Failed to attach uprobes: {e}") + + FUNCS_AND_HISTS = {} + + def handle_event(cpu, data, size): + event = ctypes.cast(data, ctypes.POINTER(Data)).contents + if event.func_id not in FUNCS_AND_HISTS: + if args.microseconds: + FUNCS_AND_HISTS[event.func_id] = HdrHistogram(1, 60 * 60 * 1000 * 1000, 3) + else: # nanoseconds + FUNCS_AND_HISTS[event.func_id] = HdrHistogram(1, 60 * 60 * 1000 * 1000 * 1000, 3) + FUNCS_AND_HISTS[event.func_id].record_value(event.latency) + + b["latencies"].open_perf_buffer(handle_event) + + cmd = [ + "sudo", + "env", f"LD_LIBRARY_PATH={LD_LIB_PATH}", + "RUST_LOG=error", args.binary, + "-c", args.config + ] + p2 = subprocess.Popen(cmd) + + try: + while p2.poll() is None: + b.perf_buffer_poll(timeout=1) + 
p2.terminate() + p2.wait() + except KeyboardInterrupt: + p2.kill() + + dump_stats(args.app, unit, FUNCS_AND_HISTS, FUNC_ID_MAPPINGS) + +def dump_stats(app, unit, funcs_and_hists, func_id_mappings): + os.makedirs(PERF_STATS_DIR, exist_ok=True) + csv_path = os.path.join(PERF_STATS_DIR, f"{app}_latency_hist.csv") + + with open(csv_path, mode='w', newline='') as f: + writer = csv.writer(f) + writer.writerow(STATS) + + for func_id, hist in funcs_and_hists.items(): + func_name = func_id_mappings[func_id] + row = [ + func_name, + unit, + hist.get_total_count(), + f"{hist.get_mean_value():.3f}", + hist.get_min_value(), + hist.get_value_at_percentile(5), + hist.get_value_at_percentile(25), + hist.get_value_at_percentile(50), + hist.get_value_at_percentile(75), + hist.get_value_at_percentile(95), + hist.get_value_at_percentile(99), + hist.get_value_at_percentile(99.9), + hist.get_max_value(), + f"{hist.get_stddev():.3f}" + ] + + writer.writerow(row) + +def comma_sep_list(value): + return value.split(',') + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("app") + parser.add_argument("-b", "--binary") + parser.add_argument("-c", "--config") + parser.add_argument("-f", "--function", type=comma_sep_list) + parser.add_argument("-u", "--microseconds", action="store_true", default=False) + args = parser.parse_args() + + profile_latency(args) \ No newline at end of file diff --git a/tests/perf/generate_ip_subs.py b/tests/perf/generate_ip_subs.py new file mode 100644 index 00000000..2e54aedb --- /dev/null +++ b/tests/perf/generate_ip_subs.py @@ -0,0 +1,49 @@ +import argparse +import ipaddress +import tomli_w + +class SubscriptionSpec: + def __init__(self, addr): + self.filter = f"ipv4.addr = {addr}" + self.datatypes = ["ConnRecord"] + self.callback = "ip_cb" + + def to_dict(self): + return { + "filter": self.filter, + "datatypes": self.datatypes, + "callback": self.callback, + } + +def shard_ipv4_addr_space(n): + root = 
ipaddress.IPv4Network("0.0.0.0/0") + + # divide 0.0.0.0/0 into n subnets + # number of subnets will be a power of 2 + return list(root.subnets(new_prefix=int((n - 1).bit_length()))) + +def generate_ip_subs(n): + subnets = shard_ipv4_addr_space(n) + n = len(subnets) + + toml_content = {} + toml_content["subscriptions"] = [] + for net in subnets: + subscription = SubscriptionSpec(net.with_prefixlen) + toml_content["subscriptions"].append(subscription.to_dict()) + + out_file = "spec.toml" + with open(out_file, "wb") as f: + tomli_w.dump(toml_content, f) + print(f"Generated {out_file} with {n} subscriptions") + +def comma_sep_list(value): + return value.split(',') + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-n", "--num_subs", type=comma_sep_list) + args = parser.parse_args() + + for num_subs in args.num_subs: + generate_ip_subs(int(num_subs)) \ No newline at end of file diff --git a/tests/perf/run_app.py b/tests/perf/run_app.py new file mode 100644 index 00000000..eb4d16d0 --- /dev/null +++ b/tests/perf/run_app.py @@ -0,0 +1,128 @@ +import argparse +import subprocess +import sys +import os +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt +import pandas as pd +import csv +import math + +CWD = os.getcwd() +HOME = os.environ.get("HOME") +LD_LIB_PATH = os.environ.get("LD_LIBRARY_PATH") +PYTHON_EXEC = sys.executable + +PERF_DIR = f"{CWD}/tests/perf" +PERF_FIGS_DIR = f"{PERF_DIR}/figs" +PERF_STATS_DIR = f"{PERF_DIR}/stats" + +STATS = ["cnt", "avg", "p25", "p50", "p75", "p95", "p99"] + +def run_app(args): + os.makedirs(PERF_STATS_DIR, exist_ok=True) + STATS_CSV_PATH = os.path.join(PERF_STATS_DIR, "ip_subs_latency_stats.csv") + + if args.force_execute and os.path.isfile(STATS_CSV_PATH): + os.remove(STATS_CSV_PATH) + + for n in args.num_subs: + n = next_pow_of_2(int(n)) + + if not args.force_execute and already_profiled_sub_count(n, STATS_CSV_PATH): + continue + + print("Generating spec.toml...") + 
subprocess.run([PYTHON_EXEC, "./tests/perf/generate_ip_subs.py", "-n", f"{n}"], cwd=CWD) + + print("Deleting old ip_subs binaries...") + subprocess.run("rm -f ./target/release/deps/ip_subs-*", shell=True, cwd=CWD) + + print("Rebuilding ip_subs...") + subprocess.run(["cargo", "build", "--release", "--bin", "ip_subs"], cwd=CWD) + + cmd = [ + "sudo", "-E", "env", + f"LD_LIBRARY_PATH={LD_LIB_PATH}", + PYTHON_EXEC, + "./tests/perf/func_latency.py", + "ip_subs", + "-b", "./target/release/ip_subs", + "-c", args.config, + "-f", args.function, + ] + print("Running func_latency.py...") + subprocess.run(cmd, cwd=CWD) + + print("Reading ip_subs_latency_hist.csv...") + df = pd.read_csv(f"{PERF_STATS_DIR}/ip_subs_latency_hist.csv") + results = [df.loc[0, stat] for stat in STATS] + + print(f"Writing stats for {n} subscriptions to ip_subs_latency_stats.csv...") + write_stats(STATS_CSV_PATH, "ip_subs", args.function, "nsecs", n, results) + + print("Creating plots...") + create_plots(STATS_CSV_PATH, "ip_subs", args.function, "nsecs") + +def next_pow_of_2(n): + exp = math.ceil(math.log2(n)) + return 2 ** exp + +def already_profiled_sub_count(n, path): + if not os.path.isfile(path): + return False + df = pd.read_csv(path) + return df['num_subs'].isin([n]).any() + +def create_stats_csv(path): + with open(path, mode='a', newline='') as f: + writer = csv.writer(f) + headers = ["func", "unit", "num_subs"] + headers.extend(STATS) + writer.writerow(headers) + +def write_stats(path, app, func, unit, n, results): + if not os.path.isfile(path): + create_stats_csv(path) + + with open(path, mode='a', newline='') as f: + writer = csv.writer(f) + + row = [func, unit, n] + row.extend(results) + + writer.writerow(row) + +def create_plots(path, app, func, unit): + os.makedirs(PERF_FIGS_DIR, exist_ok=True) + + df = pd.read_csv(path) + df = df.sort_values(by='num_subs') + + x_vals = df['num_subs'].to_list() + + for stat in STATS: + if stat == 'cnt': + continue + y_vals = df[stat].to_list() + 
plt.plot(x_vals, y_vals, label=stat) + plt.xlabel('number of subscriptions') + plt.ylabel(f'latency ({unit})') + plt.title(f"app: {app}, function: {func}") + plt.legend() + plt.savefig(os.path.join(PERF_FIGS_DIR, f"{app}_{stat}.png"), dpi=300, bbox_inches='tight') + plt.clf() + +def comma_sep_list(value): + return value.split(',') + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-n", "--num_subs", type=comma_sep_list) + parser.add_argument("-c", "--config") + parser.add_argument("-f", "--function") + parser.add_argument("--force-execute", action="store_true", default=False) + args = parser.parse_args() + + run_app(args) \ No newline at end of file