Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Added metrics to record the utilization level of the buffers that each transform receives from:

- `transform_buffer_max_byte_size`
- `transform_buffer_max_event_size`
- `transform_buffer_utilization`
- `transform_buffer_utilization_level`

authors: bruceg
37 changes: 18 additions & 19 deletions lib/vector-buffers/src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use async_trait::async_trait;
use snafu::{ResultExt, Snafu};
use tracing::Span;

use super::channel::{ReceiverAdapter, SenderAdapter};
use super::channel::{ChannelMetricMetadata, ReceiverAdapter, SenderAdapter};
use crate::{
Bufferable, WhenFull,
buffer_usage_data::{BufferUsage, BufferUsageHandle},
topology::channel::{BufferReceiver, BufferSender},
variants::MemoryBuffer,
config::MemoryBufferSize,
topology::channel::{BufferReceiver, BufferSender, limited},
};

/// Value that can be used as a stage in a buffer topology.
Expand Down Expand Up @@ -186,26 +186,25 @@ impl<T: Bufferable> TopologyBuilder<T> {
/// create the stage, installing buffer usage metrics that aren't required, and so on.
///
#[allow(clippy::print_stderr)]
pub async fn standalone_memory(
pub fn standalone_memory(
max_events: NonZeroUsize,
when_full: WhenFull,
receiver_span: &Span,
metadata: Option<ChannelMetricMetadata>,
) -> (BufferSender<T>, BufferReceiver<T>) {
let usage_handle = BufferUsageHandle::noop();
usage_handle.set_buffer_limits(None, Some(max_events.get()));

let memory_buffer = Box::new(MemoryBuffer::with_max_events(max_events));
let (sender, receiver) = memory_buffer
.into_buffer_parts(usage_handle.clone())
.await
.unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));
let limit = MemoryBufferSize::MaxEvents(max_events);
let (sender, receiver) = limited(limit, metadata);

let mode = match when_full {
WhenFull::Overflow => WhenFull::Block,
m => m,
};
let mut sender = BufferSender::new(sender, mode);
let mut sender = BufferSender::new(sender.into(), mode);
sender.with_send_duration_instrumentation(0, receiver_span);
let receiver = BufferReceiver::new(receiver);
let receiver = BufferReceiver::new(receiver.into());

(sender, receiver)
}
Expand All @@ -224,23 +223,23 @@ impl<T: Bufferable> TopologyBuilder<T> {
/// can simplifying needing to require callers to do all the boilerplate to create the builder,
/// create the stage, installing buffer usage metrics that aren't required, and so on.
#[cfg(test)]
pub async fn standalone_memory_test(
pub fn standalone_memory_test(
max_events: NonZeroUsize,
when_full: WhenFull,
usage_handle: BufferUsageHandle,
metadata: Option<ChannelMetricMetadata>,
) -> (BufferSender<T>, BufferReceiver<T>) {
let memory_buffer = Box::new(MemoryBuffer::with_max_events(max_events));
let (sender, receiver) = memory_buffer
.into_buffer_parts(usage_handle.clone())
.await
.unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));
usage_handle.set_buffer_limits(None, Some(max_events.get()));

let limit = MemoryBufferSize::MaxEvents(max_events);
let (sender, receiver) = limited(limit, metadata);

let mode = match when_full {
WhenFull::Overflow => WhenFull::Block,
m => m,
};
let mut sender = BufferSender::new(sender, mode);
let mut receiver = BufferReceiver::new(receiver);
let mut sender = BufferSender::new(sender.into(), mode);
let mut receiver = BufferReceiver::new(receiver.into());

sender.with_usage_instrumentation(usage_handle.clone());
receiver.with_usage_instrumentation(usage_handle);
Expand Down
87 changes: 74 additions & 13 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use std::{
},
};

#[cfg(test)]
use std::sync::Mutex;

use async_stream::stream;
use crossbeam_queue::{ArrayQueue, SegQueue};
use futures::Stream;
Expand Down Expand Up @@ -89,6 +92,18 @@ where
}
}

#[derive(Clone, Debug)]
pub struct ChannelMetricMetadata {
prefix: &'static str,
output: Option<String>,
}

impl ChannelMetricMetadata {
pub fn new(prefix: &'static str, output: Option<String>) -> Self {
Self { prefix, output }
}
}

#[derive(Clone, Debug)]
struct Metrics {
histogram: Histogram,
Expand All @@ -98,28 +113,54 @@ struct Metrics {
// field, so we need to suppress the warning here.
#[expect(dead_code)]
max_gauge: Gauge,
#[cfg(test)]
recorded_values: Arc<Mutex<Vec<usize>>>,
}

impl Metrics {
#[expect(clippy::cast_precision_loss)] // We have to convert buffer sizes for a gauge, it's okay to lose precision here.
fn new(limit: MemoryBufferSize, prefix: &'static str, output: &str) -> Self {
fn new(limit: MemoryBufferSize, metadata: ChannelMetricMetadata) -> Self {
let ChannelMetricMetadata { prefix, output } = metadata;
let (gauge_suffix, max_value) = match limit {
MemoryBufferSize::MaxEvents(max_events) => ("_max_event_size", max_events.get() as f64),
MemoryBufferSize::MaxSize(max_bytes) => ("_max_byte_size", max_bytes.get() as f64),
};
let max_gauge = gauge!(format!("{prefix}{gauge_suffix}"), "output" => output.to_string());
max_gauge.set(max_value);
Self {
histogram: histogram!(format!("{prefix}_utilization"), "output" => output.to_string()),
gauge: gauge!(format!("{prefix}_utilization_level"), "output" => output.to_string()),
max_gauge,
let max_gauge_name = format!("{prefix}{gauge_suffix}");
let histogram_name = format!("{prefix}_utilization");
let gauge_name = format!("{prefix}_utilization_level");
#[cfg(test)]
let recorded_values = Arc::new(Mutex::new(Vec::new()));
if let Some(label_value) = output {
let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
max_gauge.set(max_value);
Self {
histogram: histogram!(histogram_name, "output" => label_value.clone()),
gauge: gauge!(gauge_name, "output" => label_value.clone()),
max_gauge,
#[cfg(test)]
recorded_values,
}
} else {
let max_gauge = gauge!(max_gauge_name);
max_gauge.set(max_value);
Self {
histogram: histogram!(histogram_name),
gauge: gauge!(gauge_name),
max_gauge,
#[cfg(test)]
recorded_values,
}
}
}

#[expect(clippy::cast_precision_loss)]
fn record(&self, value: usize) {
self.histogram.record(value as f64);
self.gauge.set(value as f64);
#[cfg(test)]
if let Ok(mut recorded) = self.recorded_values.lock() {
recorded.push(value);
}
}
}

Expand All @@ -145,10 +186,9 @@ impl<T> Clone for Inner<T> {
}

impl<T: InMemoryBufferable> Inner<T> {
fn new(limit: MemoryBufferSize, metric_name_output: Option<(&'static str, &str)>) -> Self {
fn new(limit: MemoryBufferSize, metric_metadata: Option<ChannelMetricMetadata>) -> Self {
let read_waker = Arc::new(Notify::new());
let metrics =
metric_name_output.map(|(prefix, output)| Metrics::new(limit, prefix, output));
let metrics = metric_metadata.map(|metadata| Metrics::new(limit, metadata));
match limit {
MemoryBufferSize::MaxEvents(max_events) => Inner {
data: Arc::new(ArrayQueue::new(max_events.get())),
Expand All @@ -167,6 +207,11 @@ impl<T: InMemoryBufferable> Inner<T> {
}
}

/// Records a send after acquiring all required permits.
///
/// The `total` value represents the channel utilization after this send completes. It may be
/// greater than the configured limit because the channel intentionally allows a single
/// oversized payload to flow through rather than forcing the sender to split it.
fn send_with_permit(&mut self, total: usize, permits: OwnedSemaphorePermit, item: T) {
self.data.push((permits, item));
self.read_waker.notify_one();
Expand Down Expand Up @@ -335,9 +380,9 @@ impl<T> Drop for LimitedReceiver<T> {

pub fn limited<T: InMemoryBufferable + fmt::Debug>(
limit: MemoryBufferSize,
metric_name_output: Option<(&'static str, &str)>,
metric_metadata: Option<ChannelMetricMetadata>,
) -> (LimitedSender<T>, LimitedReceiver<T>) {
let inner = Inner::new(limit, metric_name_output);
let inner = Inner::new(limit, metric_metadata);

let sender = LimitedSender {
inner: inner.clone(),
Expand All @@ -355,7 +400,7 @@ mod tests {
use tokio_test::{assert_pending, assert_ready, task::spawn};
use vector_common::byte_size_of::ByteSizeOf;

use super::limited;
use super::{ChannelMetricMetadata, limited};
use crate::{
MemoryBufferSize,
test::MultiEventRecord,
Expand Down Expand Up @@ -391,6 +436,22 @@ mod tests {
assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
}

#[tokio::test]
async fn records_utilization_on_send() {
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
let (mut tx, mut rx) = limited(
limit,
Some(ChannelMetricMetadata::new("test_channel", None)),
);

let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

tx.send(Sample::new(1)).await.expect("send should succeed");
assert_eq!(metrics.lock().unwrap().last().copied(), Some(1));

let _ = rx.next().await;
}

#[test]
fn test_limiting_by_byte_size() {
let max_elements = 10;
Expand Down
4 changes: 3 additions & 1 deletion lib/vector-buffers/src/topology/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ mod limited_queue;
mod receiver;
mod sender;

pub use limited_queue::{LimitedReceiver, LimitedSender, SendError, limited};
pub use limited_queue::{
ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited,
};
pub use receiver::*;
pub use sender::*;

Expand Down
12 changes: 6 additions & 6 deletions lib/vector-buffers/src/topology/channel/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ where
#[tokio::test]
async fn test_sender_block() {
// Get a non-overflow buffer in blocking mode with a capacity of 3.
let (mut tx, rx, _) = build_buffer(3, WhenFull::Block, None).await;
let (mut tx, rx, _) = build_buffer(3, WhenFull::Block, None);

// We should be able to send three messages through unimpeded.
assert_current_send_capacity(&mut tx, Some(3), None);
Expand All @@ -113,7 +113,7 @@ async fn test_sender_block() {
#[tokio::test]
async fn test_sender_drop_newest() {
// Get a non-overflow buffer in "drop newest" mode with a capacity of 3.
let (mut tx, rx, _) = build_buffer(3, WhenFull::DropNewest, None).await;
let (mut tx, rx, _) = build_buffer(3, WhenFull::DropNewest, None);

// We should be able to send three messages through unimpeded.
assert_current_send_capacity(&mut tx, Some(3), None);
Expand All @@ -138,7 +138,7 @@ async fn test_sender_drop_newest() {
async fn test_sender_overflow_block() {
// Get an overflow buffer, where the overflow buffer is in blocking mode, and both the base
// and overflow buffers have a capacity of 2.
let (mut tx, rx, _) = build_buffer(2, WhenFull::Overflow, Some(WhenFull::Block)).await;
let (mut tx, rx, _) = build_buffer(2, WhenFull::Overflow, Some(WhenFull::Block));

// We should be able to send four message through unimpeded -- two for the base sender, and
// two for the overflow sender.
Expand All @@ -164,7 +164,7 @@ async fn test_sender_overflow_block() {
async fn test_sender_overflow_drop_newest() {
// Get an overflow buffer, where the overflow buffer is in "drop newest" mode, and both the
// base and overflow buffers have a capacity of 2.
let (mut tx, rx, _) = build_buffer(2, WhenFull::Overflow, Some(WhenFull::DropNewest)).await;
let (mut tx, rx, _) = build_buffer(2, WhenFull::Overflow, Some(WhenFull::DropNewest));

// We should be able to send four message through unimpeded -- two for the base sender, and
// two for the overflow sender.
Expand All @@ -190,7 +190,7 @@ async fn test_sender_overflow_drop_newest() {
#[tokio::test]
async fn test_buffer_metrics_normal() {
// Get a regular blocking buffer.
let (mut tx, rx, handle) = build_buffer(5, WhenFull::Block, None).await;
let (mut tx, rx, handle) = build_buffer(5, WhenFull::Block, None);

// Send three items through, and make sure the buffer usage stats reflect that.
assert_current_send_capacity(&mut tx, Some(5), None);
Expand All @@ -217,7 +217,7 @@ async fn test_buffer_metrics_normal() {
#[tokio::test]
async fn test_buffer_metrics_drop_newest() {
// Get a buffer that drops the newest items when full.
let (mut tx, rx, handle) = build_buffer(2, WhenFull::DropNewest, None).await;
let (mut tx, rx, handle) = build_buffer(2, WhenFull::DropNewest, None);

// Send three items through, and make sure the buffer usage stats reflect that.
assert_current_send_capacity(&mut tx, Some(2), None);
Expand Down
24 changes: 11 additions & 13 deletions lib/vector-buffers/src/topology/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl error::Error for BasicError {}
/// If `mode` is set to `WhenFull::Overflow`, then the buffer will be set to overflow mode, with
/// another in-memory channel buffer being used as the overflow buffer. The overflow buffer will
/// also use the same capacity as the outer buffer.
pub(crate) async fn build_buffer(
pub(crate) fn build_buffer(
capacity: usize,
mode: WhenFull,
overflow_mode: Option<WhenFull>,
Expand All @@ -154,27 +154,25 @@ pub(crate) async fn build_buffer(
NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
overflow_mode,
handle.clone(),
)
.await;
None,
);
let (mut base_sender, mut base_receiver) = TopologyBuilder::standalone_memory_test(
NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
WhenFull::Overflow,
handle.clone(),
)
.await;
None,
);
base_sender.switch_to_overflow(overflow_sender);
base_receiver.switch_to_overflow(overflow_receiver);

(base_sender, base_receiver)
}
m => {
TopologyBuilder::standalone_memory_test(
NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
m,
handle.clone(),
)
.await
}
m => TopologyBuilder::standalone_memory_test(
NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
m,
handle.clone(),
None,
),
};

(tx, rx, handle)
Expand Down
5 changes: 3 additions & 2 deletions lib/vector-buffers/src/variants/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{error::Error, num::NonZeroUsize};
use std::error::Error;

use async_trait::async_trait;

Expand All @@ -21,7 +21,8 @@ impl MemoryBuffer {
MemoryBuffer { capacity }
}

pub fn with_max_events(n: NonZeroUsize) -> Self {
#[cfg(test)]
pub fn with_max_events(n: std::num::NonZeroUsize) -> Self {
Self {
capacity: MemoryBufferSize::MaxEvents(n),
}
Expand Down
Loading
Loading