Skip to content

Commit e0df038

Browse files
committed
Measure metrics latency
1 parent 465fcc2 commit e0df038

File tree

2 files changed

+234
-0
lines changed

2 files changed

+234
-0
lines changed

stress/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ name = "metrics_histogram"
1919
path = "src/metrics_histogram.rs"
2020
doc = false
2121

22+
[[bin]] # Bin to run measure latency in various conditions
23+
name = "metrics_latency"
24+
path = "src/metrics_latency.rs"
25+
doc = false
26+
2227
[[bin]] # Bin to run the metrics overflow stress tests
2328
name = "metrics_overflow"
2429
path = "src/metrics_overflow.rs"

stress/src/metrics_latency.rs

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
use std::{
2+
collections::{BTreeMap, HashMap},
3+
sync::{Arc, Mutex, Weak},
4+
time::{Duration, Instant},
5+
};
6+
7+
use opentelemetry::{metrics::MeterProvider, KeyValue};
8+
use opentelemetry_sdk::{
9+
metrics::{
10+
data::{self, ResourceMetrics},
11+
reader::MetricReader,
12+
InstrumentKind, ManualReader, MetricError, Pipeline, SdkMeterProvider, Temporality,
13+
},
14+
Resource,
15+
};
16+
17+
// copy/paste from opentelemetry-sdk/benches/metric.rs
// Cloneable handle around a `MetricReader`, letting the same ManualReader be
// registered with the SdkMeterProvider and polled from the measurement loop.
#[derive(Clone, Debug)]
pub struct SharedReader(Arc<dyn MetricReader>);

impl SharedReader {
    // Wraps `reader` in a shared, reference-counted handle.
    pub fn new<R>(reader: R) -> Self
    where
        R: MetricReader,
    {
        Self(Arc::new(reader))
    }
}

// Every trait method forwards verbatim to the wrapped reader.
impl MetricReader for SharedReader {
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        self.0.register_pipeline(pipeline)
    }

    fn collect(&self, rm: &mut ResourceMetrics) -> Result<(), MetricError> {
        self.0.collect(rm)
    }

    fn force_flush(&self) -> Result<(), MetricError> {
        self.0.force_flush()
    }

    fn shutdown(&self) -> Result<(), MetricError> {
        self.0.shutdown()
    }

    fn temporality(&self, kind: InstrumentKind) -> Temporality {
        self.0.temporality(kind)
    }
}
51+
52+
// Number of counter updates each worker thread performs.
const ITERATIONS_PER_THREAD: usize = 10000;
53+
54+
fn main() {
55+
let available_threads: usize = std::thread::available_parallelism().map_or(1, |p| p.get());
56+
57+
measure_update_latency(
58+
&format!("half threads = {available_threads}/2, lots of inserts"),
59+
available_threads / 2,
60+
|_i, j| format!("{}", j % 2000),
61+
);
62+
measure_update_latency(
63+
&format!("half threads = {available_threads}/2, updates only"),
64+
available_threads / 2,
65+
|_i, _j| String::new(),
66+
);
67+
measure_update_latency(
68+
&format!("twice threads = {available_threads}*2, lots of inserts"),
69+
available_threads * 2,
70+
|_i, j| format!("{}", j % 2000),
71+
);
72+
measure_update_latency(
73+
&format!("twice threads = {available_threads}*2, updates only"),
74+
available_threads * 2,
75+
|_i, _j| String::new(),
76+
);
77+
}
78+
79+
fn measure_update_latency(msg: &str, threads_count: usize, keyname: fn(usize, usize) -> String) {
80+
let reader = SharedReader::new(
81+
ManualReader::builder()
82+
.with_temporality(Temporality::Delta)
83+
.build(),
84+
);
85+
let provider = SdkMeterProvider::builder()
86+
.with_reader(reader.clone())
87+
.build();
88+
let histogram = provider.meter("test").u64_counter("hello").build();
89+
let mut threads = Vec::new();
90+
let mut stats = Vec::new();
91+
stats.resize_with(threads_count, || {
92+
Arc::new(Mutex::new(HashMap::<u64, u64>::new()))
93+
});
94+
// run multiple threads and measure how time it takes to update metric
95+
for thread_idx in 0..threads_count {
96+
let hist = histogram.clone();
97+
let stat = stats[thread_idx].clone();
98+
threads.push(std::thread::spawn(move || {
99+
let mut stat = stat.lock().unwrap();
100+
for iter_idx in 0..ITERATIONS_PER_THREAD {
101+
let kv = KeyValue::new(keyname(thread_idx, iter_idx), 1);
102+
let start = Instant::now();
103+
hist.add(1, &[kv]);
104+
let curr = stat.entry(start.elapsed().as_nanos() as u64).or_default();
105+
*curr += 1;
106+
}
107+
}));
108+
}
109+
// collect threads in short intervals
110+
let mut total_count = 0;
111+
while threads.iter().any(|t| !t.is_finished()) {
112+
// collect agressively so we could measure inserts properly,
113+
// but not loop, as we'll see no difference in updates vs inserts due to super high contention
114+
std::thread::sleep(Duration::from_micros(500));
115+
total_count += collect_and_return_count(&reader);
116+
}
117+
threads.into_iter().for_each(|t| {
118+
t.join().unwrap();
119+
});
120+
total_count += collect_and_return_count(&reader);
121+
122+
let total_measurements = (threads_count * ITERATIONS_PER_THREAD) as u64;
123+
assert_eq!(total_count, total_measurements);
124+
125+
let stats = stats
126+
.into_iter()
127+
.map(|s| Arc::into_inner(s).unwrap().into_inner().unwrap())
128+
.flat_map(|s| s.into_iter())
129+
.fold(BTreeMap::<u64, u64>::default(), |mut acc, (time, count)| {
130+
*acc.entry(time).or_default() += count;
131+
acc
132+
});
133+
134+
let sum = stats.iter().fold(0, |mut acc, (&time, &count)| {
135+
acc += time * count;
136+
acc
137+
});
138+
139+
println!("{msg}");
140+
println!("avg {}", format_time(sum / total_measurements as u64));
141+
println!(
142+
"p50 {}",
143+
format_time(get_percentile_value(total_measurements, &stats, 50))
144+
);
145+
println!(
146+
"p95 {}",
147+
format_time(get_percentile_value(total_measurements, &stats, 95))
148+
);
149+
println!(
150+
"p99 {}",
151+
format_time(get_percentile_value(total_measurements, &stats, 99))
152+
);
153+
}
154+
155+
fn collect_and_return_count(reader: &SharedReader) -> u64 {
156+
let mut rm = ResourceMetrics {
157+
resource: Resource::empty(),
158+
scope_metrics: Vec::new(),
159+
};
160+
reader.collect(&mut rm).unwrap();
161+
rm.scope_metrics
162+
.into_iter()
163+
.flat_map(|sm| sm.metrics.into_iter())
164+
.flat_map(|m| {
165+
m.data
166+
.as_any()
167+
.downcast_ref::<data::Sum<u64>>()
168+
.unwrap()
169+
.data_points
170+
.clone()
171+
.into_iter()
172+
})
173+
.map(|dp| dp.value)
174+
.sum()
175+
}
176+
177+
/// Returns the approximate latency (nanoseconds) at the given percentile of
/// the histogram `stats` (map of time -> occurrence count), interpolating
/// linearly when the percentile break point falls between two buckets.
fn get_percentile_value(
    total_measurements: u64,
    stats: &BTreeMap<u64, u64>,
    percentile: u64,
) -> u64 {
    assert!(percentile > 0 && percentile < 100);
    let break_point = ((total_measurements as f64 * percentile as f64) / 100.0) as u64;
    let mut iter = stats.iter().peekable();
    let mut sum = 0;
    while let Some((&time, &count)) = iter.next() {
        sum += count;
        if sum >= break_point {
            // The break point lies inside this bucket. Previously this case
            // either underflowed `break_point - sum` during interpolation
            // (panicking in debug builds) or, for a single-bucket histogram,
            // fell off the loop and reported 0.
            return time;
        }
        if let Some(&(&next_time, &next_count)) = iter.peek() {
            if sum + next_count > break_point {
                // Interpolate between this bucket and the next one.
                let ratio = (break_point - sum) as f32 / next_count as f32;
                return time + ((next_time - time) as f32 * ratio) as u64;
            }
        }
    }
    0
}
201+
202+
/// Renders a nanosecond duration as a short human-readable string, picking
/// ms/μs/ns and a precision that keeps roughly four significant digits.
fn format_time(nanos: u64) -> String {
    let ns = nanos as f64;
    let (val, unit) = match ns {
        v if v > 1_000_000.0 => (v / 1_000_000.0, "ms"),
        v if v > 1_000.0 => (v / 1_000.0, "μs"),
        v => (v, "ns"),
    };
    let precision = if val > 100.0 {
        1
    } else if val > 10.0 {
        2
    } else {
        3
    };
    format!("{val:>5.precision$}{unit}")
}
219+
220+
// Sanity-checks the fixed-width formatting across all three unit ranges
// (ns / μs / ms) and all three precision tiers.
#[test]
fn format_number() {
    assert_eq!("12.00ns", format_time(12));
    assert_eq!("123.0ns", format_time(123));
    assert_eq!("1.234μs", format_time(1234));
    assert_eq!("12.35μs", format_time(12349));
    assert_eq!("123.4μs", format_time(123400));
    assert_eq!("1.235ms", format_time(1234900));
    assert_eq!("12.34ms", format_time(12340000));
}

0 commit comments

Comments
 (0)