Skip to content

Commit 94a2f40

Browse files
authored
enhancement(observability): Add configuration for buffer utilization EWMA alpha (#24467)
* enhancement(observability): Add configuration for buffer utilization EWMA alpha * Restore comment
1 parent a5a6b5e commit 94a2f40

File tree

16 files changed

+111
-22
lines changed

16 files changed

+111
-22
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Added `buffer_utilization_ewma_alpha` configuration option to the global
2+
options, allowing users to control the alpha value for the exponentially
3+
weighted moving average (EWMA) used in source and transform buffer utilization
4+
metrics.
5+
6+
authors: bruceg

lib/vector-buffers/src/topology/builder.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,13 @@ impl<T: Bufferable> TopologyBuilder<T> {
191191
when_full: WhenFull,
192192
receiver_span: &Span,
193193
metadata: Option<ChannelMetricMetadata>,
194+
ewma_alpha: Option<f64>,
194195
) -> (BufferSender<T>, BufferReceiver<T>) {
195196
let usage_handle = BufferUsageHandle::noop();
196197
usage_handle.set_buffer_limits(None, Some(max_events.get()));
197198

198199
let limit = MemoryBufferSize::MaxEvents(max_events);
199-
let (sender, receiver) = limited(limit, metadata);
200+
let (sender, receiver) = limited(limit, metadata, ewma_alpha);
200201

201202
let mode = match when_full {
202203
WhenFull::Overflow => WhenFull::Block,
@@ -232,7 +233,7 @@ impl<T: Bufferable> TopologyBuilder<T> {
232233
usage_handle.set_buffer_limits(None, Some(max_events.get()));
233234

234235
let limit = MemoryBufferSize::MaxEvents(max_events);
235-
let (sender, receiver) = limited(limit, metadata);
236+
let (sender, receiver) = limited(limit, metadata, None);
236237

237238
let mode = match when_full {
238239
WhenFull::Overflow => WhenFull::Block,

lib/vector-buffers/src/topology/channel/limited_queue.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{InMemoryBufferable, config::MemoryBufferSize};
2323
/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a
2424
/// measure of how much weight to give to the current value versus the previous values. A value of
2525
/// 0.9 results in a "half life" of 6-7 measurements.
26-
const EWMA_ALPHA: f64 = 0.9;
26+
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;
2727

2828
/// Error returned by `LimitedSender::send` when the receiver has disconnected.
2929
#[derive(Debug, PartialEq, Eq)]
@@ -127,7 +127,11 @@ struct Metrics {
127127

128128
impl Metrics {
129129
#[expect(clippy::cast_precision_loss)] // We have to convert buffer sizes for a gauge, it's okay to lose precision here.
130-
fn new(limit: MemoryBufferSize, metadata: ChannelMetricMetadata) -> Self {
130+
fn new(
131+
limit: MemoryBufferSize,
132+
metadata: ChannelMetricMetadata,
133+
ewma_alpha: Option<f64>,
134+
) -> Self {
131135
let ChannelMetricMetadata { prefix, output } = metadata;
132136
let (gauge_suffix, max_value) = match limit {
133137
MemoryBufferSize::MaxEvents(max_events) => ("_max_event_size", max_events.get() as f64),
@@ -137,7 +141,7 @@ impl Metrics {
137141
let histogram_name = format!("{prefix}_utilization");
138142
let gauge_name = format!("{prefix}_utilization_level");
139143
let mean_name = format!("{prefix}_utilization_mean");
140-
let ewma = Arc::new(AtomicEwma::new(EWMA_ALPHA));
144+
let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA)));
141145
#[cfg(test)]
142146
let recorded_values = Arc::new(Mutex::new(Vec::new()));
143147
if let Some(label_value) = output {
@@ -202,9 +206,13 @@ impl<T> Clone for Inner<T> {
202206
}
203207

204208
impl<T: InMemoryBufferable> Inner<T> {
205-
fn new(limit: MemoryBufferSize, metric_metadata: Option<ChannelMetricMetadata>) -> Self {
209+
fn new(
210+
limit: MemoryBufferSize,
211+
metric_metadata: Option<ChannelMetricMetadata>,
212+
ewma_alpha: Option<f64>,
213+
) -> Self {
206214
let read_waker = Arc::new(Notify::new());
207-
let metrics = metric_metadata.map(|metadata| Metrics::new(limit, metadata));
215+
let metrics = metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_alpha));
208216
match limit {
209217
MemoryBufferSize::MaxEvents(max_events) => Inner {
210218
data: Arc::new(ArrayQueue::new(max_events.get())),
@@ -397,8 +405,9 @@ impl<T> Drop for LimitedReceiver<T> {
397405
pub fn limited<T: InMemoryBufferable + fmt::Debug>(
398406
limit: MemoryBufferSize,
399407
metric_metadata: Option<ChannelMetricMetadata>,
408+
ewma_alpha: Option<f64>,
400409
) -> (LimitedSender<T>, LimitedReceiver<T>) {
401-
let inner = Inner::new(limit, metric_metadata);
410+
let inner = Inner::new(limit, metric_metadata, ewma_alpha);
402411

403412
let sender = LimitedSender {
404413
inner: inner.clone(),
@@ -426,7 +435,7 @@ mod tests {
426435
#[tokio::test]
427436
async fn send_receive() {
428437
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
429-
let (mut tx, mut rx) = limited(limit, None);
438+
let (mut tx, mut rx) = limited(limit, None, None);
430439

431440
assert_eq!(2, tx.available_capacity());
432441

@@ -458,6 +467,7 @@ mod tests {
458467
let (mut tx, mut rx) = limited(
459468
limit,
460469
Some(ChannelMetricMetadata::new("test_channel", None)),
470+
None,
461471
);
462472

463473
let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
@@ -477,7 +487,7 @@ mod tests {
477487

478488
// With this configuration a maximum of exactly 10 messages can fit in the channel
479489
let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap());
480-
let (mut tx, mut rx) = limited(limit, None);
490+
let (mut tx, mut rx) = limited(limit, None, None);
481491

482492
assert_eq!(max_allowed_bytes, tx.available_capacity());
483493

@@ -511,7 +521,7 @@ mod tests {
511521
#[test]
512522
fn sender_waits_for_more_capacity_when_none_available() {
513523
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
514-
let (mut tx, mut rx) = limited(limit, None);
524+
let (mut tx, mut rx) = limited(limit, None, None);
515525

516526
assert_eq!(1, tx.available_capacity());
517527

@@ -573,7 +583,7 @@ mod tests {
573583
#[test]
574584
fn sender_waits_for_more_capacity_when_partial_available() {
575585
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap());
576-
let (mut tx, mut rx) = limited(limit, None);
586+
let (mut tx, mut rx) = limited(limit, None, None);
577587

578588
assert_eq!(7, tx.available_capacity());
579589

@@ -662,7 +672,7 @@ mod tests {
662672
#[test]
663673
fn empty_receiver_returns_none_when_last_sender_drops() {
664674
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
665-
let (mut tx, mut rx) = limited(limit, None);
675+
let (mut tx, mut rx) = limited(limit, None, None);
666676

667677
assert_eq!(1, tx.available_capacity());
668678

@@ -705,7 +715,7 @@ mod tests {
705715
#[test]
706716
fn receiver_returns_none_once_empty_when_last_sender_drops() {
707717
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
708-
let (tx, mut rx) = limited::<Sample>(limit, None);
718+
let (tx, mut rx) = limited::<Sample>(limit, None, None);
709719

710720
assert_eq!(1, tx.available_capacity());
711721

@@ -735,7 +745,7 @@ mod tests {
735745
#[test]
736746
fn oversized_send_allowed_when_empty() {
737747
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
738-
let (mut tx, mut rx) = limited(limit, None);
748+
let (mut tx, mut rx) = limited(limit, None, None);
739749

740750
assert_eq!(1, tx.available_capacity());
741751

@@ -768,7 +778,7 @@ mod tests {
768778
#[test]
769779
fn oversized_send_allowed_when_partial_capacity() {
770780
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
771-
let (mut tx, mut rx) = limited(limit, None);
781+
let (mut tx, mut rx) = limited(limit, None, None);
772782

773783
assert_eq!(2, tx.available_capacity());
774784

lib/vector-buffers/src/topology/channel/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ mod receiver;
33
mod sender;
44

55
pub use limited_queue::{
6-
ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited,
6+
ChannelMetricMetadata, DEFAULT_EWMA_ALPHA, LimitedReceiver, LimitedSender, SendError, limited,
77
};
88
pub use receiver::*;
99
pub use sender::*;

lib/vector-buffers/src/variants/in_memory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ where
4545

4646
usage_handle.set_buffer_limits(max_bytes, max_size);
4747

48-
let (tx, rx) = limited(self.capacity, None);
48+
let (tx, rx) = limited(self.capacity, None, None);
4949
Ok((tx.into(), rx.into()))
5050
}
5151
}

lib/vector-core/src/config/global_options.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,19 @@ pub struct GlobalOptions {
140140
#[serde(skip_serializing_if = "crate::serde::is_default")]
141141
pub expire_metrics_per_metric_set: Option<Vec<PerMetricSetExpiration>>,
142142

143+
/// The alpha value for the exponential weighted moving average (EWMA) of source and transform
144+
/// buffer utilization metrics.
145+
///
146+
/// This value specifies how much of the existing value is retained when each update is made.
147+
/// Values closer to 1.0 result in the value adjusting slower to changes. The default value of
148+
/// 0.9 is equivalent to a "half life" of 6-7 measurements.
149+
///
150+
/// Must be between 0 and 1 exclusive (0 < alpha < 1).
151+
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
152+
#[configurable(validation(range(min = 0.0, max = 1.0)))]
153+
#[configurable(metadata(docs::advanced))]
154+
pub buffer_utilization_ewma_alpha: Option<f64>,
155+
143156
/// The interval, in seconds, at which the internal metrics cache for VRL is refreshed.
144157
/// This must be set to be able to access metrics in VRL functions.
145158
///
@@ -295,6 +308,9 @@ impl GlobalOptions {
295308
expire_metrics: self.expire_metrics.or(with.expire_metrics),
296309
expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
297310
expire_metrics_per_metric_set: merged_expire_metrics_per_metric_set,
311+
buffer_utilization_ewma_alpha: self
312+
.buffer_utilization_ewma_alpha
313+
.or(with.buffer_utilization_ewma_alpha),
298314
metrics_storage_refresh_period: self
299315
.metrics_storage_refresh_period
300316
.or(with.metrics_storage_refresh_period),

lib/vector-core/src/fanout.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ mod tests {
490490
WhenFull::Block,
491491
&Span::current(),
492492
None,
493+
None,
493494
)
494495
}
495496

lib/vector-core/src/source_sender/builder.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub struct Builder {
1313
named_outputs: HashMap<String, Output>,
1414
lag_time: Option<Histogram>,
1515
timeout: Option<Duration>,
16+
ewma_alpha: Option<f64>,
1617
}
1718

1819
impl Default for Builder {
@@ -23,6 +24,7 @@ impl Default for Builder {
2324
named_outputs: Default::default(),
2425
lag_time: Some(histogram!(LAG_TIME_NAME)),
2526
timeout: None,
27+
ewma_alpha: None,
2628
}
2729
}
2830
}
@@ -40,6 +42,12 @@ impl Builder {
4042
self
4143
}
4244

45+
#[must_use]
46+
pub fn with_ewma_alpha(mut self, alpha: Option<f64>) -> Self {
47+
self.ewma_alpha = alpha;
48+
self
49+
}
50+
4351
pub fn add_source_output(
4452
&mut self,
4553
output: SourceOutput,
@@ -60,6 +68,7 @@ impl Builder {
6068
log_definition,
6169
output_id,
6270
self.timeout,
71+
self.ewma_alpha,
6372
);
6473
self.default_output = Some(output);
6574
rx
@@ -72,6 +81,7 @@ impl Builder {
7281
log_definition,
7382
output_id,
7483
self.timeout,
84+
self.ewma_alpha,
7585
);
7686
self.named_outputs.insert(name, output);
7787
rx

lib/vector-core/src/source_sender/output.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,11 @@ impl Output {
115115
log_definition: Option<Arc<Definition>>,
116116
output_id: OutputId,
117117
timeout: Option<Duration>,
118+
ewma_alpha: Option<f64>,
118119
) -> (Self, LimitedReceiver<SourceSenderItem>) {
119120
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(n).unwrap());
120121
let metrics = ChannelMetricMetadata::new(UTILIZATION_METRIC_PREFIX, Some(output.clone()));
121-
let (tx, rx) = channel::limited(limit, Some(metrics));
122+
let (tx, rx) = channel::limited(limit, Some(metrics), ewma_alpha);
122123
(
123124
Self {
124125
sender: tx,

lib/vector-core/src/source_sender/sender.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ impl SourceSender {
119119
None,
120120
output_id,
121121
timeout,
122+
None,
122123
);
123124
(
124125
Self {
@@ -192,7 +193,7 @@ impl SourceSender {
192193
port: Some(name.clone()),
193194
};
194195
let (output, recv) =
195-
Output::new_with_buffer(100, name.clone(), None, None, output_id, None);
196+
Output::new_with_buffer(100, name.clone(), None, None, output_id, None, None);
196197
let recv = recv.into_stream().map(move |mut item| {
197198
item.events.iter_events_mut().for_each(|mut event| {
198199
let metadata = event.metadata_mut();

0 commit comments

Comments
 (0)