Skip to content

Commit 037f914

Browse files
lalitbcijothomas
andauthored
Add stress-test framework to contrib repo. (#138)
Co-authored-by: Cijo Thomas <[email protected]>
1 parent 4f2ac34 commit 037f914

File tree

4 files changed

+231
-0
lines changed

4 files changed

+231
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
members = [
33
"opentelemetry-*",
44
"examples/*",
5+
"stress",
56
]
67
resolver = "2"
78

stress/Cargo.toml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[package]
2+
name = "stress"
3+
version = "0.1.0"
4+
edition = "2021"
5+
publish = false
6+
7+
[[bin]] # Bin to run the metrics stress tests for Logs UserEvent Exporter
8+
name = "user-events-enabled"
9+
path = "src/user_events_enabled.rs"
10+
doc = false
11+
12+
[dependencies]
13+
ctrlc = "3.2.5"
14+
lazy_static = "1.4.0"
15+
num_cpus = "1.15.0"
16+
num-format = "0.4.4"
17+
sysinfo = { version = "0.32", optional = true }
18+
eventheader_dynamic = "0.4.0"
19+
20+
[features]
21+
stats = ["sysinfo"]
22+
23+
[profile.release]
24+
debug = true

stress/src/throughput.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
use num_format::{Locale, ToFormattedString};
2+
use std::cell::UnsafeCell;
3+
use std::env;
4+
use std::sync::atomic::{AtomicBool, Ordering};
5+
use std::sync::Arc;
6+
use std::thread;
7+
use std::time::{Duration, Instant};
8+
#[cfg(feature = "stats")]
9+
use sysinfo::{Pid, System};
10+
11+
const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds
12+
13+
static STOP: AtomicBool = AtomicBool::new(false);
14+
15+
#[repr(C)]
16+
#[derive(Default)]
17+
struct WorkerStats {
18+
count: u64,
19+
/// We use a padding for the struct to allow each thread to have exclusive access to each WorkerStat
20+
/// Otherwise, there would be some cpu contention with threads needing to take ownership of the cache lines
21+
padding: [u64; 15],
22+
}
23+
24+
pub fn test_throughput<F>(func: F)
25+
where
26+
F: Fn() + Sync + Send + 'static,
27+
{
28+
ctrlc::set_handler(move || {
29+
STOP.store(true, Ordering::SeqCst);
30+
})
31+
.expect("Error setting Ctrl-C handler");
32+
33+
let mut num_threads = num_cpus::get();
34+
let mut args_iter = env::args();
35+
36+
if let Some(arg_str) = args_iter.nth(1) {
37+
let arg = arg_str.parse::<usize>();
38+
39+
if arg.is_err() {
40+
eprintln!("Invalid command line argument '{}' as number of threads. Make sure the value is a positive integer.", arg_str);
41+
std::process::exit(1);
42+
}
43+
44+
let arg_num = arg.unwrap();
45+
46+
if arg_num > 0 {
47+
if arg_num > num_cpus::get() {
48+
println!(
49+
"Specified {} threads which is larger than the number of logical cores ({})!",
50+
arg_num, num_threads
51+
);
52+
}
53+
num_threads = arg_num;
54+
} else {
55+
eprintln!("Invalid command line argument {} as number of threads. Make sure the value is above 0 and less than or equal to number of available logical cores ({}).", arg_num, num_threads);
56+
std::process::exit(1);
57+
}
58+
}
59+
60+
println!("Number of threads: {}\n", num_threads);
61+
let func_arc = Arc::new(func);
62+
let mut worker_stats_vec: Vec<WorkerStats> = Vec::new();
63+
64+
for _ in 0..num_threads {
65+
worker_stats_vec.push(WorkerStats::default());
66+
}
67+
68+
let shared_mutable_stats_slice = UnsafeSlice::new(&mut worker_stats_vec);
69+
70+
thread::scope(|s| {
71+
s.spawn(|| {
72+
let mut last_collect_time = Instant::now();
73+
let mut total_count_old: u64 = 0;
74+
75+
#[cfg(feature = "stats")]
76+
let pid = Pid::from(std::process::id() as usize);
77+
#[cfg(feature = "stats")]
78+
let mut system = System::new_all();
79+
80+
loop {
81+
let current_time = Instant::now();
82+
let elapsed = current_time.duration_since(last_collect_time).as_secs();
83+
if elapsed >= SLIDING_WINDOW_SIZE {
84+
let total_count_u64 = shared_mutable_stats_slice.sum();
85+
last_collect_time = Instant::now();
86+
let current_count = total_count_u64 - total_count_old;
87+
total_count_old = total_count_u64;
88+
let throughput = current_count / elapsed;
89+
println!(
90+
"Throughput: {} iterations/sec",
91+
throughput.to_formatted_string(&Locale::en)
92+
);
93+
94+
#[cfg(feature = "stats")]
95+
{
96+
system.refresh_all();
97+
if let Some(process) = system.process(pid) {
98+
println!(
99+
"Memory usage: {:.2} MB",
100+
process.memory() as f64 / (1024.0 * 1024.0)
101+
);
102+
println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
103+
println!(
104+
"Virtual memory usage: {:.2} MB",
105+
process.virtual_memory() as f64 / (1024.0 * 1024.0)
106+
);
107+
} else {
108+
println!("Process not found");
109+
}
110+
}
111+
112+
println!("\n");
113+
}
114+
115+
if STOP.load(Ordering::SeqCst) {
116+
break;
117+
}
118+
119+
thread::sleep(Duration::from_millis(5000));
120+
}
121+
});
122+
123+
for thread_index in 0..num_threads {
124+
let func_arc_clone = Arc::clone(&func_arc);
125+
s.spawn(move || loop {
126+
func_arc_clone();
127+
unsafe {
128+
shared_mutable_stats_slice.increment(thread_index);
129+
}
130+
if STOP.load(Ordering::SeqCst) {
131+
break;
132+
}
133+
});
134+
}
135+
});
136+
}
137+
138+
#[derive(Copy, Clone)]
139+
struct UnsafeSlice<'a> {
140+
slice: &'a [UnsafeCell<WorkerStats>],
141+
}
142+
143+
unsafe impl Send for UnsafeSlice<'_> {}
144+
unsafe impl Sync for UnsafeSlice<'_> {}
145+
146+
impl<'a> UnsafeSlice<'a> {
147+
fn new(slice: &'a mut [WorkerStats]) -> Self {
148+
let ptr = slice as *mut [WorkerStats] as *const [UnsafeCell<WorkerStats>];
149+
Self {
150+
slice: unsafe { &*ptr },
151+
}
152+
}
153+
154+
// SAFETY: It's assumed that no two threads will write to the same index at the same time
155+
#[inline(always)]
156+
unsafe fn increment(&self, i: usize) {
157+
let value = self.slice[i].get();
158+
(*value).count += 1;
159+
}
160+
161+
#[inline(always)]
162+
fn sum(&self) -> u64 {
163+
self.slice
164+
.iter()
165+
.map(|cell| unsafe { (*cell.get()).count })
166+
.sum()
167+
}
168+
}

stress/src/user_events_enabled.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// To run the test, execute the following command in the stress directory as sudo:
2+
// sudo -E ~/.cargo/bin/cargo rnu --bin user-events-enabled --release
3+
4+
// TODO : Add stess result here.
5+
6+
mod throughput;
7+
use eventheader_dynamic::{Provider, ProviderOptions};
8+
use lazy_static::lazy_static;
9+
10+
// Global constants for level and keyword
11+
const LEVEL: u8 = 4; // Example level (Informational)
12+
const KEYWORD: u64 = 0x01; // Example keyword
13+
14+
lazy_static! {
15+
static ref PROVIDER: Provider = {
16+
// Initialize the Provider with dynamic options
17+
let mut options = ProviderOptions::new();
18+
options = *options.group_name("testprovider");
19+
let mut provider = Provider::new("testprovider", &options);
20+
21+
// Register events with specific levels and keywords
22+
provider.register_set(LEVEL.into(), KEYWORD);
23+
24+
provider
25+
};
26+
}
27+
28+
fn main() {
29+
// Execute the throughput test with the test_log function
30+
throughput::test_throughput(test_user_events_enabled);
31+
}
32+
33+
fn test_user_events_enabled() {
34+
// Find and check if the event is enabled
35+
if let Some(event_set) = PROVIDER.find_set(LEVEL.into(), KEYWORD) {
36+
let _ = event_set.enabled(); // Perform the enabled check
37+
}
38+
}

0 commit comments

Comments
 (0)