Commit f1af5e3
enhancement(observability): Add transform latency metrics
This adds the `component_latency_seconds` histogram and `component_latency_mean_seconds` gauge internal metrics, exposing the time an event spends in a single transform, including the time spent in the transform buffer.
1 parent d0a6823

10 files changed (+277, -5 lines)
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+Added the `component_latency_seconds` histogram and
+`component_latency_mean_seconds` gauge internal metrics, exposing the time an
+event spends in a single transform including the transform buffer.
+
+authors: bruceg

lib/vector-core/src/latency.rs

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
use std::time::Instant;

use metrics::{Histogram, gauge, histogram};
use vector_common::stats::EwmaGauge;

use crate::event::EventArray;

const COMPONENT_LATENCY: &str = "component_latency_seconds";
const COMPONENT_LATENCY_MEAN: &str = "component_latency_mean_seconds";
const DEFAULT_LATENCY_EWMA_ALPHA: f64 = 0.9;

#[derive(Debug)]
pub struct LatencyRecorder {
    histogram: Histogram,
    gauge: EwmaGauge,
}

impl LatencyRecorder {
    pub fn new(ewma_alpha: Option<f64>) -> Self {
        Self {
            histogram: histogram!(COMPONENT_LATENCY),
            gauge: EwmaGauge::new(
                gauge!(COMPONENT_LATENCY_MEAN),
                ewma_alpha.or(Some(DEFAULT_LATENCY_EWMA_ALPHA)),
            ),
        }
    }

    pub fn on_send(&self, events: &mut EventArray) {
        let now = Instant::now();
        let mut sum = 0.0;
        let mut count = 0usize;

        // Since all of the events in the array will most likely have entered and exited the
        // component at close to the same time, we average all the latencies over the entire array
        // and record it just once in the EWMA-backed gauge. If we were to record each latency
        // individually, the gauge would effectively just reflect the latest array's latency,
        // eliminating the utility of the EWMA averaging. However, we record the individual
        // latencies in the histogram to get a more granular view of the latency distribution.
        for mut event in events.iter_events_mut() {
            let metadata = event.metadata_mut();
            if let Some(previous) = metadata.last_transform_timestamp() {
                let latency = now.saturating_duration_since(previous).as_secs_f64();
                sum += latency;
                count += 1;
                self.histogram.record(latency);
            }

            metadata.set_last_transform_timestamp(now);
        }
        if count > 0 {
            #[expect(
                clippy::cast_precision_loss,
                reason = "losing precision is acceptable here"
            )]
            let mean = sum / count as f64;
            self.gauge.record(mean);
        }
    }
}
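
The `EwmaGauge` helper from `vector_common::stats` is used above but is not part of this diff. As a rough sketch of how such an EWMA-backed gauge could behave (the struct shape, interior mutability, and the weighting convention here are assumptions, not the actual implementation):

use std::sync::Mutex;

use metrics::Gauge;

// Hypothetical sketch only; the real `vector_common::stats::EwmaGauge` may differ.
pub struct EwmaGaugeSketch {
    gauge: Gauge,
    alpha: f64,
    // Interior mutability so callers can record through `&self`, as `LatencyRecorder::on_send` does.
    state: Mutex<Option<f64>>,
}

impl EwmaGaugeSketch {
    pub fn new(gauge: Gauge, alpha: Option<f64>) -> Self {
        Self {
            gauge,
            // Mirrors `DEFAULT_LATENCY_EWMA_ALPHA` above; treated here as the decay on the previous mean.
            alpha: alpha.unwrap_or(0.9),
            state: Mutex::new(None),
        }
    }

    pub fn record(&self, sample: f64) {
        let mut state = self.state.lock().unwrap();
        // EWMA update (weighting convention assumed): keep `alpha` of the old mean and
        // blend in `1 - alpha` of the new sample; the first sample seeds the mean.
        let next = match *state {
            Some(prev) => self.alpha * prev + (1.0 - self.alpha) * sample,
            None => sample,
        };
        *state = Some(next);
        self.gauge.set(next);
    }
}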

lib/vector-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ pub mod config;
pub mod event;
pub mod fanout;
pub mod ipallowlist;
+pub mod latency;
pub mod metrics;
pub mod partition;
pub mod schema;

lib/vector-core/src/transform/outputs.rs

Lines changed: 20 additions & 0 deletions
@@ -220,6 +220,19 @@ impl TransformOutputsBuf
    pub fn take_all_named(&mut self) -> HashMap<String, OutputBuffer> {
        std::mem::take(&mut self.named_buffers)
    }
+
+   /// Applies `f` to each [`EventArray`] currently buffered in this outputs buffer.
+   ///
+   /// This is useful for cross-cutting instrumentation (e.g. latency timestamp propagation)
+   /// that needs mutable access to the buffered arrays before they are sent.
+   pub fn for_each_array_mut(&mut self, mut f: impl FnMut(&mut EventArray)) {
+       if let Some(primary) = self.primary_buffer.as_mut() {
+           primary.for_each_array_mut(&mut f);
+       }
+       for buf in self.named_buffers.values_mut() {
+           buf.for_each_array_mut(&mut f);
+       }
+   }
}

impl ByteSizeOf for TransformOutputsBuf {

@@ -295,6 +308,13 @@ impl OutputBuffer
        self.0.drain(..).flat_map(EventArray::into_events)
    }

+   /// Applies `f` to each [`EventArray`] currently held by this buffer.
+   pub fn for_each_array_mut(&mut self, mut f: impl FnMut(&mut EventArray)) {
+       for array in &mut self.0 {
+           f(array);
+       }
+   }
+
    async fn send(
        &mut self,
        output: &mut Fanout,
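
For orientation, any holder of a `TransformOutputsBuf` can apply per-array instrumentation over the primary and all named outputs in a single pass. A minimal usage sketch (the free function below is illustrative and not part of this commit; it mirrors how `Runner::send_outputs` calls the method in `src/topology/builder.rs` further down):

// Illustrative helper: stamp and record latency on every buffered array
// before the outputs are sent downstream.
fn record_latency(outputs_buf: &mut TransformOutputsBuf, recorder: &LatencyRecorder) {
    outputs_buf.for_each_array_mut(|array| recorder.on_send(array));
}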

lib/vector-lib/src/lib.rs

Lines changed: 2 additions & 2 deletions
@@ -21,8 +21,8 @@ pub use vector_config::impl_generate_config_from_default;
pub use vector_core::compile_vrl;
pub use vector_core::{
    EstimatedJsonEncodedSizeOf, buckets, default_data_dir, emit, event, fanout, ipallowlist,
-   metric_tags, metrics, partition, quantiles, register, samples, schema, serde, sink, source,
-   source_sender, tcp, tls, transform,
+   latency, metric_tags, metrics, partition, quantiles, register, samples, schema, serde, sink,
+   source, source_sender, tcp, tls, transform,
};
pub use vector_lookup as lookup;
pub use vector_stream as stream;

src/topology/builder.rs

Lines changed: 15 additions & 1 deletion
@@ -26,6 +26,7 @@ use vector_lib::{
        },
    },
    internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
+   latency::LatencyRecorder,
    schema::Definition,
    source_sender::{CHUNK_SIZE, SourceSenderItem},
    transform::update_runtime_schema_definition,

@@ -742,7 +743,14 @@
        let sender = self
            .utilization_registry
            .add_component(node.key.clone(), gauge!("utilization"));
-       let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
+       let runner = Runner::new(
+           t,
+           input_rx,
+           sender,
+           node.input_details.data_type(),
+           outputs,
+           LatencyRecorder::new(self.config.global.latency_ewma_alpha),
+       );
        let transform = if node.enable_concurrency {
            runner.run_concurrently().boxed()
        } else {

@@ -807,6 +815,7 @@
            component: key.clone(),
            port: None,
        });
+       let latency_recorder = LatencyRecorder::new(self.config.global.latency_ewma_alpha);

        // Task transforms can only write to the default output, so only a single schema def map is needed
        let schema_definition_map = outputs

@@ -825,6 +834,7 @@
                for event in events.iter_events_mut() {
                    update_runtime_schema_definition(event, &output_id, &schema_definition_map);
                }
+               latency_recorder.on_send(&mut events);
                (events, Instant::now())
            })
            .inspect(move |(events, _): &(EventArray, Instant)| {

@@ -1110,6 +1120,7 @@ struct Runner {
    input_type: DataType,
    outputs: TransformOutputs,
    timer_tx: UtilizationComponentSender,
+   latency_recorder: LatencyRecorder,
    events_received: Registered<EventsReceived>,
}

@@ -1120,13 +1131,15 @@ impl Runner {
        timer_tx: UtilizationComponentSender,
        input_type: DataType,
        outputs: TransformOutputs,
+       latency_recorder: LatencyRecorder,
    ) -> Self {
        Self {
            transform,
            input_rx: Some(input_rx),
            input_type,
            outputs,
            timer_tx,
+           latency_recorder,
            events_received: register!(EventsReceived),
        }
    }

@@ -1142,6 +1155,7 @@ impl Runner {

    async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
        self.timer_tx.try_send_start_wait();
+       outputs_buf.for_each_array_mut(|array| self.latency_recorder.on_send(array));
        self.outputs.send(outputs_buf).await
    }
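
The builder reads `self.config.global.latency_ewma_alpha`, which is not defined in this diff. A hypothetical sketch of how such an optional global setting might be declared (the struct and field placement are assumptions; only the field name comes from the usage above):

// Hypothetical sketch only; Vector's real global options struct is defined elsewhere.
#[derive(Debug, Default)]
pub struct GlobalOptionsSketch {
    /// Smoothing factor for the `component_latency_mean_seconds` EWMA gauge.
    /// `None` falls back to `DEFAULT_LATENCY_EWMA_ALPHA` inside `LatencyRecorder::new`.
    pub latency_ewma_alpha: Option<f64>,
}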

Lines changed: 147 additions & 0 deletions
@@ -0,0 +1,147 @@
use std::time::Instant;
use tokio::{
    sync::oneshot,
    time::{Duration, timeout},
};
use vector_lib::metrics::Controller;

use crate::{
    config::Config,
    event::{Event, LogEvent, Metric, MetricValue},
    test_util::{
        mock::{
            basic_source,
            sinks::CompletionSinkConfig,
            transforms::{NoopTransformConfig, TransformType},
        },
        start_topology, trace_init,
    },
};

const EVENT_COUNT: usize = 100;
const TRANSFORM_DELAY_MS: u64 = 10;
const SOURCE_ID: &str = "latency_source";
const TRANSFORM_ID: &str = "latency_delay";
const TRANSFORM_TYPE: &str = "test_noop";
const TRANSFORM_KIND: &str = "transform";
const SINK_ID: &str = "latency_sink";

struct LatencyTestRun {
    metrics: Vec<Metric>,
    elapsed_time: f64,
}

#[tokio::test]
async fn component_latency_metrics_emitted() {
    let run = run_latency_topology().await;

    assert_histogram_count(
        &run.metrics,
        "component_latency_seconds",
        has_component_tags,
    );
    assert_gauge_range(
        &run.metrics,
        "component_latency_mean_seconds",
        has_component_tags,
        TRANSFORM_DELAY_MS as f64 / 1000.0,
        run.elapsed_time,
    );
}

async fn run_latency_topology() -> LatencyTestRun {
    trace_init();

    let controller = Controller::get().expect("metrics controller");
    controller.reset();

    let (mut source_tx, source_config) = basic_source();
    let transform_config =
        NoopTransformConfig::from(TransformType::Task).with_delay_ms(TRANSFORM_DELAY_MS);
    let (sink_done_tx, sink_done_rx) = oneshot::channel();
    let sink_config = CompletionSinkConfig::new(EVENT_COUNT, sink_done_tx);

    let mut config = Config::builder();
    config.add_source(SOURCE_ID, source_config);
    config.add_transform(TRANSFORM_ID, &[SOURCE_ID], transform_config);
    config.add_sink(SINK_ID, &[TRANSFORM_ID], sink_config);

    let start_time = Instant::now();
    let (topology, _) = start_topology(config.build().unwrap(), false).await;

    for idx in 0..EVENT_COUNT {
        let event = Event::Log(LogEvent::from(format!("payload-{idx}")));
        source_tx.send_event(event).await.unwrap();
    }

    drop(source_tx);

    let completed = timeout(Duration::from_secs(5), sink_done_rx)
        .await
        .expect("timed out waiting for completion sink to finish")
        .expect("completion sink sender dropped");
    assert!(
        completed,
        "completion sink finished before receiving all events"
    );

    topology.stop().await;
    let elapsed_time = start_time.elapsed().as_secs_f64();

    LatencyTestRun {
        metrics: controller.capture_metrics(),
        elapsed_time,
    }
}

fn assert_histogram_count(metrics: &[Metric], metric_name: &str, tags_match: fn(&Metric) -> bool) {
    let histogram = metrics
        .iter()
        .find(|metric| metric.name() == metric_name && tags_match(metric))
        .unwrap_or_else(|| panic!("{metric_name} histogram missing"));

    match histogram.value() {
        MetricValue::AggregatedHistogram { count, .. } => {
            assert_eq!(
                *count, EVENT_COUNT as u64,
                "histogram count should match number of events"
            );
        }
        other => panic!("expected aggregated histogram, got {other:?}"),
    }
}

fn assert_gauge_range(
    metrics: &[Metric],
    metric_name: &str,
    tags_match: fn(&Metric) -> bool,
    expected_min: f64,
    elapsed_time: f64,
) {
    let gauge = metrics
        .iter()
        .find(|metric| metric.name() == metric_name && tags_match(metric))
        .unwrap_or_else(|| panic!("{metric_name} gauge missing"));

    match gauge.value() {
        MetricValue::Gauge { value } => {
            assert!(
                *value >= expected_min,
                "expected mean latency to be >= {expected_min}, got {value}"
            );
            assert!(
                *value < elapsed_time,
                "expected mean latency ({value}) to be less than elapsed time ({elapsed_time})"
            );
        }
        other => panic!("expected gauge metric, got {other:?}"),
    }
}

fn has_component_tags(metric: &Metric) -> bool {
    metric.tags().is_some_and(|tags| {
        tags.get("component_id") == Some(TRANSFORM_ID)
            && tags.get("component_type") == Some(TRANSFORM_TYPE)
            && tags.get("component_kind") == Some(TRANSFORM_KIND)
    })
}

src/topology/test/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ mod crash;
mod doesnt_reload;
#[cfg(all(feature = "sources-http_server", feature = "sinks-http"))]
mod end_to_end;
+mod latency_metrics;
#[cfg(all(
    feature = "sources-prometheus",
    feature = "sinks-prometheus",

website/cue/reference/components/sources/internal_metrics.cue

Lines changed: 22 additions & 0 deletions
@@ -273,6 +273,28 @@ components: sources: internal_metrics: {
            reason: _reason
        }
    }
+   component_latency_seconds: {
+       description: """
+           The elapsed time, in fractional seconds, that an event spends in a single transform.
+
+           This includes both the time spent queued in the transform’s input buffer and the time spent executing the transform itself.
+           """
+       type:              "histogram"
+       default_namespace: "vector"
+       tags:              _internal_metrics_tags
+   }
+   component_latency_mean_seconds: {
+       description: """
+           The mean elapsed time, in fractional seconds, that an event spends in a single transform.
+
+           This includes both the time spent queued in the transform’s input buffer and the time spent executing the transform itself.
+
+           This value is smoothed over time using an exponentially weighted moving average (EWMA).
+           """
+       type:              "gauge"
+       default_namespace: "vector"
+       tags:              _internal_metrics_tags
+   }
    buffer_byte_size: {
        description: "The number of bytes currently in the buffer."
        type:        "gauge"

website/cue/reference/components/transforms.cue

Lines changed: 4 additions & 2 deletions
@@ -15,11 +15,13 @@ components: transforms: [Name=string]: {
    telemetry: metrics: {
        component_discarded_events_total:     components.sources.internal_metrics.output.metrics.component_discarded_events_total
        component_errors_total:               components.sources.internal_metrics.output.metrics.component_errors_total
+       component_latency_mean_seconds:       components.sources.internal_metrics.output.metrics.component_latency_mean_seconds
+       component_latency_seconds:            components.sources.internal_metrics.output.metrics.component_latency_seconds
+       component_received_event_bytes_total: components.sources.internal_metrics.output.metrics.component_received_event_bytes_total
        component_received_events_count:      components.sources.internal_metrics.output.metrics.component_received_events_count
        component_received_events_total:      components.sources.internal_metrics.output.metrics.component_received_events_total
-       component_received_event_bytes_total: components.sources.internal_metrics.output.metrics.component_received_event_bytes_total
-       component_sent_events_total:          components.sources.internal_metrics.output.metrics.component_sent_events_total
        component_sent_event_bytes_total:     components.sources.internal_metrics.output.metrics.component_sent_event_bytes_total
+       component_sent_events_total:          components.sources.internal_metrics.output.metrics.component_sent_events_total
        transform_buffer_max_byte_size:       components.sources.internal_metrics.output.metrics.transform_buffer_max_byte_size
        transform_buffer_max_event_size:      components.sources.internal_metrics.output.metrics.transform_buffer_max_event_size
        transform_buffer_max_size_bytes:      components.sources.internal_metrics.output.metrics.transform_buffer_max_size_bytes
