diff --git a/stress/src/throughput.rs b/stress/src/throughput.rs index c77832a33c..8116f904ee 100644 --- a/stress/src/throughput.rs +++ b/stress/src/throughput.rs @@ -1,6 +1,7 @@ use num_format::{Locale, ToFormattedString}; +use std::cell::UnsafeCell; use std::env; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; @@ -8,14 +9,13 @@ use std::time::{Duration, Instant}; use sysinfo::{Pid, System}; const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds -const BATCH_SIZE: u64 = 1000; static STOP: AtomicBool = AtomicBool::new(false); #[repr(C)] #[derive(Default)] struct WorkerStats { - count: AtomicU64, + count: u64, /// We use a padding for the struct to allow each thread to have exclusive access to each WorkerStat /// Otherwise, there would be some cpu contention with threads needing to take ownership of the cache lines padding: [u64; 15], @@ -58,91 +58,111 @@ where } println!("Number of threads: {}\n", num_threads); - let mut handles = Vec::with_capacity(num_threads); let func_arc = Arc::new(func); let mut worker_stats_vec: Vec = Vec::new(); for _ in 0..num_threads { worker_stats_vec.push(WorkerStats::default()); } - let worker_stats_shared = Arc::new(worker_stats_vec); - let worker_stats_shared_monitor = Arc::clone(&worker_stats_shared); - - let handle_main_thread = thread::spawn(move || { - let mut last_collect_time = Instant::now(); - let mut total_count_old: u64 = 0; - - #[cfg(feature = "stats")] - let pid = Pid::from(std::process::id() as usize); - #[cfg(feature = "stats")] - let mut system = System::new_all(); - - loop { - let current_time = Instant::now(); - let elapsed = current_time.duration_since(last_collect_time).as_secs(); - if elapsed >= SLIDING_WINDOW_SIZE { - let total_count_u64: u64 = worker_stats_shared_monitor - .iter() - .map(|worker_stat| worker_stat.count.load(Ordering::Relaxed)) - .sum(); - last_collect_time = Instant::now(); - let current_count = total_count_u64 - total_count_old; - total_count_old = total_count_u64; - let throughput = current_count / elapsed; - println!( - "Throughput: {} iterations/sec", - throughput.to_formatted_string(&Locale::en) - ); - #[cfg(feature = "stats")] - { - system.refresh_all(); - if let Some(process) = system.process(pid) { - println!( - "Memory usage: {:.2} MB", - process.memory() as f64 / (1024.0 * 1024.0) - ); - println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32); - println!( - "Virtual memory usage: {:.2} MB", - process.virtual_memory() as f64 / (1024.0 * 1024.0) - ); - } else { - println!("Process not found"); + let shared_mutable_stats_slice = UnsafeSlice::new(&mut worker_stats_vec); + + thread::scope(|s| { + s.spawn(|| { + let mut last_collect_time = Instant::now(); + let mut total_count_old: u64 = 0; + + #[cfg(feature = "stats")] + let pid = Pid::from(std::process::id() as usize); + #[cfg(feature = "stats")] + let mut system = System::new_all(); + + loop { + let current_time = Instant::now(); + let elapsed = current_time.duration_since(last_collect_time).as_secs(); + if elapsed >= SLIDING_WINDOW_SIZE { + let total_count_u64 = shared_mutable_stats_slice.sum(); + last_collect_time = Instant::now(); + let current_count = total_count_u64 - total_count_old; + total_count_old = total_count_u64; + let throughput = current_count / elapsed; + println!( + "Throughput: {} iterations/sec", + throughput.to_formatted_string(&Locale::en) + ); + + #[cfg(feature = "stats")] + { + system.refresh_all(); + if let Some(process) = system.process(pid) { + println!( + "Memory usage: {:.2} MB", + process.memory() as f64 / (1024.0 * 1024.0) + ); + println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32); + println!( + "Virtual memory usage: {:.2} MB", + process.virtual_memory() as f64 / (1024.0 * 1024.0) + ); + } else { + println!("Process not found"); + } } + + println!("\n"); } - println!("\n"); - } + if STOP.load(Ordering::SeqCst) { + break; + } - if STOP.load(Ordering::SeqCst) { - break; + thread::sleep(Duration::from_millis(5000)); } + }); - thread::sleep(Duration::from_millis(5000)); + for thread_index in 0..num_threads { + let func_arc_clone = Arc::clone(&func_arc); + s.spawn(move || loop { + func_arc_clone(); + unsafe { + shared_mutable_stats_slice.increment(thread_index); + } + if STOP.load(Ordering::SeqCst) { + break; + } + }); } }); +} - handles.push(handle_main_thread); +#[derive(Copy, Clone)] +struct UnsafeSlice<'a> { + slice: &'a [UnsafeCell], +} - for thread_index in 0..num_threads { - let worker_stats_shared = Arc::clone(&worker_stats_shared); - let func_arc_clone = Arc::clone(&func_arc); - let handle = thread::spawn(move || loop { - for _ in 0..BATCH_SIZE { - func_arc_clone(); - } - worker_stats_shared[thread_index] - .count - .fetch_add(BATCH_SIZE, Ordering::Relaxed); - if STOP.load(Ordering::SeqCst) { - break; - } - }); - handles.push(handle) +unsafe impl<'a> Send for UnsafeSlice<'a> {} +unsafe impl<'a> Sync for UnsafeSlice<'a> {} + +impl<'a> UnsafeSlice<'a> { + fn new(slice: &'a mut [WorkerStats]) -> Self { + let ptr = slice as *mut [WorkerStats] as *const [UnsafeCell]; + Self { + slice: unsafe { &*ptr }, + } + } + + // SAFETY: It's assumed that no two threads will write to the same index at the same time + #[inline(always)] + unsafe fn increment(&self, i: usize) { + let value = self.slice[i].get(); + (*value).count = (*value).count + 1; } - for handle in handles { - handle.join().unwrap(); + #[inline(always)] + fn sum(&self) -> u64 { + self.slice + .iter() + .map(|cell| unsafe { (*cell.get()).count }) + .sum() } }