
Commit 7ae7647

Adaptive collect interval
1 parent 7058272 commit 7ae7647

File tree

2 files changed: 425 additions & 25 deletions

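The patch below adds a `metrics_latency` stress binary that updates a counter from several threads while a `ManualReader` is drained periodically. Rather than collecting on a fixed schedule, the collection loop adapts its sleep so that each collect cycle drains roughly `collect_around` measurements: the observed/target ratio is clamped to [0.5, 2.0] and the current wait is divided by it. Below is a minimal sketch of that adjustment rule; the helper name `next_interval` is illustrative only, since the patch inlines this logic in `collect_measurements`.

use std::time::Duration;

// Adaptive collect interval as a standalone rule: divide the current wait by the
// clamped observed/target ratio, so a cycle that drained more than the target
// shortens the next wait and a quiet cycle lengthens it.
fn next_interval(current: Duration, collected: u64, collect_around: u64) -> Duration {
    let ratio = (collected as f64 / collect_around as f64).clamp(0.5, 2.0);
    Duration::from_micros((current.as_micros() as f64 / ratio) as u64)
}

fn main() {
    let mut wait = Duration::from_micros(500);
    // A cycle that drained 4x the target halves the wait (ratio clamped to 2.0).
    wait = next_interval(wait, 40_000, 10_000);
    assert_eq!(wait, Duration::from_micros(250));
    // A cycle that drained a tenth of the target doubles it back (ratio clamped to 0.5).
    wait = next_interval(wait, 1_000, 10_000);
    assert_eq!(wait, Duration::from_micros(500));
}

In the patch itself the collection loop runs for about one second per scenario, and the measured update latencies are reported as percentiles.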

Diff

Lines changed: 361 additions & 0 deletions
@@ -0,0 +1,361 @@
diff --git a/stress/Cargo.toml b/stress/Cargo.toml
index 90bb6e28..bb414a02 100644
--- a/stress/Cargo.toml
+++ b/stress/Cargo.toml
@@ -19,6 +19,11 @@ name = "metrics_histogram"
 path = "src/metrics_histogram.rs"
 doc = false

+[[bin]] # Bin to run measure latency in various conditions
+name = "metrics_latency"
+path = "src/metrics_latency.rs"
+doc = false
+
 [[bin]] # Bin to run the metrics overflow stress tests
 name = "metrics_overflow"
 path = "src/metrics_overflow.rs"
diff --git a/stress/src/metrics_latency.rs b/stress/src/metrics_latency.rs
new file mode 100644
index 00000000..82e683b6
--- /dev/null
+++ b/stress/src/metrics_latency.rs
@@ -0,0 +1,339 @@
+use std::{
+    cell::RefCell,
+    collections::{BTreeMap, HashMap},
+    sync::{
+        atomic::{AtomicBool, AtomicU64, Ordering},
+        Arc, Mutex, Weak,
+    },
+    thread::{sleep, JoinHandle},
+    time::{Duration, Instant},
+    u64,
+};
+
+use opentelemetry::{metrics::MeterProvider, KeyValue};
+use opentelemetry_sdk::{
+    metrics::{
+        data::{self, ResourceMetrics},
+        reader::MetricReader,
+        InstrumentKind, ManualReader, MetricError, Pipeline, SdkMeterProvider, Temporality,
+    },
+    Resource,
+};
+use rand::{rngs, Rng, SeedableRng};
+
+thread_local! {
+    /// Store random number generator for each thread
+    static CURRENT_RNG: RefCell<rngs::SmallRng> = RefCell::new(rngs::SmallRng::from_entropy());
+}
+
+// copy/paste from opentelemetry-sdk/benches/metric.rs
+#[derive(Clone, Debug)]
+pub struct SharedReader(Arc<dyn MetricReader>);
+
+impl SharedReader {
+    pub fn new<R>(reader: R) -> Self
+    where
+        R: MetricReader,
+    {
+        Self(Arc::new(reader))
+    }
+}
+
+impl MetricReader for SharedReader {
+    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
+        self.0.register_pipeline(pipeline)
+    }
+
+    fn collect(&self, rm: &mut ResourceMetrics) -> Result<(), MetricError> {
+        self.0.collect(rm)
+    }
+
+    fn force_flush(&self) -> Result<(), MetricError> {
+        self.0.force_flush()
+    }
+
+    fn shutdown(&self) -> Result<(), MetricError> {
+        self.0.shutdown()
+    }
+
+    fn temporality(&self, kind: InstrumentKind) -> Temporality {
+        self.0.temporality(kind)
+    }
+}
+
+fn main() {
+    let available_threads: usize = std::thread::available_parallelism().map_or(1, |p| p.get());
+
+    for threads_count in [available_threads / 4, available_threads * 4] {
+        println!("*** updates, using {threads_count} threads ***");
+        measure_update_latency(&format!("no attribs"), threads_count, 10000000, |_i, _j| []);
+        measure_update_latency(&format!("1 attrib"), threads_count, 10000000, |_i, _j| {
+            [KeyValue::new("some_key", 1)]
+        });
+
+        measure_update_latency(&format!("9 attribs"), threads_count, 10000000, |_i, _j| {
+            // for http.server.request.duration as defined in https://opentelemetry.io/docs/specs/semconv/http/http-metrics/
+            [
+                KeyValue::new("http.request.method", "GET"),
+                KeyValue::new("url.scheme", "not_found"),
+                KeyValue::new("error.type", 404),
+                KeyValue::new("http.response.status_code", 404),
+                KeyValue::new("http.route", "testing/metrics/latency"),
+                KeyValue::new("network.protocol.name", "http"),
+                KeyValue::new("network.protocol.version", 2),
+                KeyValue::new("server.address", "example.com"),
+                KeyValue::new("server.port", 8080),
+            ]
+        });
+        println!("*** inserts, using {threads_count} threads ***");
+        measure_update_latency(&format!("1 attrib"), threads_count, 1500, |i, j| {
+            [KeyValue::new(format!("some_key{i}"), j as i64)]
+        });
+
+        measure_update_latency(&format!("10 attribs"), threads_count, 1500, |i, j| {
+            [
+                KeyValue::new(format!("random{i}"), j as i64),
+                KeyValue::new("http.request.method", "GET"),
+                KeyValue::new("url.scheme", "not_found"),
+                KeyValue::new("error.type", 404),
+                KeyValue::new("http.response.status_code", 404),
+                KeyValue::new("http.route", "testing/metrics/latency"),
+                KeyValue::new("network.protocol.name", "http"),
+                KeyValue::new("network.protocol.version", 2),
+                KeyValue::new("server.address", "example.com"),
+                KeyValue::new("server.port", 8080),
+            ]
+        });
+        println!("*** mix mostly updates (~10% inserts), using {threads_count} threads ***");
+        measure_update_latency(&format!("10 attribs"), threads_count, 10000, |_i, j| {
+            let randomness: i64 = 20
+                - CURRENT_RNG.with(|rng| {
+                    let mut rng = rng.borrow_mut();
+                    rng.gen_range(0..20)
+                });
+            [
+                KeyValue::new("random", (j / 10) as i64 + randomness),
+                KeyValue::new("http.request.method", "GET"),
+                KeyValue::new("url.scheme", "not_found"),
+                KeyValue::new("error.type", 404),
+                KeyValue::new("http.response.status_code", 404),
+                KeyValue::new("http.route", "testing/metrics/latency"),
+                KeyValue::new("network.protocol.name", "http"),
+                KeyValue::new("network.protocol.version", 2),
+                KeyValue::new("server.address", "example.com"),
+                KeyValue::new("server.port", 8080),
+            ]
+        });
+    }
+}
+
+fn measure_update_latency<const N: usize>(
+    msg: &str,
+    threads_count: usize,
+    collect_around: u64,
+    attribs: fn(usize, u64) -> [KeyValue; N],
+) {
+    let reader = SharedReader::new(
+        ManualReader::builder()
+            .with_temporality(Temporality::Delta)
+            .build(),
+    );
+    let provider = SdkMeterProvider::builder()
+        .with_reader(reader.clone())
+        .build();
+    let histogram = provider.meter("test").u64_counter("hello").build();
+    let mut threads = Vec::new();
+    let mut stats = Vec::new();
+    stats.resize_with(threads_count, || {
+        Arc::new(Mutex::new(HashMap::<u64, u64>::new()))
+    });
+    let total_iterations = Arc::new(AtomicU64::new(0));
+    let iterate_flag = Arc::new(AtomicBool::new(true));
+    // run multiple threads and measure how time it takes to update metric
+    for thread_idx in 0..threads_count {
+        let hist = histogram.clone();
+        let stat = stats[thread_idx].clone();
+        let iterate_flag = iterate_flag.clone();
+        let total_iterations = total_iterations.clone();
+        threads.push(std::thread::spawn(move || {
+            let mut stat = stat.lock().unwrap();
+            let mut iter_idx = 0;
+            while iterate_flag.load(Ordering::Acquire) {
+                let kv = attribs(thread_idx, iter_idx);
+                let start = Instant::now();
+                hist.add(1, &kv);
+                let curr = stat.entry(start.elapsed().as_nanos() as u64).or_default();
+                *curr += 1;
+                iter_idx += 1;
+            }
+            total_iterations.fetch_add(iter_idx, Ordering::AcqRel);
+        }));
+    }
+
+    let total_measurements = collect_measurements(reader, threads, iterate_flag, collect_around);
+
+    assert_eq!(total_measurements, total_iterations.load(Ordering::Acquire));
+
+    print_stats(msg, stats, total_measurements);
+}
+
+fn collect_measurements(
+    reader: SharedReader,
+    threads: Vec<JoinHandle<()>>,
+    iterate_flag: Arc<AtomicBool>,
+    collect_around: u64,
+) -> u64 {
+    let start = Instant::now();
+    let mut total_count = 0;
+    let mut wait_for_next_collect = Duration::from_micros(500);
+    while start.elapsed() < Duration::from_secs(1) {
+        sleep(wait_for_next_collect);
+        let collected = collect_and_return_count(&reader);
+        // calculate wait interval so that the next collect cycle would be close to `collect_around`
+        let ratio = collected as f64 / collect_around as f64;
+        let clamped = if ratio > 2.0 {
+            2.0
+        } else if ratio < 0.5 {
+            0.5
+        } else {
+            ratio
+        };
+        wait_for_next_collect =
+            Duration::from_micros((wait_for_next_collect.as_micros() as f64 / clamped) as u64);
+        total_count += collected;
+    }
+    iterate_flag.store(false, Ordering::Release);
+    threads.into_iter().for_each(|t| {
+        t.join().unwrap();
+    });
+    total_count += collect_and_return_count(&reader);
+    total_count
+}
+
+fn print_stats(msg: &str, stats: Vec<Arc<Mutex<HashMap<u64, u64>>>>, total_measurements: u64) {
+    let stats = stats
+        .into_iter()
+        .map(|s| Arc::into_inner(s).unwrap().into_inner().unwrap())
+        .flat_map(|s| s.into_iter())
+        .fold(BTreeMap::<u64, u64>::default(), |mut acc, (time, count)| {
+            *acc.entry(time).or_default() += count;
+            acc
+        });
+
+    let sum = stats.iter().fold(0, |mut acc, (&time, &count)| {
+        acc += time * count;
+        acc
+    });
+
+    println!("{msg}");
+    println!("\titer {}", format_count(total_measurements));
+    println!(
+        "\tp50 {}",
+        format_time(get_percentile_value(total_measurements, &stats, 50))
+    );
+    println!(
+        "\tp95 {}",
+        format_time(get_percentile_value(total_measurements, &stats, 95))
+    );
+    println!(
+        "\tp99 {}",
+        format_time(get_percentile_value(total_measurements, &stats, 99))
+    );
+    println!("\tavg {}", format_time(sum / total_measurements as u64));
+    println!("\tbest {}", format_time(*stats.iter().next().unwrap().0));
+    println!("\tworst {}", format_time(*stats.iter().last().unwrap().0));
+}
+
+fn collect_and_return_count(reader: &SharedReader) -> u64 {
+    let mut rm = ResourceMetrics {
+        resource: Resource::empty(),
+        scope_metrics: Vec::new(),
+    };
+    reader.collect(&mut rm).unwrap();
+    rm.scope_metrics
+        .into_iter()
+        .flat_map(|sm| sm.metrics.into_iter())
+        .flat_map(|m| {
+            m.data
+                .as_any()
+                .downcast_ref::<data::Sum<u64>>()
+                .unwrap()
+                .data_points
+                .clone()
+                .into_iter()
+        })
+        .map(|dp| dp.value)
+        .sum()
+}
+
+fn get_percentile_value(
+    total_measurements: u64,
+    stats: &BTreeMap<u64, u64>,
+    percentile: u64,
+) -> u64 {
+    assert!(percentile > 0 && percentile < 100);
+    let break_point = ((total_measurements as f64 * percentile as f64) / 100.0) as u64;
+    let mut iter = stats.iter().peekable();
+    let mut sum = 0;
+    while let Some(left) = iter.next() {
+        sum += left.1;
+        if let Some(&right) = iter.peek() {
+            let next_sum = sum + right.1;
+            if next_sum > break_point {
+                // interpolate
+                let diff = (next_sum - sum) as f32;
+                let ratio = (break_point - sum) as f32 / diff;
+                let time_diff = (right.0 - left.0) as f32;
+                return *left.0 + (time_diff * ratio) as u64;
+            }
+        }
+    }
+    0
+}
+
+fn format_count(count: u64) -> String {
+    let count = count as f64;
+    let (val, symbol) = if count > 1000000.0 {
+        (count / 1000000.0, "M")
+    } else if count > 1000.0 {
+        (count / 1000.0, "K")
+    } else {
+        (count, "")
+    };
+    if val > 100.0 {
+        format!("{val:>5.1}{symbol}")
+    } else if val > 10.0 {
+        format!("{val:>5.2}{symbol}")
+    } else {
+        format!("{val:>5.3}{symbol}")
+    }
+}
+
+fn format_time(nanos: u64) -> String {
+    let nanos = nanos as f64;
+    let (val, symbol) = if nanos > 1000000.0 {
+        (nanos / 1000000.0, "ms")
+    } else if nanos > 1000.0 {
+        (nanos / 1000.0, "μs")
+    } else {
+        (nanos, "ns")
+    };
+    if val > 100.0 {
+        format!("{val:>5.1}{symbol}")
+    } else if val > 10.0 {
+        format!("{val:>5.2}{symbol}")
+    } else {
+        format!("{val:>5.3}{symbol}")
+    }
+}
+
+#[test]
+fn test_format_time() {
+    assert_eq!("12.00ns", format_time(12));
+    assert_eq!("123.0ns", format_time(123));
+    assert_eq!("1.234μs", format_time(1234));
+    assert_eq!("12.35μs", format_time(12349));
+    assert_eq!("123.4μs", format_time(123400));
+    assert_eq!("1.235ms", format_time(1234900));
+    assert_eq!("12.34ms", format_time(12340000));
+}
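Each worker thread above records update latencies into its own HashMap<u64, u64> keyed by whole nanoseconds, and print_stats folds those maps into a single sorted BTreeMap before reading off percentiles. A small self-contained sketch of that record-and-merge step, with made-up data standing in for a second thread:

use std::collections::{BTreeMap, HashMap};
use std::time::Instant;

fn main() {
    // Per-thread histogram: elapsed nanoseconds -> number of measurements at that latency.
    let mut thread_a: HashMap<u64, u64> = HashMap::new();
    let start = Instant::now();
    // ... the measured operation (hist.add in the stress test) would go here ...
    *thread_a.entry(start.elapsed().as_nanos() as u64).or_default() += 1;

    // Made-up histogram standing in for another worker thread.
    let thread_b: HashMap<u64, u64> = HashMap::from([(120, 3), (450, 1)]);

    // Merge the per-thread maps into one sorted histogram, as print_stats does.
    let merged: BTreeMap<u64, u64> = vec![thread_a, thread_b]
        .into_iter()
        .flat_map(|m| m.into_iter())
        .fold(BTreeMap::new(), |mut acc, (time, count)| {
            *acc.entry(time).or_default() += count;
            acc
        });

    let total: u64 = merged.values().sum();
    println!("{total} measurements across {} distinct latency buckets", merged.len());
}

The new binary itself should run like the other stress tests, presumably with `cargo run --release --bin metrics_latency` from the stress crate.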
