Commit 41afd7f

Make stress tests independent of batch size (#2306)
1 parent 3ac2d9f commit 41afd7f
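
In short: worker threads previously ran the measured closure BATCH_SIZE (1000) times and then did one atomic fetch_add on a shared AtomicU64, so throughput was only counted in batch-sized steps. After this change each worker owns a plain u64 slot (padded out to its own cache line) that it bumps once per call, the monitor thread sums the slots, and everything runs under thread::scope. Below is a minimal, self-contained sketch of that padded per-thread counter pattern; the name PaddedCounter and the fixed thread/iteration counts are illustrative only, and it avoids the commit's UnsafeCell/UnsafeSlice wrapper by giving each worker exclusive &mut access to its slot (the real code needs the wrapper because the monitor thread reads the counters while the workers are still writing).

// Illustrative sketch (not part of the commit): per-thread counters padded to a
// full cache-line-sized slot, incremented without atomics from scoped threads.
use std::thread;

#[repr(C)]
#[derive(Default)]
struct PaddedCounter {
    count: u64,
    // Pad the struct to 128 bytes so each worker's counter lives on its own
    // cache line and writes from one thread don't keep invalidating a line
    // another thread is updating (false sharing).
    _padding: [u64; 15],
}

fn main() {
    let num_threads = 4; // arbitrary for the sketch
    let mut counters: Vec<PaddedCounter> =
        (0..num_threads).map(|_| PaddedCounter::default()).collect();

    // Scoped threads may borrow `counters`; each worker gets exclusive
    // &mut access to its own slot, so a plain u64 increment is enough.
    thread::scope(|s| {
        for counter in counters.iter_mut() {
            s.spawn(move || {
                for _ in 0..1_000_000 {
                    counter.count += 1; // one increment per "iteration"
                }
            });
        }
    });

    // All worker borrows ended with the scope, so summing is safe here.
    let total: u64 = counters.iter().map(|c| c.count).sum();
    println!("total iterations: {total}");
}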

File tree: 1 file changed, 90 additions and 70 deletions

stress/src/throughput.rs
@@ -1,21 +1,21 @@
 use num_format::{Locale, ToFormattedString};
+use std::cell::UnsafeCell;
 use std::env;
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::thread;
 use std::time::{Duration, Instant};
 #[cfg(feature = "stats")]
 use sysinfo::{Pid, System};

 const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds
-const BATCH_SIZE: u64 = 1000;

 static STOP: AtomicBool = AtomicBool::new(false);

 #[repr(C)]
 #[derive(Default)]
 struct WorkerStats {
-    count: AtomicU64,
+    count: u64,
     /// We use a padding for the struct to allow each thread to have exclusive access to each WorkerStat
     /// Otherwise, there would be some cpu contention with threads needing to take ownership of the cache lines
     padding: [u64; 15],
@@ -58,91 +58,111 @@ where
     }

     println!("Number of threads: {}\n", num_threads);
-    let mut handles = Vec::with_capacity(num_threads);
     let func_arc = Arc::new(func);
     let mut worker_stats_vec: Vec<WorkerStats> = Vec::new();

     for _ in 0..num_threads {
         worker_stats_vec.push(WorkerStats::default());
     }
-    let worker_stats_shared = Arc::new(worker_stats_vec);
-    let worker_stats_shared_monitor = Arc::clone(&worker_stats_shared);
-
-    let handle_main_thread = thread::spawn(move || {
-        let mut last_collect_time = Instant::now();
-        let mut total_count_old: u64 = 0;
-
-        #[cfg(feature = "stats")]
-        let pid = Pid::from(std::process::id() as usize);
-        #[cfg(feature = "stats")]
-        let mut system = System::new_all();
-
-        loop {
-            let current_time = Instant::now();
-            let elapsed = current_time.duration_since(last_collect_time).as_secs();
-            if elapsed >= SLIDING_WINDOW_SIZE {
-                let total_count_u64: u64 = worker_stats_shared_monitor
-                    .iter()
-                    .map(|worker_stat| worker_stat.count.load(Ordering::Relaxed))
-                    .sum();
-                last_collect_time = Instant::now();
-                let current_count = total_count_u64 - total_count_old;
-                total_count_old = total_count_u64;
-                let throughput = current_count / elapsed;
-                println!(
-                    "Throughput: {} iterations/sec",
-                    throughput.to_formatted_string(&Locale::en)
-                );

-                #[cfg(feature = "stats")]
-                {
-                    system.refresh_all();
-                    if let Some(process) = system.process(pid) {
-                        println!(
-                            "Memory usage: {:.2} MB",
-                            process.memory() as f64 / (1024.0 * 1024.0)
-                        );
-                        println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
-                        println!(
-                            "Virtual memory usage: {:.2} MB",
-                            process.virtual_memory() as f64 / (1024.0 * 1024.0)
-                        );
-                    } else {
-                        println!("Process not found");
+    let shared_mutable_stats_slice = UnsafeSlice::new(&mut worker_stats_vec);
+
+    thread::scope(|s| {
+        s.spawn(|| {
+            let mut last_collect_time = Instant::now();
+            let mut total_count_old: u64 = 0;
+
+            #[cfg(feature = "stats")]
+            let pid = Pid::from(std::process::id() as usize);
+            #[cfg(feature = "stats")]
+            let mut system = System::new_all();
+
+            loop {
+                let current_time = Instant::now();
+                let elapsed = current_time.duration_since(last_collect_time).as_secs();
+                if elapsed >= SLIDING_WINDOW_SIZE {
+                    let total_count_u64 = shared_mutable_stats_slice.sum();
+                    last_collect_time = Instant::now();
+                    let current_count = total_count_u64 - total_count_old;
+                    total_count_old = total_count_u64;
+                    let throughput = current_count / elapsed;
+                    println!(
+                        "Throughput: {} iterations/sec",
+                        throughput.to_formatted_string(&Locale::en)
+                    );
+
+                    #[cfg(feature = "stats")]
+                    {
+                        system.refresh_all();
+                        if let Some(process) = system.process(pid) {
+                            println!(
+                                "Memory usage: {:.2} MB",
+                                process.memory() as f64 / (1024.0 * 1024.0)
+                            );
+                            println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
+                            println!(
+                                "Virtual memory usage: {:.2} MB",
+                                process.virtual_memory() as f64 / (1024.0 * 1024.0)
+                            );
+                        } else {
+                            println!("Process not found");
+                        }
                     }
+
+                    println!("\n");
                 }

-                println!("\n");
-            }
+                if STOP.load(Ordering::SeqCst) {
+                    break;
+                }

-            if STOP.load(Ordering::SeqCst) {
-                break;
+                thread::sleep(Duration::from_millis(5000));
             }
+        });

-            thread::sleep(Duration::from_millis(5000));
+        for thread_index in 0..num_threads {
+            let func_arc_clone = Arc::clone(&func_arc);
+            s.spawn(move || loop {
+                func_arc_clone();
+                unsafe {
+                    shared_mutable_stats_slice.increment(thread_index);
+                }
+                if STOP.load(Ordering::SeqCst) {
+                    break;
+                }
+            });
         }
     });
+}

-    handles.push(handle_main_thread);
+#[derive(Copy, Clone)]
+struct UnsafeSlice<'a> {
+    slice: &'a [UnsafeCell<WorkerStats>],
+}

-    for thread_index in 0..num_threads {
-        let worker_stats_shared = Arc::clone(&worker_stats_shared);
-        let func_arc_clone = Arc::clone(&func_arc);
-        let handle = thread::spawn(move || loop {
-            for _ in 0..BATCH_SIZE {
-                func_arc_clone();
-            }
-            worker_stats_shared[thread_index]
-                .count
-                .fetch_add(BATCH_SIZE, Ordering::Relaxed);
-            if STOP.load(Ordering::SeqCst) {
-                break;
-            }
-        });
-        handles.push(handle)
+unsafe impl<'a> Send for UnsafeSlice<'a> {}
+unsafe impl<'a> Sync for UnsafeSlice<'a> {}
+
+impl<'a> UnsafeSlice<'a> {
+    fn new(slice: &'a mut [WorkerStats]) -> Self {
+        let ptr = slice as *mut [WorkerStats] as *const [UnsafeCell<WorkerStats>];
+        Self {
+            slice: unsafe { &*ptr },
+        }
+    }
+
+    // SAFETY: It's assumed that no two threads will write to the same index at the same time
+    #[inline(always)]
+    unsafe fn increment(&self, i: usize) {
+        let value = self.slice[i].get();
+        (*value).count = (*value).count + 1;
     }

-    for handle in handles {
-        handle.join().unwrap();
+    #[inline(always)]
+    fn sum(&self) -> u64 {
+        self.slice
+            .iter()
+            .map(|cell| unsafe { (*cell.get()).count })
+            .sum()
     }
 }
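
A quick check of the padding arithmetic behind the WorkerStats comment above: the counter plus 15 u64s of padding makes the #[repr(C)] struct 128 bytes, so the count fields of two adjacent workers are always 128 bytes apart and can never share a 64-byte cache line (the common line size on x86_64). The struct is copied here only for the assertion; this snippet is not part of the commit.

// Standalone check of the WorkerStats padding arithmetic (struct copied from
// the diff above solely to assert its size).
#[repr(C)]
#[derive(Default)]
#[allow(dead_code)]
struct WorkerStats {
    count: u64,         // 8 bytes
    padding: [u64; 15], // 15 * 8 = 120 bytes
}

fn main() {
    // 8 + 120 = 128 bytes: any two count fields are a full 128 bytes apart,
    // so they cannot land on the same 64-byte cache line.
    assert_eq!(std::mem::size_of::<WorkerStats>(), 128);
    println!("WorkerStats is {} bytes", std::mem::size_of::<WorkerStats>());
}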
