Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ NODES=1
GOOGLE_PROJECT=llnl-flux
INSTANCE=h3-standard-88
time gcloud container clusters create test-cluster --threads-per-core=1 --num-nodes=$NODES --machine-type=$INSTANCE --placement-type=COMPACT --image-type=UBUNTU_CONTAINERD --region=us-central1-a --project=${GOOGLE_PROJECT}

# When time to delete
gcloud container clusters delete test-cluster --region=us-central1-a
```

Finally, install the Flux Operator
Expand Down Expand Up @@ -579,22 +582,13 @@ Here is how to see futex wait times.

```bash
helm install \
--set experiment.monitor_program=futex \
--set experiment.monitor_program=futex-model \
--set experiment.monitor=true \
--set minicluster.save_logs=true \
lammps ./lammps-reax
```

This likely needs to be consolidated (it's a lot of data). Here is an example.

```bash
{"event_type": "FUTEX_WAIT_END", "timestamp_sec": 3891.635076022, "tgid": 34931, "tid": 35146, "comm": "containerd-shim", "cgroup_id": 6520, "futex_op_full": 128, "futex_op_str": "FUTEX_WAIT_PRIVATE", "wait_duration_ns": 21462, "wait_duration_human": "21.46us"}
{"event_type": "FUTEX_WAIT_END", "timestamp_sec": 3891.635086712, "tgid": 34931, "tid": 35388, "comm": "containerd-shim", "cgroup_id": 6520, "futex_op_full": 128, "futex_op_str": "FUTEX_WAIT_PRIVATE", "wait_duration_ns": 42892, "wait_duration_human": "42.89us"}
{"event_type": "FUTEX_WAIT_END", "timestamp_sec": 3891.635102633, "tgid": 3600, "tid": 3602, "comm": "containerd", "cgroup_id": 6520, "futex_op_full": 128, "futex_op_str": "FUTEX_WAIT_PRIVATE", "wait_duration_ns": 16693, "wait_duration_human": "16.69us"}
{"event_type": "FUTEX_WAIT_END", "timestamp_sec": 3891.635107428, "tgid": 3600, "tid": 6823, "comm": "containerd", "cgroup_id": 6520, "futex_op_full": 128, "futex_op_str": "FUTEX_WAIT_PRIVATE", "wait_duration_ns": 34781, "wait_duration_human": "34.78us"}
```

Finally, here is tcp
The `futex-model` program prints river (online ML) models grouped by tgid and comm. Here is how to monitor TCP:

```bash
helm install \
Expand All @@ -613,6 +607,27 @@ Here is example data:
{"event_type": "RECV", "timestamp_sec": 3978.855737251, "tgid": 36693, "tid": 36693, "comm": "lmp", "cgroup_id":
```

Try shared memory:

```bash
helm install \
--set experiment.monitor_program=shmem \
--set experiment.monitor=true \
--set minicluster.save_logs=true \
lammps ./lammps-reax
```

And CPU monitoring (using river ML for summary models):

```bash
helm install \
--set experiment.monitor_program=cpu-models \
--set experiment.monitor=true \
--set minicluster.save_logs=true \
lammps ./lammps-reax
```


Also try changing the command entirely.

```bash
Expand Down
2 changes: 1 addition & 1 deletion base-template/docker/bcc-sidecar/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ RUN git clone -b v0.34.0 https://github.com/iovisor/bcc /usr/src/bcc && \

# Set up environment for Python BCC tools (if needed)
ENV PYTHONPATH=/usr/lib/python3/dist-packages
RUN python3 -m pip install river --break-system-packages
COPY ./programs /opt/programs

# Command to keep the container running
CMD ["tail", "-f", "/dev/null"]
104 changes: 104 additions & 0 deletions base-template/docker/bcc-sidecar/programs/bcchelper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Helper functions shared across programs

import argparse
import os
import sys
import time


def log(message, prefix="", exit_flag=False):
    """
    Print a timestamped message to stderr.

    Parameters:
        message: text to emit.
        prefix: optional label placed before the message.
        exit_flag: when True, exit the process with status 1 after logging.
    """
    stamp = time.strftime("%H:%M:%S", time.localtime())
    label = f"{prefix} " if prefix else ""
    # stderr keeps stdout clean for record output
    print(f"[{stamp}] {label}{message}", file=sys.stderr)
    if exit_flag:
        sys.exit(1)


def read_file(filepath):
    """
    Return the entire contents of the text file at filepath.
    """
    with open(filepath, "r") as handle:
        return handle.read()


def read_bpf_text(dirname, c_filename="ebpf-collect.c"):
    """
    Read the BPF C program that ships alongside this Python module.

    Parameters:
        dirname: accepted for backward compatibility with callers; the
            lookup always uses the directory of this file, not this value.
        c_filename: name of the C source file to load.

    Returns the file contents; exits the process if the file is missing.
    """
    # Resolve relative to this module so lookup works regardless of cwd
    script_dir_path = os.path.dirname(os.path.abspath(__file__))
    filename = os.path.join(script_dir_path, c_filename)
    # Fix: interpolate the actual path (the message previously had no
    # placeholder) and log to stderr so stdout stays clean for records
    print(f"Looking for {filename}", file=sys.stderr)
    if not os.path.exists(filename):
        sys.exit(f"Missing BPF C code file: {filename}")
    return read_file(filename)


def get_cgroup_filter(cgroup_indicator_file):
    """
    Read a cgroup id from an indicator file to scope results to one container.

    Returns the id string, or None when the file is empty or unreadable.
    """
    try:
        with open(cgroup_indicator_file, "r") as handle:
            filter_value = handle.read().strip()
        if filter_value:
            log(f"Scoping to cgroup {filter_value}")
            return filter_value
        log(
            f"Warning: Cgroup indicator file '{cgroup_indicator_file}' is empty."
        )
    except Exception as e:
        log(
            f"Warning: Could not read cgroup indicator file '{cgroup_indicator_file}': {e}"
        )
    # Any failure above means: run without a cgroup filter
    return None


def get_parser(description):
    """
    Build the command-line parser shared by all monitor programs.
    """
    parser = argparse.ArgumentParser(description=description)
    # (flags, keyword options) for every shared argument
    argument_specs = [
        (
            ("--cgroup-indicator-file",),
            {"help": "Filename with a cgroup to filter to"},
        ),
        (
            ("--stop-indicator-file",),
            {"help": "Indicator file path to stop"},
        ),
        (
            ("--start-indicator-file",),
            {"help": "Indicator file path to start"},
        ),
        (
            ("--include-pattern",),
            {
                "default": None,
                "action": "append",
                "help": "Include these patterns only",
            },
        ),
        (
            ("--exclude-pattern",),
            {
                "default": None,
                "action": "append",
                "help": "Exclude these patterns in commands",
            },
        ),
        (
            ("--debug",),
            {
                "action": "store_true",
                "default": False,
                "help": "Print debug calls for open",
            },
        ),
        (
            ("-j", "--json"),
            {
                "action": "store_true",
                "default": False,
                "help": "Print records as json instead of in table",
            },
        ),
    ]
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)
    return parser
147 changes: 147 additions & 0 deletions base-template/docker/bcc-sidecar/programs/cpu-model/ebpf-collect.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)

// SEC is normally provided by libbpf; define it here because BCC's
// compilation environment does not supply it
#ifndef SEC
# define SEC(name) __attribute__((section(name), used))
#endif

// Basic integer types, since the usual kernel typedef headers are not
// imported in this compilation environment
typedef unsigned long long u64;
typedef unsigned int u32;
typedef unsigned char u8;
// pid_t is usually defined by system headers included by BCC

#define TASK_COMM_LEN 16

// --- Manually defined tracepoint context structures ---
// NOTE(review): field layout is assumed to mirror the kernel's
// sched_switch/sched_wakeup tracepoint format (the leading u64 stands in
// for the common tracepoint header) — confirm against
// /sys/kernel/debug/tracing/events/sched/*/format on the target kernel.
struct trace_event_raw_sched_switch {
    unsigned long long __unused_header;
    char prev_comm[TASK_COMM_LEN];
    pid_t prev_pid;
    int prev_prio;
    long prev_state;
    char next_comm[TASK_COMM_LEN];
    pid_t next_pid;
    int next_prio;
};

struct trace_event_raw_sched_wakeup {
    unsigned long long __unused_header;
    char comm[TASK_COMM_LEN];
    pid_t pid;
    int prio;
    int success;
    int target_cpu;
};

// Event types sent to user-space
#define EVENT_TYPE_SCHED_STATS 1

// Data structure for events sent to user-space (one record per
// scheduler event; either on_cpu_ns or runq_latency_ns is populated)
struct sched_event_data {
    u64 timestamp_ns;
    u32 tgid;
    u32 tid;
    u64 cgroup_id;
    char comm[TASK_COMM_LEN];
    u64 on_cpu_ns;          // time the task just spent on the CPU
    u64 runq_latency_ns;    // time the task waited runnable for a CPU
    u8 event_type;
    u8 prev_state_task_switched_out;
};

// --- BCC Style Map Definitions ---
// tid -> timestamp when the task was last scheduled onto a CPU
BPF_HASH(task_scheduled_in_ts, pid_t, u64, 10240);
// tid -> timestamp when the task was last woken (made runnable)
BPF_HASH(task_wakeup_ts, pid_t, u64, 10240);

// Use BCC's perf output mechanism
BPF_PERF_OUTPUT(events_out);


// --- BPF Program Functions ---

SEC("tracepoint/sched/sched_wakeup")
// Record the wakeup timestamp for this thread so sched_switch can later
// compute run-queue latency (time from wakeup to being scheduled in).
int tp_sched_wakeup(struct trace_event_raw_sched_wakeup *ctx) {
    pid_t tid = ctx->pid;
    u64 ts = bpf_ktime_get_ns();
    task_wakeup_ts.update(&tid, &ts);
    return 0;
}

SEC("tracepoint/sched/sched_wakeup_new")
// Same as tp_sched_wakeup, but for freshly-created tasks waking for the
// first time: record when they became runnable.
int tp_sched_wakeup_new(struct trace_event_raw_sched_wakeup *ctx) {
    pid_t tid = ctx->pid;
    u64 ts = bpf_ktime_get_ns();
    task_wakeup_ts.update(&tid, &ts);
    return 0;
}

SEC("tracepoint/sched/sched_switch")
// On every context switch, emit two kinds of records:
//  1) for the task switching OUT: how long it was on the CPU
//  2) for the task switching IN: how long it waited runnable (run-queue
//     latency), if we saw its wakeup
int tp_sched_switch(struct trace_event_raw_sched_switch *ctx) {
    u64 current_ts = bpf_ktime_get_ns();
    pid_t prev_tid = ctx->prev_pid;
    pid_t next_tid = ctx->next_pid;
    // This initializes to zero
    struct sched_event_data data = {};
    u64 *scheduled_in_ts_ptr;
    u64 *wakeup_ts_ptr;

    // --- Handle previous task switching out ---
    scheduled_in_ts_ptr = task_scheduled_in_ts.lookup(&prev_tid);

    if (scheduled_in_ts_ptr) {
        u64 on_cpu_duration = current_ts - *scheduled_in_ts_ptr;
        task_scheduled_in_ts.delete(&prev_tid);

        data.timestamp_ns = current_ts;
        data.tgid = prev_tid; // Note: This is TID, not the real TGID — user space must not treat it as a process id
        data.tid = prev_tid;

        // Bounded, verifier-friendly copy of the command name
        for (int i = 0; i < TASK_COMM_LEN; ++i) {
            data.comm[i] = ctx->prev_comm[i];
            if (ctx->prev_comm[i] == '\0') break;
        }
        data.comm[TASK_COMM_LEN - 1] = '\0';

        // NOTE(review): this is the cgroup of the task current at the
        // tracepoint, which may not be prev's cgroup — confirm
        data.cgroup_id = bpf_get_current_cgroup_id();
        data.on_cpu_ns = on_cpu_duration;
        data.runq_latency_ns = 0;
        data.event_type = EVENT_TYPE_SCHED_STATS;
        data.prev_state_task_switched_out = (u8)ctx->prev_state;

        events_out.perf_submit(ctx, &data, sizeof(data));
    }

    // --- Handle next task switching in ---
    // Stamp the moment next_tid got the CPU, for its eventual switch-out
    u64 current_ts_val = current_ts;
    task_scheduled_in_ts.update(&next_tid, &current_ts_val);

    wakeup_ts_ptr = task_wakeup_ts.lookup(&next_tid);
    if (wakeup_ts_ptr) {

        // This means it was ready and wanted to use the CPU,
        // but was waiting for a CPU core to become available.
        u64 runq_latency = current_ts - *wakeup_ts_ptr;
        task_wakeup_ts.delete(&next_tid);

        struct sched_event_data data_next = {};
        data_next.timestamp_ns = current_ts;
        data_next.tgid = next_tid; // Note: This is TID, not the real TGID
        data_next.tid = next_tid;

        // Bounded, verifier-friendly copy of the command name
        for (int i = 0; i < TASK_COMM_LEN; ++i) {
            data_next.comm[i] = ctx->next_comm[i];
            if (ctx->next_comm[i] == '\0') break;
        }
        data_next.comm[TASK_COMM_LEN - 1] = '\0';

        // NOTE(review): cgroup of the task current at the tracepoint,
        // which may not be next's cgroup — confirm
        data_next.cgroup_id = bpf_get_current_cgroup_id();
        data_next.on_cpu_ns = 0;
        data_next.runq_latency_ns = runq_latency;
        data_next.event_type = EVENT_TYPE_SCHED_STATS;
        data_next.prev_state_task_switched_out = 0;

        events_out.perf_submit(ctx, &data_next, sizeof(data_next));
    }
    return 0;
}
Loading