Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
64a9bf3
set up scripts for generating num subs vs. runtime plots
diana-qing May 16, 2025
bbe39d5
use Python's ipaddress library to shard IPv4 addr space
diana-qing May 21, 2025
22867a9
add process_packet run results
diana-qing May 21, 2025
43a394a
update scripts
diana-qing May 21, 2025
35be0b3
clean and rebuild before each run
diana-qing May 21, 2025
1f9540e
use ld lib path env var. force rebuild binary on each run in run_app
diana-qing May 27, 2025
8d1224b
add signal to profile func latency and terminate after 10 sec
diana-qing May 28, 2025
e63d328
code cleanup
diana-qing May 28, 2025
650ce41
edit path
diana-qing May 28, 2025
49cd708
log conn records to file
diana-qing May 29, 2025
de07253
remove signaling code
diana-qing May 29, 2025
d9d7eca
rename app to ip_subs, script generate_subs.py to generate_ip_subs.py
diana-qing May 29, 2025
60af5f7
update num subs
diana-qing May 29, 2025
ca7c739
remove spec.toml
diana-qing May 29, 2025
ff3b447
remove benchmark_app
diana-qing May 29, 2025
2dba884
code cleanup
diana-qing May 29, 2025
da68a4b
delete figs
diana-qing May 29, 2025
eee480e
write stats to csv
diana-qing Jun 1, 2025
86e6da6
code cleanup
diana-qing Jun 1, 2025
3988168
more code cleanup
diana-qing Jun 1, 2025
c139245
code cleanup x3
diana-qing Jun 1, 2025
b422b44
use abs path to python interpreter
diana-qing Jun 1, 2025
82352bd
code cleanup
diana-qing Jun 1, 2025
e86c03c
get latency per pid
diana-qing Jun 1, 2025
12df91a
add force-execute arg. if force-execute is false, profile if haven't …
diana-qing Jun 12, 2025
22cba67
code cleanup
diana-qing Jun 12, 2025
e06ffa7
code cleanup x2
diana-qing Jun 12, 2025
512151d
code cleanup x3
diana-qing Jun 12, 2025
5e29652
delete comment
diana-qing Jun 12, 2025
33ab2b3
add documentation
diana-qing Jun 12, 2025
ad709a1
update docs
diana-qing Jun 12, 2025
0ef64a0
tests/perf/README.md
diana-qing Jun 12, 2025
3f573fb
fix wording
diana-qing Jun 12, 2025
b3a7a02
fmt and clippy
diana-qing Jun 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ members = [
"examples/basic",
"examples/basic_file",
"examples/log_ssh",
"examples/streaming",
"examples/streaming",
"examples/ip_subs",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this into the tests/perf folder?

]
resolver = "2"

Expand Down
15 changes: 15 additions & 0 deletions examples/ip_subs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "ip_subs"
version = "0.1.0"
edition = "2024"

[dependencies]
clap = { version = "3.2.23", features = ["derive"] }
env_logger = "0.8.4"
retina-core = { path = "../../core" }
retina-filtergen = { path = "../../filtergen" }
retina-datatypes = { path = "../../datatypes" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.96"
lazy_static = "1.4.0"
regex = "1.7.3"
49 changes: 49 additions & 0 deletions examples/ip_subs/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use retina_core::{Runtime, config::load_config};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a README for this example.

use retina_datatypes::ConnRecord;
use retina_filtergen::subscription;

use clap::Parser;
use lazy_static::lazy_static;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::Mutex;

lazy_static! {
    // Global, mutex-guarded, buffered writer that `ip_cb` appends serialized
    // connection records to (one JSON object per line) and `main` flushes on exit.
    // NOTE(review): the output path is hardcoded here, so the `--outfile` CLI
    // argument (see `Args`) is effectively ignored — TODO wire them together.
    // NOTE(review): `file` is a lowercase static; Rust convention is
    // SCREAMING_SNAKE_CASE (renaming would also touch `ip_cb` and `main`).
    static ref file: Mutex<BufWriter<File>> =
        Mutex::new(BufWriter::new(File::create("ip_subs.jsonl").unwrap()));
}

/// Command-line arguments for the `ip_subs` example.
#[derive(Parser, Debug)]
struct Args {
    /// Path to the Retina runtime configuration file.
    #[clap(short, long, parse(from_os_str), value_name = "FILE")]
    config: PathBuf,
    /// Intended output path for serialized connection records.
    /// NOTE(review): parsed but never read — output always goes to the
    /// hardcoded "ip_subs.jsonl" in the `file` static. Confirm and either
    /// wire this up or remove it.
    #[clap(
        short,
        long,
        parse(from_os_str),
        value_name = "FILE",
        default_value = "ip_subs.jsonl"
    )]
    outfile: PathBuf,
}

/// Subscription callback: serializes one connection record to JSON and
/// appends it, newline-terminated, to the global output writer.
fn ip_cb(conn_record: &ConnRecord) {
    // Records that fail to serialize are silently skipped.
    let Ok(line) = serde_json::to_string(&conn_record) else {
        return;
    };
    let mut writer = file.lock().unwrap();
    writer.write_all(line.as_bytes()).unwrap();
    writer.write_all(b"\n").unwrap();
}

/// Entry point. `SubscribedWrapper` and `filter` are not defined in this
/// file — presumably generated by the `#[subscription]` macro from
/// ./spec.toml; confirm against retina-filtergen docs.
#[subscription("./spec.toml")]
fn main() {
    let args = Args::parse();
    let config = load_config(&args.config);

    // NOTE(review): `args.outfile` is never used — the output path is
    // hardcoded in the `file` static. TODO: confirm and wire up.
    let mut runtime: Runtime<SubscribedWrapper> = Runtime::new(config, filter).unwrap();
    runtime.run();

    // Flush buffered records so ip_subs.jsonl is complete on exit.
    let mut wtr = file.lock().unwrap();
    wtr.flush().unwrap();
}
31 changes: 31 additions & 0 deletions tests/perf/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Performance Testing
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an intro with the high-level motivation for this and what it does! What you shared at the EOQ lab meeting was great.

Copy link
Collaborator

@thearossman thearossman Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also mention the initial testing you did to ensure that this approach is accurate!

Here's my best understanding of what you found:

  • You compared results with Retina's current timing infrastructure, which inlines cycle counts. You found that the uprobes add a constant overhead. That is, this will accurately surface patterns for the use-case of comparing function latency across different implementations or applications.
  • You can't run this at super high throughputs. IIRC, we were able to handle ~5Gbps of live traffic (unless you got more on the passive box). This gives plenty of data points for saying something about function latency.
  • You confirmed this separates entry/exit points by thread, so it'll be accurate even if there are multiple cores. (IMO this was a bit unclear in the documentation.)


## Dependencies
To use bcc and eBPF, you will need to [install bcc](https://github.com/iovisor/bcc/blob/master/INSTALL.md). If using Ubuntu, we recommend following the instructions to [build from source](https://github.com/iovisor/bcc/blob/master/INSTALL.md#ubuntu---source).

If you want to run the scripts in a Python virtual environment, you can run:
```
python3 -m venv env
source env/bin/activate
python3 -m pip install -U matplotlib pandas hdrhistogram tomli-w
```

## Number of Subscriptions vs. Function Latency
`generate_ip_subs.py` shards the IPv4 address space into `n` subnets to generate `n` Retina subscriptions, where `n` is passed in by the user. The subscriptions are written to `spec.toml`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe clarify that this is a sample / basic application and more can easily be added. The main goal of your project was to set up the infrastructure.


`func_latency.py` uses bcc to profile function latency when running an application by attaching eBPF programs to uprobes at the entry and exit point of functions. Latency is measured in nanoseconds by default. The code for profiling function latency was based on the [example provided by bcc](https://github.com/iovisor/bcc/blob/master/tools/funclatency.py).

`run_app.py` runs the `ip_subs` application and measures how the latency of a function changes as the number of subscriptions changes. It generates subscriptions using `generate_ip_subs.py`, then runs `ip_subs` with these subscriptions and measures latency using `func_latency.py`. The latencies are written to `stats/ip_subs_latency_stats.csv`, and plots of the number of subscriptions vs. latency for different statistics (e.g. average, 99th percentile) can be found in the `figs` directory. The `stats` and `figs` directories are created by the script if they don't already exist.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be run from a specific directory within the Retina repo?


When running `run_app.py`, you can specify which function to profile, the number of subscriptions, and the config file path. For example, to measure the latency of the `process_packet` function in online mode when the number of subscriptions is 64 and 256, you can run:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably mention that you can profile multiple functions, but because it just records entry/exit timestamps, keep in mind that profiling functions that overlap will cause interference. (You observed this!)

```
sudo -E env PATH=$PATH LD_LIBRARY_PATH=$LD_LIBRARY_PATH python3 tests/perf/run_app.py -n 64,256 -f process_packet -c ./configs/online.toml
```
Note that you must use `sudo` since bcc requires root privileges to attach eBPF programs to uprobes.

### Rust Inlining
`func_latency.py` looks for a function's symbol in an application's binary to determine where to attach uprobes. The Rust compiler may inline functions, which removes their symbols and can prevent them from being found in the binary. You can add a `#[inline(never)]` attribute to a function to prevent it from being inlined.
206 changes: 206 additions & 0 deletions tests/perf/func_latency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# code for profiling function latency with bcc based on https://github.com/iovisor/bcc/blob/master/tools/funclatency.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had talked a bit about managing output a couple of weeks ago in online mode by consuming the subprocess output and filtering it before printing:

  • Making it so that the "samples lost" alert isn't printed
  • Consuming the output and printing the updates on Gbps processed, packets lost, etc.

Did you try this and run into challenges? (I think this is not critical for accuracy, but it is extremely helpful for usability if it's reasonably easy to do.)


import argparse
import subprocess
import sys
import os
from bcc import BPF
from hdrh.histogram import HdrHistogram
import ctypes
import csv

CWD = os.getcwd()
# Forwarded to the profiled binary's environment so it can locate shared libs.
LD_LIB_PATH = os.environ.get("LD_LIBRARY_PATH")

# NOTE(review): assumes the script is invoked from the repo root (the README
# example does `python3 tests/perf/run_app.py` from the root) — confirm.
PERF_DIR = f"{CWD}/tests/perf"
PERF_STATS_DIR = f"{PERF_DIR}/stats"

# CSV header row for the per-function latency stats written by dump_stats().
STATS = ["func", "unit", "cnt", "avg", "min", "p05", "p25", "p50", "p75", "p95", "p99", "p999", "max", "std"]

class Data(ctypes.Structure):
    # Python-side mirror of `struct data_t` in the BPF program; used to
    # decode latency samples delivered through the perf buffer.
    _fields_ = [
        ("pid", ctypes.c_uint32),
        ("func_id", ctypes.c_uint64),
        ("latency", ctypes.c_ulonglong),
    ]

def profile_latency(args):
    """Profile per-call latency of the functions in ``args.function``.

    Attaches an eBPF uprobe/uretprobe pair to each requested function in
    ``args.binary``, runs the binary, and records each call's latency into an
    HDR histogram (one per function). Latency is in nanoseconds by default,
    microseconds if ``args.microseconds`` is set. Results are written to CSV
    via ``dump_stats`` once the binary exits.

    The BPF entry map is keyed by the full 64-bit value of
    ``bpf_get_current_pid_tgid()``, so entry/exit timestamps are matched
    per-thread — NOTE(review): confirm against bcc docs.
    """
    # Shared BPF scaffolding: entry-timestamp map, log2 histogram, perf output.
    setup_code = """
#include <uapi/linux/ptrace.h>

struct key_t {
    u64 pid;
    u64 func_id;
};

struct data_t {
    u32 pid;
    u64 func_id;
    u64 latency;
};

struct hist_key_t {
    u64 func_id;
    u64 slot;
};

BPF_HISTOGRAM(dist, struct hist_key_t);
BPF_HASH(start, struct key_t, u64);
BPF_PERF_OUTPUT(latencies);
"""

    # Per-function probe pair; {id} is substituted per function below.
    entry_exit_code = """
int trace_func_{id}_entry(struct pt_regs *ctx)
{{
    struct key_t key = {{}};
    key.pid = bpf_get_current_pid_tgid();
    key.func_id = {id};

    u64 ts = bpf_ktime_get_ns();

    start.update(&key, &ts);
    return 0;
}}

int trace_func_{id}_exit(struct pt_regs *ctx)
{{
    struct key_t key = {{}};
    key.pid = bpf_get_current_pid_tgid();
    key.func_id = {id};

    u64 *tsp = start.lookup(&key);
    if (tsp == 0) return 0;

    u64 delta = bpf_ktime_get_ns() - *tsp;
    TIMING_UNIT

    struct data_t data = {{}};
    data.pid = key.pid;
    data.func_id = key.func_id;
    data.latency = delta;
    latencies.perf_submit(ctx, &data, sizeof(data));

    struct hist_key_t hkey = {{}};
    hkey.func_id = key.func_id;
    hkey.slot = bpf_log2l(delta);
    dist.increment(hkey);

    start.delete(&key);
    return 0;
}}
"""
    probing_code = ""
    FUNC_ID_MAPPINGS = {}  # func_id -> user-supplied function name
    for i, func in enumerate(args.function):
        func_id = i + 1
        FUNC_ID_MAPPINGS[func_id] = func
        probing_code += entry_exit_code.format(id=func_id)
    bpf_program = setup_code + probing_code

    # Latency unit: nanoseconds by default, microseconds with -u.
    if args.microseconds:
        bpf_program = bpf_program.replace('TIMING_UNIT', 'delta /= 1000;')
        unit = "usecs"
    else:
        bpf_program = bpf_program.replace('TIMING_UNIT', '')
        unit = "nsecs"

    # Resolve each function to its (possibly mangled) symbol in the binary.
    # BUG FIX: keep each resolved symbol paired with its ORIGINAL func_id.
    # Previously, a function missing from the binary was skipped and later
    # functions were re-numbered by their position in the filtered list,
    # attaching them to the wrong trace_func_{id}_* probes and mislabeling
    # them in FUNC_ID_MAPPINGS.
    funcs = []
    for i, func in enumerate(args.function):
        func_id = i + 1
        get_mangled_name_cmd = f"nm {args.binary} | grep {func} | awk '{{print $3}}'"
        p1 = subprocess.run(get_mangled_name_cmd, shell=True, capture_output=True, text=True)
        # grep may match several symbols; use the first line so the uprobe
        # attach doesn't receive an embedded newline. NOTE(review): a
        # substring match could still pick the wrong symbol — consider
        # anchoring the grep pattern.
        matches = p1.stdout.strip().splitlines()
        mangled_name = matches[0].strip() if matches else ""

        if not mangled_name:
            # Symbol absent — likely inlined by the compiler (see the README
            # section on Rust inlining) or not linked in.
            print(f"{func} is never called.")
            continue

        funcs.append((func_id, mangled_name))

    if not funcs:
        return

    b = BPF(text=bpf_program)

    for func_id, mangled_name in funcs:
        try:
            entry_func = f"trace_func_{func_id}_entry"
            exit_func = f"trace_func_{func_id}_exit"
            b.attach_uprobe(name=args.binary, sym=mangled_name, fn_name=entry_func, pid=-1)
            b.attach_uretprobe(name=args.binary, sym=mangled_name, fn_name=exit_func, pid=-1)
        except Exception as e:
            print(f"Failed to attach uprobes: {e}")

    FUNCS_AND_HISTS = {}  # func_id -> HdrHistogram of observed latencies

    def handle_event(cpu, data, size):
        # Decode one latency sample from the perf buffer and record it.
        event = ctypes.cast(data, ctypes.POINTER(Data)).contents
        if event.func_id not in FUNCS_AND_HISTS:
            # Histogram range: 1 unit up to one hour, 3 significant digits.
            if args.microseconds:
                FUNCS_AND_HISTS[event.func_id] = HdrHistogram(1, 60 * 60 * 1000 * 1000, 3)
            else:  # nanoseconds
                FUNCS_AND_HISTS[event.func_id] = HdrHistogram(1, 60 * 60 * 1000 * 1000 * 1000, 3)
        FUNCS_AND_HISTS[event.func_id].record_value(event.latency)

    b["latencies"].open_perf_buffer(handle_event)

    # Launch the profiled binary, then drain the perf buffer until it exits.
    cmd = [
        "sudo",
        "env", f"LD_LIBRARY_PATH={LD_LIB_PATH}",
        "RUST_LOG=error", args.binary,
        "-c", args.config
    ]
    p2 = subprocess.Popen(cmd)

    try:
        while p2.poll() is None:
            b.perf_buffer_poll(timeout=1)
        p2.terminate()
        p2.wait()
    except KeyboardInterrupt:
        p2.kill()

    dump_stats(args.app, unit, FUNCS_AND_HISTS, FUNC_ID_MAPPINGS)

def dump_stats(app, unit, funcs_and_hists, func_id_mappings):
    """Write one CSV row of latency statistics per profiled function.

    Args:
        app: application name; the CSV is written to
            ``{PERF_STATS_DIR}/{app}_latency_hist.csv``.
        unit: latency unit label ("nsecs" or "usecs").
        funcs_and_hists: mapping of func_id -> HdrHistogram of latencies.
        func_id_mappings: mapping of func_id -> human-readable function name.
    """
    os.makedirs(PERF_STATS_DIR, exist_ok=True)
    csv_path = os.path.join(PERF_STATS_DIR, f"{app}_latency_hist.csv")

    with open(csv_path, mode='w', newline='') as f:
        out = csv.writer(f)
        out.writerow(STATS)  # header: func, unit, cnt, avg, min, p05 ... max, std

        for func_id, hist in funcs_and_hists.items():
            # Percentile columns in the same order as the STATS header.
            percentiles = [
                hist.get_value_at_percentile(p)
                for p in (5, 25, 50, 75, 95, 99, 99.9)
            ]
            out.writerow([
                func_id_mappings[func_id],
                unit,
                hist.get_total_count(),
                f"{hist.get_mean_value():.3f}",
                hist.get_min_value(),
                *percentiles,
                hist.get_max_value(),
                f"{hist.get_stddev():.3f}",
            ])

def comma_sep_list(value):
    """argparse type converter: split a comma-separated string into a list."""
    return [item for item in value.split(",")]

if __name__ == "__main__":
    # CLI entry point: parse profiling options and run the profiler.
    parser = argparse.ArgumentParser()
    parser.add_argument("app")  # application name; used to name the output CSV
    parser.add_argument("-b", "--binary")  # path to the binary to probe
    parser.add_argument("-c", "--config")  # config file passed to the binary via -c
    parser.add_argument("-f", "--function", type=comma_sep_list)  # comma-separated function names
    parser.add_argument("-u", "--microseconds", action="store_true", default=False)  # report usecs instead of nsecs
    args = parser.parse_args()

    profile_latency(args)
49 changes: 49 additions & 0 deletions tests/perf/generate_ip_subs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import argparse
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File header comment

import ipaddress
import tomli_w

class SubscriptionSpec:
    """A single Retina subscription entry destined for spec.toml."""

    def __init__(self, addr):
        # Filter string in Retina's filter syntax for the given IPv4 subnet.
        self.filter = f"ipv4.addr = {addr}"
        self.datatypes = ["ConnRecord"]
        self.callback = "ip_cb"

    def to_dict(self):
        """Return the spec as a plain dict, ready for TOML serialization."""
        return dict(
            filter=self.filter,
            datatypes=self.datatypes,
            callback=self.callback,
        )

def shard_ipv4_addr_space(n):
    """Split 0.0.0.0/0 into equal subnets covering at least ``n`` shards.

    The new prefix length is the smallest p with 2**p >= n, so the number of
    subnets returned is ``n`` rounded up to the next power of two.
    """
    prefix = (n - 1).bit_length()
    root = ipaddress.IPv4Network("0.0.0.0/0")
    return list(root.subnets(new_prefix=prefix))

def generate_ip_subs(n):
    """Write spec.toml containing one subscription per IPv4 subnet shard.

    ``n`` is rounded up to the next power of two by the sharding; the actual
    number of subscriptions written is reported on stdout. Overwrites any
    existing spec.toml in the current directory.
    """
    subnets = shard_ipv4_addr_space(n)
    n = len(subnets)  # actual count after rounding up to a power of two

    toml_content = {
        "subscriptions": [
            SubscriptionSpec(net.with_prefixlen).to_dict() for net in subnets
        ]
    }

    out_file = "spec.toml"
    with open(out_file, "wb") as f:
        tomli_w.dump(toml_content, f)
    print(f"Generated {out_file} with {n} subscriptions")

def comma_sep_list(value):
    """Parse "a,b,c" into ["a", "b", "c"] for use as an argparse type."""
    parts = value.split(",")
    return parts

if __name__ == "__main__":
    # CLI entry point: -n takes a comma-separated list of subscription counts.
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--num_subs", type=comma_sep_list)
    args = parser.parse_args()

    # Each call overwrites spec.toml, so only the last count's spec survives
    # this loop. NOTE(review): presumably the caller (run_app.py) consumes
    # spec.toml between generations — confirm.
    for num_subs in args.num_subs:
        generate_ip_subs(int(num_subs))
Loading
Loading