Skip to content

Commit b4fffe6

Browse files
frailltMindaugas Vinkelis
authored and committed
Metrics collect stress test
1 parent a5e2061 commit b4fffe6

File tree

2 files changed

+247
-1
lines changed

2 files changed

+247
-1
lines changed

stress/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ version = "0.1.0"
44
edition = "2021"
55
publish = false
66

7+
[[bin]]
8+
name = "metrics_collect"
9+
path = "src/metrics_collect.rs"
10+
doc = false
11+
712
[[bin]] # Bin to run the metrics stress tests for Counter
813
name = "metrics"
914
path = "src/metrics_counter.rs"
@@ -39,6 +44,7 @@ name = "random"
3944
path = "src/random.rs"
4045
doc = false
4146

47+
4248
[dependencies]
4349
ctrlc = "3.2.5"
4450
lazy_static = "1.4.0"
@@ -51,6 +57,7 @@ tracing = { workspace = true, features = ["std"]}
5157
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
5258
num-format = "0.4.4"
5359
sysinfo = { version = "0.30.12", optional = true }
60+
clap = { version = "4.5.20", features = ["derive"] }
5461

5562
[features]
56-
stats = ["sysinfo"]
63+
stats = ["sysinfo"]

stress/src/metrics_collect.rs

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
use std::{
2+
cell::RefCell,
3+
ops::DerefMut,
4+
sync::{
5+
atomic::{AtomicBool, AtomicUsize, Ordering},
6+
Arc, Barrier, Weak,
7+
},
8+
time::{Duration, Instant},
9+
};
10+
11+
use lazy_static::lazy_static;
12+
use opentelemetry::{
13+
metrics::{Histogram, MeterProvider, MetricResult},
14+
KeyValue,
15+
};
16+
use opentelemetry_sdk::{
17+
metrics::{
18+
data::{ResourceMetrics, Temporality},
19+
reader::MetricReader,
20+
InstrumentKind, ManualReader, Pipeline, SdkMeterProvider,
21+
},
22+
Resource,
23+
};
24+
25+
use rand::{
26+
rngs::{self, SmallRng},
27+
Rng, SeedableRng,
28+
};
29+
30+
use clap::{Parser, ValueEnum};
31+
32+
#[derive(Debug, Clone, Copy, ValueEnum)]
33+
enum CliTemporality {
34+
Cumulative,
35+
Delta,
36+
}
37+
38+
/// Simple program to greet a person
39+
#[derive(Parser, Debug)]
40+
#[command(
41+
version,
42+
about = "Measure metrics performance while collecting",
43+
long_about = "The purpose of this test is to see how collecing interferre with measurements.\n\
44+
Most of the test measure how fast is collecting phase, but more important is\n\
45+
that it doesn't \"stop-the-world\" while collection phase is running."
46+
)]
47+
struct Cli {
48+
/// Select collection phase temporality
49+
temporality: CliTemporality,
50+
}
51+
52+
lazy_static! {
    /// Fixed pool of ten attribute values from which the random attribute
    /// sets attached to each measurement are drawn.
    pub static ref ATTRIBUTE_VALUES: [&'static str; 10] = [
        "value1",
        "value2",
        "value3",
        "value4",
        "value5",
        "value6",
        "value7",
        "value8",
        "value9",
        "value10",
    ];
}
58+
59+
thread_local! {
60+
61+
/// Store random number generator for each thread
62+
pub static CURRENT_RNG: RefCell<rngs::SmallRng> = RefCell::new(rngs::SmallRng::from_entropy());
63+
}
64+
65+
fn main() {
66+
let cli = Cli::parse();
67+
let temporality = match cli.temporality {
68+
CliTemporality::Cumulative => Temporality::Cumulative,
69+
CliTemporality::Delta => Temporality::Delta,
70+
};
71+
let reader = SharedReader::new(
72+
ManualReader::builder()
73+
.with_temporality(temporality)
74+
.build(),
75+
);
76+
let provider = SdkMeterProvider::builder()
77+
.with_reader(reader.clone())
78+
.build();
79+
// use histogram, as it is a bit more complicated during
80+
let histogram = provider.meter("test").u64_histogram("hello").build();
81+
82+
calculate_measurements_during_collection(histogram, reader).print_results();
83+
}
84+
85+
fn calculate_measurements_during_collection(
86+
histogram: Histogram<u64>,
87+
reader: SharedReader,
88+
) -> MeasurementResults {
89+
// we don't need to use every single CPU, better leave other CPU for operating system work,
90+
// so our running threads could be much more stable in performance.
91+
// just for the record, this is has HUGE effect on my machine (laptop intel i7-1355u)
92+
let num_threads = num_cpus::get() / 2;
93+
94+
let mut res = MeasurementResults {
95+
total_measurements_count: 0,
96+
total_time_collecting: 0,
97+
num_iterations: 0,
98+
};
99+
let start = Instant::now();
100+
while start.elapsed() < Duration::from_secs(3) {
101+
res.num_iterations += 1;
102+
let is_collecting = AtomicBool::new(false);
103+
let measurements_while_collecting = AtomicUsize::new(0);
104+
let time_while_collecting = AtomicUsize::new(0);
105+
let barrier = Barrier::new(num_threads + 1);
106+
std::thread::scope(|s| {
107+
// first create bunch of measurements,
108+
// so that collection phase wouldn't be "empty"
109+
let mut handles = Vec::new();
110+
for _t in 0..num_threads {
111+
handles.push(s.spawn(|| {
112+
for _i in 0..1000 {
113+
CURRENT_RNG.with(|rng| {
114+
histogram.record(
115+
1,
116+
&random_attribute_set3(
117+
rng.borrow_mut().deref_mut(),
118+
ATTRIBUTE_VALUES.as_ref(),
119+
),
120+
);
121+
});
122+
}
123+
}));
124+
}
125+
for handle in handles {
126+
handle.join().unwrap();
127+
}
128+
129+
// simultaneously start collecting and creating more measurements
130+
for _ in 0..num_threads - 1 {
131+
s.spawn(|| {
132+
barrier.wait();
133+
let now = Instant::now();
134+
let mut count = 0;
135+
while is_collecting.load(Ordering::Acquire) {
136+
CURRENT_RNG.with(|rng| {
137+
histogram.record(
138+
1,
139+
&random_attribute_set3(
140+
rng.borrow_mut().deref_mut(),
141+
ATTRIBUTE_VALUES.as_ref(),
142+
),
143+
);
144+
});
145+
count += 1;
146+
}
147+
measurements_while_collecting.fetch_add(count, Ordering::AcqRel);
148+
time_while_collecting
149+
.fetch_add(now.elapsed().as_micros() as usize, Ordering::AcqRel);
150+
});
151+
}
152+
153+
let collect_handle = s.spawn(|| {
154+
let mut rm = ResourceMetrics {
155+
resource: Resource::empty(),
156+
scope_metrics: Vec::new(),
157+
};
158+
is_collecting.store(true, Ordering::Release);
159+
barrier.wait();
160+
reader.collect(&mut rm).unwrap();
161+
is_collecting.store(false, Ordering::Release);
162+
});
163+
barrier.wait();
164+
collect_handle.join().unwrap();
165+
});
166+
res.total_measurements_count += measurements_while_collecting.load(Ordering::Acquire);
167+
res.total_time_collecting += time_while_collecting.load(Ordering::Acquire);
168+
}
169+
res
170+
}
171+
172+
/// Totals accumulated over all iterations of the stress test.
struct MeasurementResults {
    /// Measurements recorded by the worker threads while collection was running.
    total_measurements_count: usize,
    /// Time, in microseconds, the worker threads spent recording while
    /// collection was running, summed over all workers.
    total_time_collecting: usize,
    /// Number of record/collect iterations that were executed.
    num_iterations: usize,
}

impl MeasurementResults {
    /// Prints measurements per millisecond, measurements per iteration and
    /// microseconds per iteration to stdout.
    ///
    /// The arithmetic is done in `f64`: the totals can exceed 2^24, beyond
    /// which `f32` can no longer represent the two decimals that are printed.
    /// A zero divisor yields `NaN`/`inf` in the output rather than a panic.
    fn print_results(&self) {
        let measurements = self.total_measurements_count as f64;
        let micros = self.total_time_collecting as f64;
        let iterations = self.num_iterations as f64;

        println!(
            "{:>10.2} measurements/ms",
            measurements / (micros / 1000.0)
        );
        println!("{:>10.2} measurements/it", measurements / iterations);
        println!("{:>10.2} μs/it", micros / iterations);
    }
}
194+
195+
fn random_attribute_set3(rng: &mut SmallRng, values: &[&'static str]) -> [KeyValue; 3] {
196+
let len = values.len();
197+
unsafe {
198+
[
199+
KeyValue::new("attribute1", *values.get_unchecked(rng.gen_range(0..len))),
200+
KeyValue::new("attribute2", *values.get_unchecked(rng.gen_range(0..len))),
201+
KeyValue::new("attribute3", *values.get_unchecked(rng.gen_range(0..len))),
202+
]
203+
}
204+
}
205+
206+
// copy/paste from opentelemetry-sdk/benches/metric.rs
207+
#[derive(Clone, Debug)]
208+
pub struct SharedReader(Arc<dyn MetricReader>);
209+
210+
impl SharedReader {
211+
pub fn new<R>(reader: R) -> Self
212+
where
213+
R: MetricReader,
214+
{
215+
Self(Arc::new(reader))
216+
}
217+
}
218+
219+
impl MetricReader for SharedReader {
220+
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
221+
self.0.register_pipeline(pipeline)
222+
}
223+
224+
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
225+
self.0.collect(rm)
226+
}
227+
228+
fn force_flush(&self) -> MetricResult<()> {
229+
self.0.force_flush()
230+
}
231+
232+
fn shutdown(&self) -> MetricResult<()> {
233+
self.0.shutdown()
234+
}
235+
236+
fn temporality(&self, kind: InstrumentKind) -> Temporality {
237+
self.0.temporality(kind)
238+
}
239+
}

0 commit comments

Comments
 (0)