Skip to content

Commit 4130e0b

Browse files
committed
nit: make common functions between programs
Signed-off-by: vsoch <[email protected]>
1 parent 8580bc3 commit 4130e0b

File tree

15 files changed

+1820
-783
lines changed

15 files changed

+1820
-783
lines changed

README.md

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ NODES=1
5858
GOOGLE_PROJECT=llnl-flux
5959
INSTANCE=h3-standard-88
6060
time gcloud container clusters create test-cluster --threads-per-core=1 --num-nodes=$NODES --machine-type=$INSTANCE --placement-type=COMPACT --image-type=UBUNTU_CONTAINERD --region=us-central1-a --project=${GOOGLE_PROJECT}
61+
62+
# When it is time to delete the cluster
63+
gcloud container clusters delete test-cluster --region=us-central1-a
6164
```
6265

6366
Finally, install the Flux Operator
@@ -579,22 +582,13 @@ Here is how to see futex wait times.
579582

580583
```bash
581584
helm install \
582-
--set experiment.monitor_program=futex \
585+
--set experiment.monitor_program=futex-model \
583586
--set experiment.monitor=true \
584587
--set minicluster.save_logs=true \
585588
lammps ./lammps-reax
586589
```
587590

588-
This likely needs to be consolidated (it's a lot of data). Here is an example.
589-
590-
```bash
591-
{"event_type": "FUTEX_WAIT_END", "timestamp_sec": 3891.635076022, "tgid": 34931, "tid": 35146, "comm": "containerd-shim", "cgroup_id": 6520, "futex_op_full": 128, "futex_op_str": "FUTEX_WAIT_PRIVATE", "wait_duration_ns": 21462, "wait_duration_human": "21.46us"}
592-
{"event_type": "FUTEX_WAIT_END", "timestamp_sec": 3891.635086712, "tgid": 34931, "tid": 35388, "comm": "containerd-shim", "cgroup_id": 6520, "futex_op_full": 128, "futex_op_str": "FUTEX_WAIT_PRIVATE", "wait_duration_ns": 42892, "wait_duration_human": "42.89us"}
593-
{"event_type": "FUTEX_WAIT_END", "timestamp_sec": 3891.635102633, "tgid": 3600, "tid": 3602, "comm": "containerd", "cgroup_id": 6520, "futex_op_full": 128, "futex_op_str": "FUTEX_WAIT_PRIVATE", "wait_duration_ns": 16693, "wait_duration_human": "16.69us"}
594-
{"event_type": "FUTEX_WAIT_END", "timestamp_sec": 3891.635107428, "tgid": 3600, "tid": 6823, "comm": "containerd", "cgroup_id": 6520, "futex_op_full": 128, "futex_op_str": "FUTEX_WAIT_PRIVATE", "wait_duration_ns": 34781, "wait_duration_human": "34.78us"}
595-
```
596-
597-
Finally, here is tcp
591+
The `futex-model` program prints river (online ML) models grouped by tgid and comm. Here is how to monitor tcp:
598592

599593
```bash
600594
helm install \
@@ -613,6 +607,27 @@ Here is example data:
613607
{"event_type": "RECV", "timestamp_sec": 3978.855737251, "tgid": 36693, "tid": 36693, "comm": "lmp", "cgroup_id":
614608
```
615609

610+
Try shared memory:
611+
612+
```bash
613+
helm install \
614+
--set experiment.monitor_program=shmem \
615+
--set experiment.monitor=true \
616+
--set minicluster.save_logs=true \
617+
lammps ./lammps-reax
618+
```
619+
620+
And cpu (using river ML for summary models):
621+
622+
```bash
623+
helm install \
624+
--set experiment.monitor_program=cpu-models \
625+
--set experiment.monitor=true \
626+
--set minicluster.save_logs=true \
627+
lammps ./lammps-reax
628+
```
629+
630+
616631
Also try changing the command entirely.
617632

618633
```bash

base-template/docker/bcc-sidecar/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ RUN git clone -b v0.34.0 https://github.com/iovisor/bcc /usr/src/bcc && \
4848

4949
# Set up environment for Python BCC tools (if needed)
5050
ENV PYTHONPATH=/usr/lib/python3/dist-packages
51+
RUN python3 -m pip install river --break-system-packages
5152
COPY ./programs /opt/programs
52-
5353
# Command to keep the container running
5454
CMD ["tail", "-f", "/dev/null"]
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# Helper functions shared across programs
2+
3+
import argparse
4+
import os
5+
import sys
6+
import time
7+
8+
9+
def log(message, prefix="", exit_flag=False):
    """
    Write a timestamped message to stderr.

    When prefix is non-empty it is inserted (followed by a space) between
    the timestamp and the message. When exit_flag is True, the process
    exits with status 1 after logging.
    """
    stamp = time.strftime("%H:%M:%S", time.localtime())
    label = f"{prefix} " if prefix else prefix
    sys.stderr.write(f"[{stamp}] {label}{message}\n")
    if exit_flag:
        sys.exit(1)
16+
17+
18+
def read_file(filepath):
    """
    Return the full contents of the text file at filepath.
    """
    with open(filepath, "r") as handle:
        return handle.read()
25+
26+
27+
def read_bpf_text(dirname, c_filename="ebpf-collect.c"):
    """
    Find and read the BPF C program that lives alongside a Python program.

    Parameters:
        dirname: directory of the calling program, where the C file lives
        c_filename: name of the C source file to load

    Exits the process with an error message if the file does not exist.
    """
    # Bug fix: the dirname argument was previously ignored — the path was
    # always resolved relative to this utils module, so programs in other
    # directories could never find their C file.
    filename = os.path.join(dirname, c_filename)
    # Fix garbled f-strings that had lost their {filename} placeholder
    print(f"Looking for {filename}")
    if not os.path.exists(filename):
        sys.exit(f"Missing BPF C code file: {filename}")
    return read_file(filename)
38+
39+
40+
def get_cgroup_filter(cgroup_indicator_file):
    """
    Read a cgroup id from an indicator file to scope results to one container.

    Returns the cgroup id as a string, or None when the file is empty or
    cannot be read (best effort: no filter means results are not scoped).
    """
    try:
        with open(cgroup_indicator_file, "r") as handle:
            cgroup_id = handle.read().strip()
    except Exception as e:
        log(
            f"Warning: Could not read cgroup indicator file '{cgroup_indicator_file}': {e}"
        )
        return None
    if not cgroup_id:
        log(
            f"Warning: Cgroup indicator file '{cgroup_indicator_file}' is empty."
        )
        return None
    log(f"Scoping to cgroup {cgroup_id}")
    return cgroup_id
60+
61+
62+
def get_parser(description):
    """
    Build the argument parser shared by the monitoring programs.
    """
    parser = argparse.ArgumentParser(description=description)

    # Indicator files used to coordinate with the sidecar lifecycle
    for flag, helptext in (
        ("--cgroup-indicator-file", "Filename with a cgroup to filter to"),
        ("--stop-indicator-file", "Indicator file path to stop"),
        ("--start-indicator-file", "Indicator file path to start"),
    ):
        parser.add_argument(flag, help=helptext)

    # Repeatable pattern filters for commands
    parser.add_argument(
        "--include-pattern",
        default=None,
        action="append",
        help="Include these patterns only",
    )
    parser.add_argument(
        "--exclude-pattern",
        default=None,
        action="append",
        help="Exclude these patterns in commands",
    )

    # Output controls
    parser.add_argument(
        "--debug",
        default=False,
        action="store_true",
        help="Print debug calls for open",
    )
    parser.add_argument(
        "-j",
        "--json",
        default=False,
        action="store_true",
        help="Print records as json instead of in table",
    )
    return parser
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)

// SEC() is normally supplied by libbpf; BCC does not provide it, so
// define it here to place programs into their named ELF sections.
#ifndef SEC
# define SEC(name) __attribute__((section(name), used))
#endif

// Basic integer types, defined manually because the usual kernel type
// headers are not available under BCC's compilation environment.
typedef unsigned long long u64;
typedef unsigned int u32;
typedef unsigned char u8;
// pid_t is usually defined by system headers included by BCC

// Maximum length of a task command name (matches the kernel limit)
#define TASK_COMM_LEN 16

// --- Manually defined tracepoint context structures ---
// NOTE(review): field layout mirrors the sched tracepoint format files
// (/sys/kernel/debug/tracing/events/sched/*/format) and is kernel-version
// dependent — confirm against the target kernel.
struct trace_event_raw_sched_switch {
    unsigned long long __unused_header;
    char prev_comm[TASK_COMM_LEN];
    pid_t prev_pid;
    int prev_prio;
    long prev_state;
    char next_comm[TASK_COMM_LEN];
    pid_t next_pid;
    int next_prio;
};

struct trace_event_raw_sched_wakeup {
    unsigned long long __unused_header;
    char comm[TASK_COMM_LEN];
    pid_t pid;
    int prio;
    int success;
    int target_cpu;
};

// Event types sent to user-space
#define EVENT_TYPE_SCHED_STATS 1

// Data structure for events sent to user-space
struct sched_event_data {
    u64 timestamp_ns;      // time of the sched_switch, ns since boot
    u32 tgid;              // NOTE(review): the producer fills this with the TID
    u32 tid;               // thread id
    u64 cgroup_id;         // cgroup id of the task current at submit time
    char comm[TASK_COMM_LEN]; // task command name
    u64 on_cpu_ns;         // time spent on CPU (set for switch-out events)
    u64 runq_latency_ns;   // wakeup-to-run latency (set for switch-in events)
    u8 event_type;         // EVENT_TYPE_SCHED_STATS
    u8 prev_state_task_switched_out; // prev_state truncated to u8 on switch-out
};

// --- BCC Style Map Definitions ---
// When a task was last scheduled onto a CPU, keyed by TID
BPF_HASH(task_scheduled_in_ts, pid_t, u64, 10240);
// When a task was last woken (made runnable), keyed by TID
BPF_HASH(task_wakeup_ts, pid_t, u64, 10240);

// Use BCC's perf output mechanism to stream events to user-space
BPF_PERF_OUTPUT(events_out);


// --- BPF Program Functions ---
62+
63+
// A task just became runnable: remember when, so that sched_switch can
// later compute how long it waited on the run queue before getting a CPU.
SEC("tracepoint/sched/sched_wakeup")
int tp_sched_wakeup(struct trace_event_raw_sched_wakeup *ctx) {
    u64 woke_at_ns = bpf_ktime_get_ns();
    pid_t woke_tid = ctx->pid;
    task_wakeup_ts.update(&woke_tid, &woke_at_ns);
    return 0;
}
70+
71+
// Same bookkeeping as sched_wakeup, but for newly created tasks that
// become runnable for the first time.
SEC("tracepoint/sched/sched_wakeup_new")
int tp_sched_wakeup_new(struct trace_event_raw_sched_wakeup *ctx) {
    u64 spawned_at_ns = bpf_ktime_get_ns();
    pid_t new_tid = ctx->pid;
    task_wakeup_ts.update(&new_tid, &spawned_at_ns);
    return 0;
}
78+
79+
// Fires on every context switch. Emits up to two events:
//   1) for the task switching OUT: how long it was on the CPU
//   2) for the task switching IN: how long it waited on the run queue
SEC("tracepoint/sched/sched_switch")
int tp_sched_switch(struct trace_event_raw_sched_switch *ctx) {
    u64 current_ts = bpf_ktime_get_ns();
    pid_t prev_tid = ctx->prev_pid;
    pid_t next_tid = ctx->next_pid;
    // This initializes to zero
    struct sched_event_data data = {};
    u64 *scheduled_in_ts_ptr;
    u64 *wakeup_ts_ptr;

    // --- Handle previous task switching out ---
    // Only emit if we recorded when this task was scheduled in; otherwise
    // there is no baseline to compute an on-CPU duration from.
    scheduled_in_ts_ptr = task_scheduled_in_ts.lookup(&prev_tid);

    if (scheduled_in_ts_ptr) {
        u64 on_cpu_duration = current_ts - *scheduled_in_ts_ptr;
        task_scheduled_in_ts.delete(&prev_tid);

        data.timestamp_ns = current_ts;
        data.tgid = prev_tid; // Note: This is TID
        data.tid = prev_tid;

        // Bounded, verifier-friendly copy of the command name
        for (int i = 0; i < TASK_COMM_LEN; ++i) {
            data.comm[i] = ctx->prev_comm[i];
            if (ctx->prev_comm[i] == '\0') break;
        }
        data.comm[TASK_COMM_LEN - 1] = '\0';

        // At sched_switch the "current" task is still prev, so this is
        // prev's cgroup id.
        data.cgroup_id = bpf_get_current_cgroup_id();
        data.on_cpu_ns = on_cpu_duration;
        data.runq_latency_ns = 0;
        data.event_type = EVENT_TYPE_SCHED_STATS;
        // Truncates the kernel's long prev_state to u8
        data.prev_state_task_switched_out = (u8)ctx->prev_state;

        events_out.perf_submit(ctx, &data, sizeof(data));
    }

    // --- Handle next task switching in ---
    // Record when next got the CPU so its on-CPU time can be computed at
    // its own switch-out.
    u64 current_ts_val = current_ts;
    task_scheduled_in_ts.update(&next_tid, &current_ts_val);

    wakeup_ts_ptr = task_wakeup_ts.lookup(&next_tid);
    if (wakeup_ts_ptr) {

        // The task was runnable (ready and wanting the CPU)
        // but was waiting for a CPU core to become available.
        u64 runq_latency = current_ts - *wakeup_ts_ptr;
        task_wakeup_ts.delete(&next_tid);

        struct sched_event_data data_next = {};
        data_next.timestamp_ns = current_ts;
        data_next.tgid = next_tid; // Note: This is TID
        data_next.tid = next_tid;

        for (int i = 0; i < TASK_COMM_LEN; ++i) {
            data_next.comm[i] = ctx->next_comm[i];
            if (ctx->next_comm[i] == '\0') break;
        }
        data_next.comm[TASK_COMM_LEN - 1] = '\0';

        // NOTE(review): this helper runs in prev's context, so this looks
        // like prev's cgroup id rather than next's — confirm intended.
        data_next.cgroup_id = bpf_get_current_cgroup_id();
        data_next.on_cpu_ns = 0;
        data_next.runq_latency_ns = runq_latency;
        data_next.event_type = EVENT_TYPE_SCHED_STATS;
        data_next.prev_state_task_switched_out = 0;

        events_out.perf_submit(ctx, &data_next, sizeof(data_next));
    }
    return 0;
}

0 commit comments

Comments
 (0)