Make stress tests independent of batch size #2306
Changes from all commits
```diff
@@ -1,21 +1,21 @@
 use num_format::{Locale, ToFormattedString};
+use std::cell::UnsafeCell;
 use std::env;
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::thread;
 use std::time::{Duration, Instant};
 #[cfg(feature = "stats")]
 use sysinfo::{Pid, System};

 const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds
-const BATCH_SIZE: u64 = 1000;

 static STOP: AtomicBool = AtomicBool::new(false);

 #[repr(C)]
 #[derive(Default)]
 struct WorkerStats {
-    count: AtomicU64,
+    count: u64,
     /// We use a padding for the struct to allow each thread to have exclusive access to each WorkerStat
     /// Otherwise, there would be some cpu contention with threads needing to take ownership of the cache lines
     padding: [u64; 15],
```
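Why the padding matters: `count` plus fifteen more `u64`s makes each `WorkerStats` 128 bytes, so no two workers' counters ever share a 64-byte cache line and the per-iteration increments avoid false sharing. A standalone check of that size math (restating the struct from the diff, with an added `main`):

```rust
use std::mem::size_of;

#[repr(C)]
#[derive(Default)]
struct WorkerStats {
    count: u64,
    // 15 further u64s pad the struct to 128 bytes, so two adjacent
    // WorkerStats never share a 64-byte cache line.
    padding: [u64; 15],
}

fn main() {
    // 16 * 8 bytes = 128 bytes, i.e. two full 64-byte cache lines.
    assert_eq!(size_of::<WorkerStats>(), 128);
    println!("WorkerStats is {} bytes", size_of::<WorkerStats>());
}
```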
```diff
@@ -58,91 +58,111 @@ where
     }

     println!("Number of threads: {}\n", num_threads);
-    let mut handles = Vec::with_capacity(num_threads);
     let func_arc = Arc::new(func);
     let mut worker_stats_vec: Vec<WorkerStats> = Vec::new();

     for _ in 0..num_threads {
         worker_stats_vec.push(WorkerStats::default());
     }
-    let worker_stats_shared = Arc::new(worker_stats_vec);
-    let worker_stats_shared_monitor = Arc::clone(&worker_stats_shared);
+    let shared_mutable_stats_slice = UnsafeSlice::new(&mut worker_stats_vec);

-    let handle_main_thread = thread::spawn(move || {
-        let mut last_collect_time = Instant::now();
-        let mut total_count_old: u64 = 0;
+    thread::scope(|s| {
+        s.spawn(|| {
+            let mut last_collect_time = Instant::now();
+            let mut total_count_old: u64 = 0;

-        #[cfg(feature = "stats")]
-        let pid = Pid::from(std::process::id() as usize);
-        #[cfg(feature = "stats")]
-        let mut system = System::new_all();
+            #[cfg(feature = "stats")]
+            let pid = Pid::from(std::process::id() as usize);
+            #[cfg(feature = "stats")]
+            let mut system = System::new_all();

-        loop {
-            let current_time = Instant::now();
-            let elapsed = current_time.duration_since(last_collect_time).as_secs();
-            if elapsed >= SLIDING_WINDOW_SIZE {
-                let total_count_u64: u64 = worker_stats_shared_monitor
-                    .iter()
-                    .map(|worker_stat| worker_stat.count.load(Ordering::Relaxed))
-                    .sum();
-                last_collect_time = Instant::now();
-                let current_count = total_count_u64 - total_count_old;
-                total_count_old = total_count_u64;
-                let throughput = current_count / elapsed;
-                println!(
-                    "Throughput: {} iterations/sec",
-                    throughput.to_formatted_string(&Locale::en)
-                );
+            loop {
+                let current_time = Instant::now();
+                let elapsed = current_time.duration_since(last_collect_time).as_secs();
+                if elapsed >= SLIDING_WINDOW_SIZE {
+                    let total_count_u64 = shared_mutable_stats_slice.sum();
+                    last_collect_time = Instant::now();
+                    let current_count = total_count_u64 - total_count_old;
+                    total_count_old = total_count_u64;
+                    let throughput = current_count / elapsed;
+                    println!(
+                        "Throughput: {} iterations/sec",
+                        throughput.to_formatted_string(&Locale::en)
+                    );

-                #[cfg(feature = "stats")]
-                {
-                    system.refresh_all();
-                    if let Some(process) = system.process(pid) {
-                        println!(
-                            "Memory usage: {:.2} MB",
-                            process.memory() as f64 / (1024.0 * 1024.0)
-                        );
-                        println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
-                        println!(
-                            "Virtual memory usage: {:.2} MB",
-                            process.virtual_memory() as f64 / (1024.0 * 1024.0)
-                        );
-                    } else {
-                        println!("Process not found");
-                    }
-                }
+                    #[cfg(feature = "stats")]
+                    {
+                        system.refresh_all();
+                        if let Some(process) = system.process(pid) {
+                            println!(
+                                "Memory usage: {:.2} MB",
+                                process.memory() as f64 / (1024.0 * 1024.0)
+                            );
+                            println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
+                            println!(
+                                "Virtual memory usage: {:.2} MB",
+                                process.virtual_memory() as f64 / (1024.0 * 1024.0)
+                            );
+                        } else {
+                            println!("Process not found");
+                        }
+                    }

-                println!("\n");
-            }
-
-            if STOP.load(Ordering::SeqCst) {
-                break;
-            }
-
-            thread::sleep(Duration::from_millis(5000));
-        }
-    });
+                    println!("\n");
+                }
+                if STOP.load(Ordering::SeqCst) {
+                    break;
+                }
+
+                thread::sleep(Duration::from_millis(5000));
+            }
+        });

-    handles.push(handle_main_thread);
-
-    for thread_index in 0..num_threads {
-        let worker_stats_shared = Arc::clone(&worker_stats_shared);
-        let func_arc_clone = Arc::clone(&func_arc);
-        let handle = thread::spawn(move || loop {
-            for _ in 0..BATCH_SIZE {
-                func_arc_clone();
-            }
-            worker_stats_shared[thread_index]
-                .count
-                .fetch_add(BATCH_SIZE, Ordering::Relaxed);
-            if STOP.load(Ordering::SeqCst) {
-                break;
-            }
-        });
-        handles.push(handle)
-    }
-
-    for handle in handles {
-        handle.join().unwrap();
-    }
+        for thread_index in 0..num_threads {
+            let func_arc_clone = Arc::clone(&func_arc);
+            s.spawn(move || loop {
+                func_arc_clone();
+                unsafe {
+                    shared_mutable_stats_slice.increment(thread_index);
+                }
+                if STOP.load(Ordering::SeqCst) {
+                    break;
+                }
+            });
+        }
+    });
 }
+
+#[derive(Copy, Clone)]
+struct UnsafeSlice<'a> {
+    slice: &'a [UnsafeCell<WorkerStats>],
+}
+
+unsafe impl<'a> Send for UnsafeSlice<'a> {}
+unsafe impl<'a> Sync for UnsafeSlice<'a> {}
+
+impl<'a> UnsafeSlice<'a> {
+    fn new(slice: &'a mut [WorkerStats]) -> Self {
+        let ptr = slice as *mut [WorkerStats] as *const [UnsafeCell<WorkerStats>];
+        Self {
+            slice: unsafe { &*ptr },
+        }
+    }
+
+    // SAFETY: It's assumed that no two threads will write to the same index at the same time
+    #[inline(always)]
+    unsafe fn increment(&self, i: usize) {
+        let value = self.slice[i].get();
+        (*value).count = (*value).count + 1;
+    }
+
+    #[inline(always)]
+    fn sum(&self) -> u64 {
+        self.slice
+            .iter()
+            .map(|cell| unsafe { (*cell.get()).count })
+            .sum()
+    }
+}
```
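A structural note on the new version: `std::thread::scope` joins its threads before returning and lets them borrow non-`'static` data, which is why the `Arc<Vec<WorkerStats>>` and the manual `handles`/`join` bookkeeping could be dropped. A toy sketch of that borrowing pattern (illustrative only, not the PR's code):

```rust
use std::thread;

fn main() {
    let mut counts = vec![0u64; 4];

    // Scoped threads may borrow non-'static data: each worker gets a
    // disjoint &mut element, so no Arc or atomics are needed here.
    thread::scope(|s| {
        for (i, slot) in counts.iter_mut().enumerate() {
            s.spawn(move || {
                *slot = (i as u64 + 1) * 100; // each thread writes its own slot
            });
        }
    }); // all spawned threads are joined before scope returns

    assert_eq!(counts.iter().sum::<u64>(), 1000);
    println!("total = {}", counts.iter().sum::<u64>());
}
```

The PR gets the same effect through `UnsafeSlice`, whose SAFETY comment encodes the same disjoint-writer assumption by hand.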
Review thread on the `STOP.load(Ordering::SeqCst)` check in the worker loop:

**Member:** Won't we now be forced to check this `AtomicBool` on every iteration? I think that could still be avoided by checking it only every `BATCH_SIZE` iterations instead.

**Contributor (Author):** Since the …

**Contributor (Author):** If we still want to avoid it, I'd prefer to remove the … I ran the stress test with the above-mentioned change, and there isn't much of a difference in the results. I would prefer removing it, as the existing code for using …

**Member:** Not a blocker for this PR; let's revisit removing it altogether in a future PR, if needed.
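For reference, the batched check suggested above might look like the following sketch; `CHECK_INTERVAL`, `worker_loop`, and its closure parameters are hypothetical names, not from the PR:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

static STOP: AtomicBool = AtomicBool::new(false);

// Hypothetical interval. The worker still counts every iteration, so the
// throughput numbers stay independent of batch size; only the atomic load
// is amortized over CHECK_INTERVAL iterations.
const CHECK_INTERVAL: u64 = 1000;

fn worker_loop(mut do_work: impl FnMut(), mut on_iteration: impl FnMut()) {
    let mut since_check = 0u64;
    loop {
        do_work();
        on_iteration(); // e.g. unsafe { stats.increment(thread_index) }
        since_check += 1;
        if since_check == CHECK_INTERVAL {
            since_check = 0;
            if STOP.load(Ordering::SeqCst) {
                break;
            }
        }
    }
}

fn main() {
    let mut iterations = 0u64;
    // Toy driver: request a stop after 3 * CHECK_INTERVAL iterations.
    worker_loop(
        || {},
        || {
            iterations += 1;
            if iterations == 3 * CHECK_INTERVAL {
                STOP.store(true, Ordering::SeqCst);
            }
        },
    );
    println!("stopped after {} iterations", iterations);
}
```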
Review thread on `UnsafeSlice::sum`:

**Comment:** Can this `read` operation conflict with the concurrent `write`s, as there is no safety with the `UnsafeCell` operation?

**Reply:** At worst, we'll underreport the numbers?

**Reply:** At worst, we won't get the most up-to-date sum, which is okay?

**Reply:** Yes, under-reporting should be fine at worst. Though this could result in data corruption from reading partially updated values on some 32-bit machines (64-bit writes may not be atomic on all 32-bit machines), we needn't worry about that for a stress test.
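If torn reads on 32-bit targets ever did matter, the natural fix is the one this thread alludes to: keep per-worker `AtomicU64` counters with `Relaxed` ordering, which makes the 64-bit accesses atomic everywhere while each thread still touches only its own cache-padded slot. A sketch of that alternative (not the PR's code; `AtomicWorkerStats` and `sum` are illustrative names):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

#[repr(C)]
#[derive(Default)]
#[allow(dead_code)] // padding is never read, only there for layout
struct AtomicWorkerStats {
    count: AtomicU64,
    padding: [u64; 15], // same cache-line padding as WorkerStats
}

impl AtomicWorkerStats {
    #[inline(always)]
    fn increment(&self) {
        // Relaxed suffices: each thread writes only its own counter, and
        // the monitor only needs an eventually consistent total.
        self.count.fetch_add(1, Ordering::Relaxed);
    }
}

fn sum(stats: &[AtomicWorkerStats]) -> u64 {
    stats.iter().map(|s| s.count.load(Ordering::Relaxed)).sum()
}

fn main() {
    let stats: Vec<AtomicWorkerStats> =
        (0..4).map(|_| AtomicWorkerStats::default()).collect();
    stats[0].increment();
    stats[3].increment();
    println!("total = {}", sum(&stats));
}
```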