Skip to content

Commit 79d55e4

Browse files
committed
enhancement(transforms): Add internal metric to record buffer utilization
This builds on the work in #24272 and adds support for transform buffer utilization. Since these are input buffers they omit the `output` label used by the buffers created by the source senders.
1 parent e627722 commit 79d55e4

File tree

15 files changed

+284
-117
lines changed

15 files changed

+284
-117
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
Added metrics to record the utilization level of the buffers that each transform receives from:
2+
3+
- `transform_buffer_max_byte_size`
4+
- `transform_buffer_max_event_size`
5+
- `transform_buffer_utilization`
6+
- `transform_buffer_utilization_level`
7+
8+
authors: bruceg

lib/vector-buffers/src/topology/builder.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ use async_trait::async_trait;
44
use snafu::{ResultExt, Snafu};
55
use tracing::Span;
66

7-
use super::channel::{ReceiverAdapter, SenderAdapter};
7+
use super::channel::{ChannelMetricMetadata, ReceiverAdapter, SenderAdapter};
88
use crate::{
99
Bufferable, WhenFull,
1010
buffer_usage_data::{BufferUsage, BufferUsageHandle},
11-
topology::channel::{BufferReceiver, BufferSender},
12-
variants::MemoryBuffer,
11+
config::MemoryBufferSize,
12+
topology::channel::{BufferReceiver, BufferSender, limited},
1313
};
1414

1515
/// Value that can be used as a stage in a buffer topology.
@@ -186,26 +186,25 @@ impl<T: Bufferable> TopologyBuilder<T> {
186186
/// create the stage, installing buffer usage metrics that aren't required, and so on.
187187
///
188188
#[allow(clippy::print_stderr)]
189-
pub async fn standalone_memory(
189+
pub fn standalone_memory(
190190
max_events: NonZeroUsize,
191191
when_full: WhenFull,
192192
receiver_span: &Span,
193+
metadata: Option<ChannelMetricMetadata>,
193194
) -> (BufferSender<T>, BufferReceiver<T>) {
194195
let usage_handle = BufferUsageHandle::noop();
196+
usage_handle.set_buffer_limits(None, Some(max_events.get()));
195197

196-
let memory_buffer = Box::new(MemoryBuffer::with_max_events(max_events));
197-
let (sender, receiver) = memory_buffer
198-
.into_buffer_parts(usage_handle.clone())
199-
.await
200-
.unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));
198+
let limit = MemoryBufferSize::MaxEvents(max_events);
199+
let (sender, receiver) = limited(limit, metadata);
201200

202201
let mode = match when_full {
203202
WhenFull::Overflow => WhenFull::Block,
204203
m => m,
205204
};
206-
let mut sender = BufferSender::new(sender, mode);
205+
let mut sender = BufferSender::new(sender.into(), mode);
207206
sender.with_send_duration_instrumentation(0, receiver_span);
208-
let receiver = BufferReceiver::new(receiver);
207+
let receiver = BufferReceiver::new(receiver.into());
209208

210209
(sender, receiver)
211210
}
@@ -224,23 +223,23 @@ impl<T: Bufferable> TopologyBuilder<T> {
224223
/// can simplifying needing to require callers to do all the boilerplate to create the builder,
225224
/// create the stage, installing buffer usage metrics that aren't required, and so on.
226225
#[cfg(test)]
227-
pub async fn standalone_memory_test(
226+
pub fn standalone_memory_test(
228227
max_events: NonZeroUsize,
229228
when_full: WhenFull,
230229
usage_handle: BufferUsageHandle,
230+
metadata: Option<ChannelMetricMetadata>,
231231
) -> (BufferSender<T>, BufferReceiver<T>) {
232-
let memory_buffer = Box::new(MemoryBuffer::with_max_events(max_events));
233-
let (sender, receiver) = memory_buffer
234-
.into_buffer_parts(usage_handle.clone())
235-
.await
236-
.unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));
232+
usage_handle.set_buffer_limits(None, Some(max_events.get()));
233+
234+
let limit = MemoryBufferSize::MaxEvents(max_events);
235+
let (sender, receiver) = limited(limit, metadata);
237236

238237
let mode = match when_full {
239238
WhenFull::Overflow => WhenFull::Block,
240239
m => m,
241240
};
242-
let mut sender = BufferSender::new(sender, mode);
243-
let mut receiver = BufferReceiver::new(receiver);
241+
let mut sender = BufferSender::new(sender.into(), mode);
242+
let mut receiver = BufferReceiver::new(receiver.into());
244243

245244
sender.with_usage_instrumentation(usage_handle.clone());
246245
receiver.with_usage_instrumentation(usage_handle);

lib/vector-buffers/src/topology/channel/limited_queue.rs

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ use std::{
88
},
99
};
1010

11+
#[cfg(test)]
12+
use std::sync::Mutex;
13+
1114
use async_stream::stream;
1215
use crossbeam_queue::{ArrayQueue, SegQueue};
1316
use futures::Stream;
@@ -89,6 +92,18 @@ where
8992
}
9093
}
9194

95+
#[derive(Clone, Debug)]
96+
pub struct ChannelMetricMetadata {
97+
prefix: &'static str,
98+
output: Option<String>,
99+
}
100+
101+
impl ChannelMetricMetadata {
102+
pub fn new(prefix: &'static str, output: Option<String>) -> Self {
103+
Self { prefix, output }
104+
}
105+
}
106+
92107
#[derive(Clone, Debug)]
93108
struct Metrics {
94109
histogram: Histogram,
@@ -98,28 +113,58 @@ struct Metrics {
98113
// field, so we need to suppress the warning here.
99114
#[expect(dead_code)]
100115
max_gauge: Gauge,
116+
#[cfg(test)]
117+
recorded_values: Arc<Mutex<Vec<usize>>>,
101118
}
102119

103120
impl Metrics {
104121
#[expect(clippy::cast_precision_loss)] // We have to convert buffer sizes for a gauge, it's okay to lose precision here.
105-
fn new(limit: MemoryBufferSize, prefix: &'static str, output: &str) -> Self {
122+
fn new(limit: MemoryBufferSize, metadata: ChannelMetricMetadata) -> Self {
123+
let ChannelMetricMetadata { prefix, output } = metadata;
106124
let (gauge_suffix, max_value) = match limit {
107125
MemoryBufferSize::MaxEvents(max_events) => ("_max_event_size", max_events.get() as f64),
108126
MemoryBufferSize::MaxSize(max_bytes) => ("_max_byte_size", max_bytes.get() as f64),
109127
};
110-
let max_gauge = gauge!(format!("{prefix}{gauge_suffix}"), "output" => output.to_string());
111-
max_gauge.set(max_value);
112-
Self {
113-
histogram: histogram!(format!("{prefix}_utilization"), "output" => output.to_string()),
114-
gauge: gauge!(format!("{prefix}_utilization_level"), "output" => output.to_string()),
115-
max_gauge,
128+
if let Some(label_value) = output {
129+
let max_gauge = gauge!(
130+
format!("{prefix}{gauge_suffix}"),
131+
"output" => label_value.clone()
132+
);
133+
max_gauge.set(max_value);
134+
Self {
135+
histogram: histogram!(
136+
format!("{prefix}_utilization"),
137+
"output" => label_value.clone()
138+
),
139+
gauge: gauge!(
140+
format!("{prefix}_utilization_level"),
141+
"output" => label_value
142+
),
143+
max_gauge,
144+
#[cfg(test)]
145+
recorded_values: Arc::new(Mutex::new(Vec::new())),
146+
}
147+
} else {
148+
let max_gauge = gauge!(format!("{prefix}{gauge_suffix}"));
149+
max_gauge.set(max_value);
150+
Self {
151+
histogram: histogram!(format!("{prefix}_utilization")),
152+
gauge: gauge!(format!("{prefix}_utilization_level")),
153+
max_gauge,
154+
#[cfg(test)]
155+
recorded_values: Arc::new(Mutex::new(Vec::new())),
156+
}
116157
}
117158
}
118159

119160
#[expect(clippy::cast_precision_loss)]
120161
fn record(&self, value: usize) {
121162
self.histogram.record(value as f64);
122163
self.gauge.set(value as f64);
164+
#[cfg(test)]
165+
if let Ok(mut recorded) = self.recorded_values.lock() {
166+
recorded.push(value);
167+
}
123168
}
124169
}
125170

@@ -145,10 +190,9 @@ impl<T> Clone for Inner<T> {
145190
}
146191

147192
impl<T: InMemoryBufferable> Inner<T> {
148-
fn new(limit: MemoryBufferSize, metric_name_output: Option<(&'static str, &str)>) -> Self {
193+
fn new(limit: MemoryBufferSize, metric_metadata: Option<ChannelMetricMetadata>) -> Self {
149194
let read_waker = Arc::new(Notify::new());
150-
let metrics =
151-
metric_name_output.map(|(prefix, output)| Metrics::new(limit, prefix, output));
195+
let metrics = metric_metadata.map(|metadata| Metrics::new(limit, metadata));
152196
match limit {
153197
MemoryBufferSize::MaxEvents(max_events) => Inner {
154198
data: Arc::new(ArrayQueue::new(max_events.get())),
@@ -167,6 +211,11 @@ impl<T: InMemoryBufferable> Inner<T> {
167211
}
168212
}
169213

214+
/// Records a send after acquiring all required permits.
215+
///
216+
/// The `total` value represents the channel utilization after this send completes. It may be
217+
/// greater than the configured limit because the channel intentionally allows a single
218+
/// oversized payload to flow through rather than forcing the sender to split it.
170219
fn send_with_permit(&mut self, total: usize, permits: OwnedSemaphorePermit, item: T) {
171220
self.data.push((permits, item));
172221
self.read_waker.notify_one();
@@ -335,9 +384,9 @@ impl<T> Drop for LimitedReceiver<T> {
335384

336385
pub fn limited<T: InMemoryBufferable + fmt::Debug>(
337386
limit: MemoryBufferSize,
338-
metric_name_output: Option<(&'static str, &str)>,
387+
metric_metadata: Option<ChannelMetricMetadata>,
339388
) -> (LimitedSender<T>, LimitedReceiver<T>) {
340-
let inner = Inner::new(limit, metric_name_output);
389+
let inner = Inner::new(limit, metric_metadata);
341390

342391
let sender = LimitedSender {
343392
inner: inner.clone(),
@@ -355,7 +404,7 @@ mod tests {
355404
use tokio_test::{assert_pending, assert_ready, task::spawn};
356405
use vector_common::byte_size_of::ByteSizeOf;
357406

358-
use super::limited;
407+
use super::{ChannelMetricMetadata, limited};
359408
use crate::{
360409
MemoryBufferSize,
361410
test::MultiEventRecord,
@@ -391,6 +440,22 @@ mod tests {
391440
assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
392441
}
393442

443+
#[tokio::test]
444+
async fn records_utilization_on_send() {
445+
let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
446+
let (mut tx, mut rx) = limited(
447+
limit,
448+
Some(ChannelMetricMetadata::new("test_channel", None)),
449+
);
450+
451+
let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
452+
453+
tx.send(Sample::new(1)).await.expect("send should succeed");
454+
assert_eq!(metrics.lock().unwrap().last().copied(), Some(1));
455+
456+
let _ = rx.next().await;
457+
}
458+
394459
#[test]
395460
fn test_limiting_by_byte_size() {
396461
let max_elements = 10;

lib/vector-buffers/src/topology/channel/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ mod limited_queue;
22
mod receiver;
33
mod sender;
44

5-
pub use limited_queue::{LimitedReceiver, LimitedSender, SendError, limited};
5+
pub use limited_queue::{
6+
ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited,
7+
};
68
pub use receiver::*;
79
pub use sender::*;
810

lib/vector-buffers/src/topology/channel/tests.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ where
9090
#[tokio::test]
9191
async fn test_sender_block() {
9292
// Get a non-overflow buffer in blocking mode with a capacity of 3.
93-
let (mut tx, rx, _) = build_buffer(3, WhenFull::Block, None).await;
93+
let (mut tx, rx, _) = build_buffer(3, WhenFull::Block, None);
9494

9595
// We should be able to send three messages through unimpeded.
9696
assert_current_send_capacity(&mut tx, Some(3), None);
@@ -113,7 +113,7 @@ async fn test_sender_block() {
113113
#[tokio::test]
114114
async fn test_sender_drop_newest() {
115115
// Get a non-overflow buffer in "drop newest" mode with a capacity of 3.
116-
let (mut tx, rx, _) = build_buffer(3, WhenFull::DropNewest, None).await;
116+
let (mut tx, rx, _) = build_buffer(3, WhenFull::DropNewest, None);
117117

118118
// We should be able to send three messages through unimpeded.
119119
assert_current_send_capacity(&mut tx, Some(3), None);
@@ -138,7 +138,7 @@ async fn test_sender_drop_newest() {
138138
async fn test_sender_overflow_block() {
139139
// Get an overflow buffer, where the overflow buffer is in blocking mode, and both the base
140140
// and overflow buffers have a capacity of 2.
141-
let (mut tx, rx, _) = build_buffer(2, WhenFull::Overflow, Some(WhenFull::Block)).await;
141+
let (mut tx, rx, _) = build_buffer(2, WhenFull::Overflow, Some(WhenFull::Block));
142142

143143
// We should be able to send four message through unimpeded -- two for the base sender, and
144144
// two for the overflow sender.
@@ -164,7 +164,7 @@ async fn test_sender_overflow_block() {
164164
async fn test_sender_overflow_drop_newest() {
165165
// Get an overflow buffer, where the overflow buffer is in "drop newest" mode, and both the
166166
// base and overflow buffers have a capacity of 2.
167-
let (mut tx, rx, _) = build_buffer(2, WhenFull::Overflow, Some(WhenFull::DropNewest)).await;
167+
let (mut tx, rx, _) = build_buffer(2, WhenFull::Overflow, Some(WhenFull::DropNewest));
168168

169169
// We should be able to send four message through unimpeded -- two for the base sender, and
170170
// two for the overflow sender.
@@ -190,7 +190,7 @@ async fn test_sender_overflow_drop_newest() {
190190
#[tokio::test]
191191
async fn test_buffer_metrics_normal() {
192192
// Get a regular blocking buffer.
193-
let (mut tx, rx, handle) = build_buffer(5, WhenFull::Block, None).await;
193+
let (mut tx, rx, handle) = build_buffer(5, WhenFull::Block, None);
194194

195195
// Send three items through, and make sure the buffer usage stats reflect that.
196196
assert_current_send_capacity(&mut tx, Some(5), None);
@@ -217,7 +217,7 @@ async fn test_buffer_metrics_normal() {
217217
#[tokio::test]
218218
async fn test_buffer_metrics_drop_newest() {
219219
// Get a buffer that drops the newest items when full.
220-
let (mut tx, rx, handle) = build_buffer(2, WhenFull::DropNewest, None).await;
220+
let (mut tx, rx, handle) = build_buffer(2, WhenFull::DropNewest, None);
221221

222222
// Send three items through, and make sure the buffer usage stats reflect that.
223223
assert_current_send_capacity(&mut tx, Some(2), None);

lib/vector-buffers/src/topology/test_util.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ impl error::Error for BasicError {}
137137
/// If `mode` is set to `WhenFull::Overflow`, then the buffer will be set to overflow mode, with
138138
/// another in-memory channel buffer being used as the overflow buffer. The overflow buffer will
139139
/// also use the same capacity as the outer buffer.
140-
pub(crate) async fn build_buffer(
140+
pub(crate) fn build_buffer(
141141
capacity: usize,
142142
mode: WhenFull,
143143
overflow_mode: Option<WhenFull>,
@@ -154,27 +154,25 @@ pub(crate) async fn build_buffer(
154154
NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
155155
overflow_mode,
156156
handle.clone(),
157-
)
158-
.await;
157+
None,
158+
);
159159
let (mut base_sender, mut base_receiver) = TopologyBuilder::standalone_memory_test(
160160
NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
161161
WhenFull::Overflow,
162162
handle.clone(),
163-
)
164-
.await;
163+
None,
164+
);
165165
base_sender.switch_to_overflow(overflow_sender);
166166
base_receiver.switch_to_overflow(overflow_receiver);
167167

168168
(base_sender, base_receiver)
169169
}
170-
m => {
171-
TopologyBuilder::standalone_memory_test(
172-
NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
173-
m,
174-
handle.clone(),
175-
)
176-
.await
177-
}
170+
m => TopologyBuilder::standalone_memory_test(
171+
NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
172+
m,
173+
handle.clone(),
174+
None,
175+
),
178176
};
179177

180178
(tx, rx, handle)

lib/vector-buffers/src/variants/in_memory.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{error::Error, num::NonZeroUsize};
1+
use std::error::Error;
22

33
use async_trait::async_trait;
44

@@ -21,7 +21,8 @@ impl MemoryBuffer {
2121
MemoryBuffer { capacity }
2222
}
2323

24-
pub fn with_max_events(n: NonZeroUsize) -> Self {
24+
#[cfg(test)]
25+
pub fn with_max_events(n: std::num::NonZeroUsize) -> Self {
2526
Self {
2627
capacity: MemoryBufferSize::MaxEvents(n),
2728
}

0 commit comments

Comments
 (0)