
Commit aa55bb8

Measure metrics latency
1 parent 465fcc2 commit aa55bb8

File tree

stress/Cargo.toml
stress/src/metrics_latency.rs

2 files changed, 305 insertions(+), 0 deletions(-)

stress/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -19,6 +19,11 @@ name = "metrics_histogram"
 path = "src/metrics_histogram.rs"
 doc = false
 
+[[bin]] # Bin to measure metrics latency in various conditions
+name = "metrics_latency"
+path = "src/metrics_latency.rs"
+doc = false
+
 [[bin]] # Bin to run the metrics overflow stress tests
 name = "metrics_overflow"
 path = "src/metrics_overflow.rs"

stress/src/metrics_latency.rs

Lines changed: 300 additions & 0 deletions
@@ -0,0 +1,300 @@
+use std::{
+    collections::{BTreeMap, HashMap},
+    sync::{
+        atomic::{AtomicBool, AtomicU64, Ordering},
+        Arc, Mutex, Weak,
+    },
+    time::{Duration, Instant},
+};
+
+use opentelemetry::{metrics::MeterProvider, KeyValue};
+use opentelemetry_sdk::{
+    metrics::{
+        data::{self, ResourceMetrics},
+        reader::MetricReader,
+        InstrumentKind, ManualReader, MetricError, Pipeline, SdkMeterProvider, Temporality,
+    },
+    Resource,
+};
+
+// copy/paste from opentelemetry-sdk/benches/metric.rs
+#[derive(Clone, Debug)]
+pub struct SharedReader(Arc<dyn MetricReader>);
+
+impl SharedReader {
+    pub fn new<R>(reader: R) -> Self
+    where
+        R: MetricReader,
+    {
+        Self(Arc::new(reader))
+    }
+}
+
+impl MetricReader for SharedReader {
+    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
+        self.0.register_pipeline(pipeline)
+    }
+
+    fn collect(&self, rm: &mut ResourceMetrics) -> Result<(), MetricError> {
+        self.0.collect(rm)
+    }
+
+    fn force_flush(&self) -> Result<(), MetricError> {
+        self.0.force_flush()
+    }
+
+    fn shutdown(&self) -> Result<(), MetricError> {
+        self.0.shutdown()
+    }
+
+    fn temporality(&self, kind: InstrumentKind) -> Temporality {
+        self.0.temporality(kind)
+    }
+}
+
+fn main() {
+    let available_threads: usize = std::thread::available_parallelism().map_or(1, |p| p.get());
+
+    for threads_count in [available_threads / 3, available_threads * 3] {
+        println!("*** updates, using {threads_count} threads ***");
+        measure_update_latency("no attribs", threads_count, |_i, _j| []);
+        measure_update_latency("1 attrib", threads_count, |_i, _j| {
+            [KeyValue::new("some_key", 1)]
+        });
+
+        measure_update_latency("9 attribs", threads_count, |_i, _j| {
+            // attributes for http.server.request.duration as defined in https://opentelemetry.io/docs/specs/semconv/http/http-metrics/
+            [
+                KeyValue::new("http.request.method", "GET"),
+                KeyValue::new("url.scheme", "not_found"),
+                KeyValue::new("error.type", 404),
+                KeyValue::new("http.response.status_code", 404),
+                KeyValue::new("http.route", "testing/metrics/latency"),
+                KeyValue::new("network.protocol.name", "http"),
+                KeyValue::new("network.protocol.version", 2),
+                KeyValue::new("server.address", "example.com"),
+                KeyValue::new("server.port", 8080),
+            ]
+        });
+        println!("*** inserts, using {threads_count} threads ***");
+        measure_update_latency("1 attrib", threads_count, |i, j| {
+            [KeyValue::new(format!("some_key{i}"), j as i64)]
+        });
+
+        measure_update_latency("10 attribs", threads_count, |i, j| {
+            [
+                KeyValue::new(format!("random{i}"), j as i64),
+                KeyValue::new("http.request.method", "GET"),
+                KeyValue::new("url.scheme", "not_found"),
+                KeyValue::new("error.type", 404),
+                KeyValue::new("http.response.status_code", 404),
+                KeyValue::new("http.route", "testing/metrics/latency"),
+                KeyValue::new("network.protocol.name", "http"),
+                KeyValue::new("network.protocol.version", 2),
+                KeyValue::new("server.address", "example.com"),
+                KeyValue::new("server.port", 8080),
+            ]
+        });
+        println!("*** mix mostly updates (200 attribute-sets), using {threads_count} threads ***");
+        measure_update_latency("10 attribs", threads_count, |_i, j| {
+            [
+                KeyValue::new("random", (j % 200) as i64),
+                KeyValue::new("http.request.method", "GET"),
+                KeyValue::new("url.scheme", "not_found"),
+                KeyValue::new("error.type", 404),
+                KeyValue::new("http.response.status_code", 404),
+                KeyValue::new("http.route", "testing/metrics/latency"),
+                KeyValue::new("network.protocol.name", "http"),
+                KeyValue::new("network.protocol.version", 2),
+                KeyValue::new("server.address", "example.com"),
+                KeyValue::new("server.port", 8080),
+            ]
+        });
+    }
+}
+
+fn measure_update_latency<const N: usize>(
+    msg: &str,
+    threads_count: usize,
+    attribs: fn(usize, u64) -> [KeyValue; N],
+) {
+    let reader = SharedReader::new(
+        ManualReader::builder()
+            .with_temporality(Temporality::Delta)
+            .build(),
+    );
+    let provider = SdkMeterProvider::builder()
+        .with_reader(reader.clone())
+        .build();
+    let counter = provider.meter("test").u64_counter("hello").build();
+    let mut threads = Vec::new();
+    let mut stats = Vec::new();
+    stats.resize_with(threads_count, || {
+        Arc::new(Mutex::new(HashMap::<u64, u64>::new()))
+    });
+    let total_iterations = Arc::new(AtomicU64::new(0));
+    let iterate_flag = Arc::new(AtomicBool::new(true));
+    let start = Instant::now();
+    // run multiple threads and measure how long each metric update takes
+    for thread_idx in 0..threads_count {
+        let counter = counter.clone();
+        let stat = stats[thread_idx].clone();
+        let iterate_flag = iterate_flag.clone();
+        let total_iterations = total_iterations.clone();
+        threads.push(std::thread::spawn(move || {
+            let mut stat = stat.lock().unwrap();
+            let mut iter_idx = 0;
+            while iterate_flag.load(Ordering::Acquire) {
+                let kv = attribs(thread_idx, iter_idx);
+                let start = Instant::now();
+                counter.add(1, &kv);
+                let curr = stat.entry(start.elapsed().as_nanos() as u64).or_default();
+                *curr += 1;
+                iter_idx += 1;
+            }
+            total_iterations.fetch_add(iter_idx, Ordering::AcqRel);
+        }));
+    }
+    let mut total_count = 0;
+    while start.elapsed() < Duration::from_secs(1) {
+        // collect frequently enough that inserts don't hit the overflow limit (2000),
+        // but not so frequently that collection becomes visible in p99 (affects more than 1% of measurements).
+        // with a 0.3ms sleep, collect is called around 1900-2500 times (depending on load),
+        // so we can absorb around ~2M/s inserts before they start to overflow,
+        // and it is still rare enough not to influence 1% of updates (p99).
+        std::thread::sleep(Duration::from_micros(300));
+        total_count += collect_and_return_count(&reader);
+    }
+    iterate_flag.store(false, Ordering::Release);
+    threads.into_iter().for_each(|t| {
+        t.join().unwrap();
+    });
+    total_count += collect_and_return_count(&reader);
+
+    let total_measurements = total_iterations.load(Ordering::Acquire);
+    assert_eq!(total_count, total_measurements);
+
+    let stats = stats
+        .into_iter()
+        .map(|s| Arc::into_inner(s).unwrap().into_inner().unwrap())
+        .flat_map(|s| s.into_iter())
+        .fold(BTreeMap::<u64, u64>::default(), |mut acc, (time, count)| {
+            *acc.entry(time).or_default() += count;
+            acc
+        });
+
+    let sum = stats.iter().fold(0, |mut acc, (&time, &count)| {
+        acc += time * count;
+        acc
+    });
+
+    println!("{msg}");
+    println!("\titer {}", format_count(total_measurements));
+    println!("\tavg {}", format_time(sum / total_measurements));
+    println!(
+        "\tp50 {}",
+        format_time(get_percentile_value(total_measurements, &stats, 50))
+    );
+    println!(
+        "\tp95 {}",
+        format_time(get_percentile_value(total_measurements, &stats, 95))
+    );
+    println!(
+        "\tp99 {}",
+        format_time(get_percentile_value(total_measurements, &stats, 99))
+    );
+}
+
+fn collect_and_return_count(reader: &SharedReader) -> u64 {
+    let mut rm = ResourceMetrics {
+        resource: Resource::empty(),
+        scope_metrics: Vec::new(),
+    };
+    reader.collect(&mut rm).unwrap();
+    rm.scope_metrics
+        .into_iter()
+        .flat_map(|sm| sm.metrics.into_iter())
+        .flat_map(|m| {
+            m.data
+                .as_any()
+                .downcast_ref::<data::Sum<u64>>()
+                .unwrap()
+                .data_points
+                .clone()
+                .into_iter()
+        })
+        .map(|dp| dp.value)
+        .sum()
+}
+
+fn get_percentile_value(
+    total_measurements: u64,
+    stats: &BTreeMap<u64, u64>,
+    percentile: u64,
+) -> u64 {
+    assert!(percentile > 0 && percentile < 100);
+    let break_point = ((total_measurements as f64 * percentile as f64) / 100.0) as u64;
+    let mut iter = stats.iter().peekable();
+    let mut sum = 0;
+    while let Some(left) = iter.next() {
+        sum += left.1;
+        if let Some(&right) = iter.peek() {
+            let next_sum = sum + right.1;
+            if next_sum > break_point {
+                // interpolate between the two nearest latency buckets
+                let diff = (next_sum - sum) as f32;
+                let ratio = (break_point - sum) as f32 / diff;
+                let time_diff = (right.0 - left.0) as f32;
+                return *left.0 + (time_diff * ratio) as u64;
+            }
+        }
+    }
+    0
+}
+
+fn format_count(count: u64) -> String {
+    let count = count as f64;
+    let (val, symbol) = if count > 1000000.0 {
+        (count / 1000000.0, "M")
+    } else if count > 1000.0 {
+        (count / 1000.0, "K")
+    } else {
+        (count, "")
+    };
+    if val > 100.0 {
+        format!("{val:>5.1}{symbol}")
+    } else if val > 10.0 {
+        format!("{val:>5.2}{symbol}")
+    } else {
+        format!("{val:>5.3}{symbol}")
+    }
+}
+
+fn format_time(nanos: u64) -> String {
+    let nanos = nanos as f64;
+    let (val, symbol) = if nanos > 1000000.0 {
+        (nanos / 1000000.0, "ms")
+    } else if nanos > 1000.0 {
+        (nanos / 1000.0, "μs")
+    } else {
+        (nanos, "ns")
+    };
+    if val > 100.0 {
+        format!("{val:>5.1}{symbol}")
+    } else if val > 10.0 {
+        format!("{val:>5.2}{symbol}")
+    } else {
+        format!("{val:>5.3}{symbol}")
+    }
+}
+
+#[test]
+fn test_format_time() {
+    assert_eq!("12.00ns", format_time(12));
+    assert_eq!("123.0ns", format_time(123));
+    assert_eq!("1.234μs", format_time(1234));
+    assert_eq!("12.35μs", format_time(12349));
+    assert_eq!("123.4μs", format_time(123400));
+    assert_eq!("1.235ms", format_time(1234900));
+    assert_eq!("12.34ms", format_time(12340000));
+}
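Each worker thread records update latencies into a HashMap keyed by nanoseconds; after the run these per-thread maps are merged into one BTreeMap, and get_percentile_value walks that sorted histogram until the cumulative count crosses the requested percentile, then linearly interpolates between the two neighbouring latency buckets. Below is a minimal sketch of a unit test for that interpolation, written as if added next to test_format_time in this file; it is not part of the commit and the numbers are purely illustrative:

#[test]
fn test_get_percentile_value() {
    // two latency buckets: 50 measurements at 100ns and 50 measurements at 200ns (100 total)
    let stats: BTreeMap<u64, u64> = [(100, 50), (200, 50)].into_iter().collect();
    // p50 lands exactly on the end of the first bucket, so the ratio is 0 and the lower bucket is returned
    assert_eq!(100, get_percentile_value(100, &stats, 50));
    // p75 sits halfway into the second bucket: 100ns + 0.5 * (200ns - 100ns) = 150ns
    assert_eq!(150, get_percentile_value(100, &stats, 75));
}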

0 commit comments
