Skip to content

Commit 22e92a6

Browse files
authored
chore(observability): Refactor EWMA + Gauge into a new struct (#24556)
1 parent 0af6553 commit 22e92a6

File tree

4 files changed

+45
-16
lines changed

4 files changed

+45
-16
lines changed

lib/vector-buffers/src/topology/channel/limited_queue.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,10 @@ use crossbeam_queue::{ArrayQueue, SegQueue};
1616
use futures::Stream;
1717
use metrics::{Gauge, Histogram, gauge, histogram};
1818
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
19-
use vector_common::stats::AtomicEwma;
19+
use vector_common::stats::EwmaGauge;
2020

2121
use crate::{InMemoryBufferable, config::MemoryBufferSize};
2222

23-
/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a
24-
/// measure of how much weight to give to the current value versus the previous values. A value of
25-
/// 0.9 results in a "half life" of 6-7 measurements.
26-
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;
27-
2823
/// Error returned by `LimitedSender::send` when the receiver has disconnected.
2924
#[derive(Debug, PartialEq, Eq)]
3025
pub struct SendError<T>(pub T);
@@ -114,8 +109,7 @@ impl ChannelMetricMetadata {
114109
struct Metrics {
115110
histogram: Histogram,
116111
gauge: Gauge,
117-
mean_gauge: Gauge,
118-
ewma: Arc<AtomicEwma>,
112+
mean_gauge: EwmaGauge,
119113
// We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but
120114
// since the value is static, we never need to update it. The compiler detects this as an unused
121115
// field, so we need to suppress the warning here.
@@ -150,37 +144,36 @@ impl Metrics {
150144
let histogram_name = format!("{prefix}_utilization");
151145
let gauge_name = format!("{prefix}_utilization_level");
152146
let mean_name = format!("{prefix}_utilization_mean");
153-
let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA)));
154147
#[cfg(test)]
155148
let recorded_values = Arc::new(Mutex::new(Vec::new()));
156149
if let Some(label_value) = output {
157150
let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
158151
max_gauge.set(max_value);
152+
let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
159153
// DEPRECATED: buffer-bytes-events-metrics
160154
let legacy_max_gauge = gauge!(legacy_max_gauge_name, "output" => label_value.clone());
161155
legacy_max_gauge.set(max_value);
162156
Self {
163157
histogram: histogram!(histogram_name, "output" => label_value.clone()),
164158
gauge: gauge!(gauge_name, "output" => label_value.clone()),
165-
mean_gauge: gauge!(mean_name, "output" => label_value.clone()),
159+
mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha),
166160
max_gauge,
167-
ewma,
168161
legacy_max_gauge,
169162
#[cfg(test)]
170163
recorded_values,
171164
}
172165
} else {
173166
let max_gauge = gauge!(max_gauge_name);
174167
max_gauge.set(max_value);
168+
let mean_gauge_handle = gauge!(mean_name);
175169
// DEPRECATED: buffer-bytes-events-metrics
176170
let legacy_max_gauge = gauge!(legacy_max_gauge_name);
177171
legacy_max_gauge.set(max_value);
178172
Self {
179173
histogram: histogram!(histogram_name),
180174
gauge: gauge!(gauge_name),
181-
mean_gauge: gauge!(mean_name),
175+
mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha),
182176
max_gauge,
183-
ewma,
184177
legacy_max_gauge,
185178
#[cfg(test)]
186179
recorded_values,
@@ -192,8 +185,7 @@ impl Metrics {
192185
fn record(&self, value: usize) {
193186
self.histogram.record(value as f64);
194187
self.gauge.set(value as f64);
195-
let avg = self.ewma.update(value as f64);
196-
self.mean_gauge.set(avg);
188+
self.mean_gauge.record(value as f64);
197189
#[cfg(test)]
198190
if let Ok(mut recorded) = self.recorded_values.lock() {
199191
recorded.push(value);

lib/vector-buffers/src/topology/channel/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ mod receiver;
33
mod sender;
44

55
pub use limited_queue::{
6-
ChannelMetricMetadata, DEFAULT_EWMA_ALPHA, LimitedReceiver, LimitedSender, SendError, limited,
6+
ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited,
77
};
88
pub use receiver::*;
99
pub use sender::*;
10+
pub use vector_common::stats::DEFAULT_EWMA_ALPHA;
1011

1112
#[cfg(test)]
1213
mod tests;
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use std::sync::Arc;
2+
3+
use metrics::Gauge;
4+
5+
use super::AtomicEwma;
6+
7+
/// The default alpha parameter used when constructing EWMA-backed gauges.
8+
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;
9+
10+
/// Couples a [`Gauge`] with an [`AtomicEwma`] so gauge readings reflect the EWMA.
11+
#[derive(Clone, Debug)]
12+
pub struct EwmaGauge {
13+
gauge: Gauge,
14+
// Note that the `Gauge` internally is equivalent to an `Arc<AtomicF64>` so we need to use the
15+
// same semantics for the EWMA calculation as well.
16+
ewma: Arc<AtomicEwma>,
17+
}
18+
19+
impl EwmaGauge {
20+
#[must_use]
21+
pub fn new(gauge: Gauge, alpha: Option<f64>) -> Self {
22+
let alpha = alpha.unwrap_or(DEFAULT_EWMA_ALPHA);
23+
let ewma = Arc::new(AtomicEwma::new(alpha));
24+
Self { gauge, ewma }
25+
}
26+
27+
/// Records a new value, updates the EWMA, and sets the gauge accordingly.
28+
pub fn record(&self, value: f64) {
29+
let average = self.ewma.update(value);
30+
self.gauge.set(average);
31+
}
32+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
#![allow(missing_docs)]
22

3+
pub mod ewma_gauge;
4+
5+
pub use ewma_gauge::{DEFAULT_EWMA_ALPHA, EwmaGauge};
6+
37
use std::sync::atomic::Ordering;
48

59
use crate::atomic::AtomicF64;

0 commit comments

Comments
 (0)