Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 90 additions & 70 deletions stress/src/throughput.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use num_format::{Locale, ToFormattedString};
use std::cell::UnsafeCell;
use std::env;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
#[cfg(feature = "stats")]
use sysinfo::{Pid, System};

// Width of the reporting window: the monitor thread prints throughput once
// this many seconds have elapsed since the last report.
const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds
// NOTE(review): BATCH_SIZE appears unused after the move to per-iteration
// increments in the worker loop — confirm against the full file and remove if dead.
const BATCH_SIZE: u64 = 1000;

// Process-wide shutdown flag, polled by the monitor and worker loops.
// Presumably set on Ctrl+C / test teardown — confirm against the setter.
static STOP: AtomicBool = AtomicBool::new(false);

// Per-worker counter, padded out to its own cache line so concurrent workers
// never contend on the same line (false sharing).
#[repr(C)]
#[derive(Default)]
struct WorkerStats {
    // Iterations completed by the owning worker. Written only by that worker
    // (through UnsafeSlice) and read unsynchronized by the monitor thread,
    // which tolerates stale values for reporting purposes.
    count: u64,
    /// We use a padding for the struct to allow each thread to have exclusive access to each WorkerStat
    /// Otherwise, there would be some cpu contention with threads needing to take ownership of the cache lines
    padding: [u64; 15],
}
Expand Down Expand Up @@ -58,91 +58,111 @@ where
}

println!("Number of threads: {}\n", num_threads);
let mut handles = Vec::with_capacity(num_threads);
let func_arc = Arc::new(func);
let mut worker_stats_vec: Vec<WorkerStats> = Vec::new();

for _ in 0..num_threads {
worker_stats_vec.push(WorkerStats::default());
}
let worker_stats_shared = Arc::new(worker_stats_vec);
let worker_stats_shared_monitor = Arc::clone(&worker_stats_shared);

let handle_main_thread = thread::spawn(move || {
let mut last_collect_time = Instant::now();
let mut total_count_old: u64 = 0;

#[cfg(feature = "stats")]
let pid = Pid::from(std::process::id() as usize);
#[cfg(feature = "stats")]
let mut system = System::new_all();

loop {
let current_time = Instant::now();
let elapsed = current_time.duration_since(last_collect_time).as_secs();
if elapsed >= SLIDING_WINDOW_SIZE {
let total_count_u64: u64 = worker_stats_shared_monitor
.iter()
.map(|worker_stat| worker_stat.count.load(Ordering::Relaxed))
.sum();
last_collect_time = Instant::now();
let current_count = total_count_u64 - total_count_old;
total_count_old = total_count_u64;
let throughput = current_count / elapsed;
println!(
"Throughput: {} iterations/sec",
throughput.to_formatted_string(&Locale::en)
);

#[cfg(feature = "stats")]
{
system.refresh_all();
if let Some(process) = system.process(pid) {
println!(
"Memory usage: {:.2} MB",
process.memory() as f64 / (1024.0 * 1024.0)
);
println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
println!(
"Virtual memory usage: {:.2} MB",
process.virtual_memory() as f64 / (1024.0 * 1024.0)
);
} else {
println!("Process not found");
let shared_mutable_stats_slice = UnsafeSlice::new(&mut worker_stats_vec);

thread::scope(|s| {
s.spawn(|| {
let mut last_collect_time = Instant::now();
let mut total_count_old: u64 = 0;

#[cfg(feature = "stats")]
let pid = Pid::from(std::process::id() as usize);
#[cfg(feature = "stats")]
let mut system = System::new_all();

loop {
let current_time = Instant::now();
let elapsed = current_time.duration_since(last_collect_time).as_secs();
if elapsed >= SLIDING_WINDOW_SIZE {
let total_count_u64 = shared_mutable_stats_slice.sum();
Copy link
Member

@lalitb lalitb Nov 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this read operation conflict with the concurrent writes, as there is no safety with UnsafeCell operation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At worst, we'll under-report the numbers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At worst, we won't get the most up-to-date sum which is okay?

Copy link
Member

@lalitb lalitb Nov 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, under-reporting should be fine at worst. Though this can result in data corruption due to reading of partially updated values on some 32-bit machines (64-bit writes may not be atomic on all 32-bit machines), but we needn't worry about that for the stress test.

last_collect_time = Instant::now();
let current_count = total_count_u64 - total_count_old;
total_count_old = total_count_u64;
let throughput = current_count / elapsed;
println!(
"Throughput: {} iterations/sec",
throughput.to_formatted_string(&Locale::en)
);

#[cfg(feature = "stats")]
{
system.refresh_all();
if let Some(process) = system.process(pid) {
println!(
"Memory usage: {:.2} MB",
process.memory() as f64 / (1024.0 * 1024.0)
);
println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
println!(
"Virtual memory usage: {:.2} MB",
process.virtual_memory() as f64 / (1024.0 * 1024.0)
);
} else {
println!("Process not found");
}
}

println!("\n");
}

println!("\n");
}
if STOP.load(Ordering::SeqCst) {
break;
}

if STOP.load(Ordering::SeqCst) {
break;
thread::sleep(Duration::from_millis(5000));
}
});

thread::sleep(Duration::from_millis(5000));
for thread_index in 0..num_threads {
let func_arc_clone = Arc::clone(&func_arc);
s.spawn(move || loop {
func_arc_clone();
unsafe {
shared_mutable_stats_slice.increment(thread_index);
}
if STOP.load(Ordering::SeqCst) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't we now be forced to check this AtomicBool on each iteration? I think that can still be avoided and instead be checked once every BATCH_SIZE iterations?

Copy link
Contributor Author

@utpilla utpilla Nov 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the STOP is only ever changed when exiting the stress test, the cache line where STOP resides wouldn't have to be updated during the test. So, I don't expect it to have much of a perf implication. (don't think it suffers from false sharing here as STOP is static).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we still want to avoid it, I'd prefer that we remove the STOP variable altogether and simply exit the process on Ctrl + C.

I ran the stress test with the above-mentioned change, and there isn't much of a difference in the results. I would prefer removing it as the existing code for using STOP doesn't add much value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker for this PR; let's revisit removing it altogether in a future PR, if needed.

break;
}
});
}
});
}

handles.push(handle_main_thread);
// Copyable, lock-free view over the per-worker stats: each worker thread
// mutates only its own entry through interior mutability (UnsafeCell), while
// the monitor thread reads all entries without synchronization.
#[derive(Copy, Clone)]
struct UnsafeSlice<'a> {
    // Borrowed slice reinterpreted element-wise as UnsafeCell; built from an
    // exclusive &mut [WorkerStats] in UnsafeSlice::new.
    slice: &'a [UnsafeCell<WorkerStats>],
}

for thread_index in 0..num_threads {
let worker_stats_shared = Arc::clone(&worker_stats_shared);
let func_arc_clone = Arc::clone(&func_arc);
let handle = thread::spawn(move || loop {
for _ in 0..BATCH_SIZE {
func_arc_clone();
}
worker_stats_shared[thread_index]
.count
.fetch_add(BATCH_SIZE, Ordering::Relaxed);
if STOP.load(Ordering::SeqCst) {
break;
}
});
handles.push(handle)
// SAFETY: the calling code upholds the invariant that no two threads ever
// write the same index concurrently (each worker increments only its own
// slot — see `increment`), and the monitor's unsynchronized reads are
// accepted as best-effort for this stress test, so sharing and sending the
// view across threads is sound for this access pattern.
unsafe impl<'a> Send for UnsafeSlice<'a> {}
unsafe impl<'a> Sync for UnsafeSlice<'a> {}

impl<'a> UnsafeSlice<'a> {
fn new(slice: &'a mut [WorkerStats]) -> Self {
let ptr = slice as *mut [WorkerStats] as *const [UnsafeCell<WorkerStats>];
Self {
slice: unsafe { &*ptr },
}
}

// SAFETY: It's assumed that no two threads will write to the same index at the same time
#[inline(always)]
unsafe fn increment(&self, i: usize) {
let value = self.slice[i].get();
(*value).count = (*value).count + 1;
}

for handle in handles {
handle.join().unwrap();
#[inline(always)]
fn sum(&self) -> u64 {
self.slice
.iter()
.map(|cell| unsafe { (*cell.get()).count })
.sum()
}
}
Loading