Skip to content

Commit 9d2d2ee

Browse files
committed
enhancement(observability): Add metrics to measure transform processing time
1 parent f5d0c56 commit 9d2d2ee

File tree

13 files changed

+215
-48
lines changed

13 files changed

+215
-48
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Added the `component_processing_time_seconds` histogram and
2+
`component_processing_time_mean_seconds` gauge internal_metrics, exposing the
3+
time an event spends in a single transform including the transform buffer.
4+
5+
authors: bruceg

lib/vector-buffers/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}
112112
pub trait BufferInstrumentation<T: Bufferable>: Send + Sync + 'static {
113113
/// Called immediately before the item is emitted to the underlying buffer.
114114
/// The underlying type is stored in an `Arc`, so we cannot have `&mut self`.
115-
fn on_send(&self, item: &T);
115+
fn on_send(&self, item: &mut T);
116116
}
117117

118118
pub trait EventCount {

lib/vector-buffers/src/topology/channel/sender.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,13 @@ impl<T: Bufferable> BufferSender<T> {
205205
}
206206

207207
#[async_recursion]
208-
pub async fn send(&mut self, item: T, send_reference: Option<Instant>) -> crate::Result<()> {
208+
pub async fn send(
209+
&mut self,
210+
mut item: T,
211+
send_reference: Option<Instant>,
212+
) -> crate::Result<()> {
209213
if let Some(instrumentation) = self.custom_instrumentation.as_ref() {
210-
instrumentation.on_send(&item);
214+
instrumentation.on_send(&mut item);
211215
}
212216
let item_sizing = self
213217
.usage_instrumentation

lib/vector-core/src/config/global_options.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,10 @@ pub struct GlobalOptions {
156156
/// The alpha value for the exponential weighted moving average (EWMA) of transform processing
157157
/// time metrics.
158158
///
159-
/// This controls how quickly the `event_processing_time_mean_seconds` gauge responds to new
160-
/// observations. Values closer to 1.0 retain more of the previous value, leading to slower
161-
/// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.
159+
/// This controls how quickly the `component_processing_time_mean_seconds` and
160+
/// `event_processing_time_mean_seconds` gauges respond to new observations. Values closer to
161+
/// 1.0 retain more of the previous value, leading to slower adjustments. The default value of
162+
/// 0.9 is equivalent to a "half life" of 6-7 measurements.
162163
///
163164
/// Must be between 0 and 1 exclusively (0 < alpha < 1).
164165
#[serde(default, skip_serializing_if = "crate::serde::is_default")]

lib/vector-core/src/event/array.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
use std::{iter, slice, sync::Arc, vec};
66

7+
use chrono::Utc;
78
use futures::{Stream, stream};
89
#[cfg(test)]
910
use quickcheck::{Arbitrary, Gen};
@@ -172,6 +173,15 @@ impl EventArray {
172173
}
173174
}
174175

176+
/// Sets the `last_transform_timestamp` in the metadata for all events in this array. This is
177+
/// used to initialize the timestamp before the first transform so that component processing
178+
/// time can be measured.
179+
pub fn set_last_transform_timestamp(&mut self, timestamp: chrono::DateTime<Utc>) {
180+
for mut event in self.iter_events_mut() {
181+
event.metadata_mut().set_last_transform_timestamp(timestamp);
182+
}
183+
}
184+
175185
/// Iterate over references to this array's events.
176186
pub fn iter_events(&self) -> impl Iterator<Item = EventRef<'_>> {
177187
match self {

lib/vector-core/src/event/metadata.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ pub(super) struct Inner {
8484
#[derivative(PartialEq = "ignore")]
8585
#[serde(default, skip)]
8686
pub(crate) ingest_timestamp: Option<DateTime<Utc>>,
87+
88+
/// The timestamp when the event last entered a transform buffer.
89+
#[derivative(PartialEq = "ignore")]
90+
#[serde(default, skip)]
91+
pub(crate) last_transform_timestamp: Option<DateTime<Utc>>,
8792
}
8893

8994
/// Metric Origin metadata for submission to Datadog.
@@ -256,6 +261,17 @@ impl EventMetadata {
256261
pub fn set_ingest_timestamp(&mut self, timestamp: DateTime<Utc>) {
257262
self.get_mut().ingest_timestamp = Some(timestamp);
258263
}
264+
265+
/// Returns the timestamp of the last transform buffer enqueue operation, if it exists.
266+
#[must_use]
267+
pub fn last_transform_timestamp(&self) -> Option<DateTime<Utc>> {
268+
self.0.last_transform_timestamp
269+
}
270+
271+
/// Sets the transform enqueue timestamp to the provided value.
272+
pub fn set_last_transform_timestamp(&mut self, timestamp: DateTime<Utc>) {
273+
self.get_mut().last_transform_timestamp = Some(timestamp);
274+
}
259275
}
260276

261277
impl Default for Inner {
@@ -272,6 +288,7 @@ impl Default for Inner {
272288
datadog_origin_metadata: None,
273289
source_event_id: Some(Uuid::new_v4()),
274290
ingest_timestamp: None,
291+
last_transform_timestamp: None,
275292
}
276293
}
277294
}

lib/vector-core/src/event/proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,7 @@ impl From<Metadata> for EventMetadata {
689689
datadog_origin_metadata,
690690
source_event_id,
691691
ingest_timestamp: None,
692+
last_transform_timestamp: None,
692693
}))
693694
}
694695
}

src/topology/builder.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
time::Instant,
77
};
88

9+
use chrono::Utc;
910
use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
1011
use futures_util::stream::FuturesUnordered;
1112
use metrics::gauge;
@@ -35,7 +36,7 @@ use vector_vrl_metrics::MetricsStorage;
3536
use super::{
3637
BuiltBuffer, ConfigDiff,
3738
fanout::{self, Fanout},
38-
processing_time::ProcessingTimeRecorder,
39+
processing_time::{ComponentProcessingTimeRecorder, ProcessingTimeRecorder},
3940
schema,
4041
task::{Task, TaskOutput, TaskResult},
4142
};
@@ -289,6 +290,7 @@ impl<'a> Builder<'a> {
289290
{
290291
array.set_output_id(&source);
291292
array.set_source_type(source_type);
293+
array.set_last_transform_timestamp(Utc::now());
292294
fanout
293295
.send(array, Some(send_reference))
294296
.await
@@ -525,14 +527,18 @@ impl<'a> Builder<'a> {
525527
};
526528

527529
let metrics = ChannelMetricMetadata::new(TRANSFORM_CHANNEL_METRIC_PREFIX, None);
528-
let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
530+
let (mut input_tx, input_rx) = TopologyBuilder::standalone_memory(
529531
TOPOLOGY_BUFFER_SIZE,
530532
WhenFull::Block,
531533
&span,
532534
Some(metrics),
533535
self.config.global.buffer_utilization_ewma_alpha,
534536
);
535537

538+
input_tx.with_custom_instrumentation(ComponentProcessingTimeRecorder::new(
539+
self.config.global.processing_time_ewma_alpha,
540+
));
541+
536542
self.inputs
537543
.insert(key.clone(), (input_tx, node.inputs.clone()));
538544

src/topology/processing_time.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use crate::{config::ComponentKey, event::EventArray};
1111
const NANOS_PER_SECOND: f64 = 1_000_000_000.0;
1212
const EVENT_PROCESSING_TIME: &str = "event_processing_time_seconds";
1313
const EVENT_PROCESSING_TIME_MEAN: &str = "event_processing_time_mean_seconds";
14+
const COMPONENT_PROCESSING_TIME: &str = "component_processing_time_seconds";
15+
const COMPONENT_PROCESSING_TIME_MEAN: &str = "component_processing_time_mean_seconds";
1416
const DEFAULT_PROCESSING_TIME_EWMA_ALPHA: f64 = 0.9;
1517

1618
#[cfg(test)]
@@ -87,7 +89,7 @@ impl ProcessingTimeRecorder {
8789
}
8890

8991
impl BufferInstrumentation<EventArray> for ProcessingTimeRecorder {
90-
fn on_send(&self, events: &EventArray) {
92+
fn on_send(&self, events: &mut EventArray) {
9193
self.record_events(events);
9294
}
9395
}
@@ -124,6 +126,47 @@ impl Metrics {
124126
}
125127
}
126128

129+
#[derive(Debug)]
130+
pub(crate) struct ComponentProcessingTimeRecorder {
131+
histogram: Histogram,
132+
gauge: EwmaGauge,
133+
}
134+
135+
impl ComponentProcessingTimeRecorder {
136+
pub(crate) fn new(ewma_alpha: Option<f64>) -> Self {
137+
Self {
138+
histogram: histogram!(COMPONENT_PROCESSING_TIME),
139+
gauge: EwmaGauge::new(
140+
gauge!(COMPONENT_PROCESSING_TIME_MEAN),
141+
Some(ewma_alpha.unwrap_or(DEFAULT_PROCESSING_TIME_EWMA_ALPHA)),
142+
),
143+
}
144+
}
145+
146+
fn record(&self, latency_seconds: f64) {
147+
self.histogram.record(latency_seconds);
148+
self.gauge.record(latency_seconds);
149+
}
150+
}
151+
152+
impl BufferInstrumentation<EventArray> for ComponentProcessingTimeRecorder {
153+
fn on_send(&self, events: &mut EventArray) {
154+
let now = Utc::now();
155+
156+
for mut event in events.iter_events_mut() {
157+
let metadata = event.metadata_mut();
158+
if let Some(previous) = metadata.last_transform_timestamp()
159+
&& let Some(latency_ns) = now.signed_duration_since(previous).num_nanoseconds()
160+
&& latency_ns >= 0
161+
{
162+
self.record(latency_ns as f64 / NANOS_PER_SECOND);
163+
}
164+
165+
metadata.set_last_transform_timestamp(now);
166+
}
167+
}
168+
}
169+
127170
#[cfg(test)]
128171
mod tests {
129172
use super::*;

src/topology/test/processing_time.rs

Lines changed: 76 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,67 @@ use crate::{
1414
},
1515
};
1616

17+
const EVENT_COUNT: usize = 100;
18+
const SOURCE_ID: &str = "latency_source";
19+
const TRANSFORM_ID: &str = "latency_delay";
20+
const TRANSFORM_TYPE: &str = "test_noop";
21+
const TRANSFORM_KIND: &str = "transform";
22+
const SINK_ID: &str = "latency_sink";
23+
24+
struct ProcessingTimeTestRun {
25+
metrics: Vec<Metric>,
26+
elapsed_time: f64,
27+
}
28+
1729
#[tokio::test]
1830
async fn sink_processing_time_metrics_emitted() {
31+
let run = run_processing_time_topology().await;
32+
33+
assert_histogram_count(
34+
&run.metrics,
35+
"event_processing_time_seconds",
36+
has_latency_tags,
37+
);
38+
assert_gauge_range(
39+
&run.metrics,
40+
"event_processing_time_mean_seconds",
41+
has_latency_tags,
42+
run.elapsed_time,
43+
);
44+
}
45+
46+
#[tokio::test]
47+
async fn component_processing_time_metrics_emitted() {
48+
let run = run_processing_time_topology().await;
49+
50+
assert_histogram_count(
51+
&run.metrics,
52+
"component_processing_time_seconds",
53+
has_component_tags,
54+
);
55+
assert_gauge_range(
56+
&run.metrics,
57+
"component_processing_time_mean_seconds",
58+
has_component_tags,
59+
run.elapsed_time,
60+
);
61+
}
62+
63+
async fn run_processing_time_topology() -> ProcessingTimeTestRun {
1964
trace_init();
2065

2166
let controller = Controller::get().expect("metrics controller");
2267
controller.reset();
2368

24-
const EVENT_COUNT: usize = 100;
25-
2669
let (mut source_tx, source_config) = basic_source();
2770
let transform_config = noop_transform();
2871
let (sink_done_tx, sink_done_rx) = oneshot::channel();
2972
let sink_config = completion_sink(EVENT_COUNT, sink_done_tx);
3073

3174
let mut config = Config::builder();
32-
config.add_source("latency_source", source_config);
33-
config.add_transform("latency_delay", &["latency_source"], transform_config);
34-
config.add_sink("latency_sink", &["latency_delay"], sink_config);
75+
config.add_source(SOURCE_ID, source_config);
76+
config.add_transform(TRANSFORM_ID, &[SOURCE_ID], transform_config);
77+
config.add_sink(SINK_ID, &[TRANSFORM_ID], sink_config);
3578

3679
let start_time = Instant::now();
3780
let (topology, _) = start_topology(config.build().unwrap(), false).await;
@@ -55,17 +98,17 @@ async fn sink_processing_time_metrics_emitted() {
5598
topology.stop().await;
5699
let elapsed_time = start_time.elapsed().as_secs_f64();
57100

58-
let metrics = controller.capture_metrics();
59-
let sink_id = "latency_sink";
60-
let source_id = "latency_source";
101+
ProcessingTimeTestRun {
102+
metrics: controller.capture_metrics(),
103+
elapsed_time,
104+
}
105+
}
61106

107+
fn assert_histogram_count(metrics: &[Metric], metric_name: &str, tags_match: fn(&Metric) -> bool) {
62108
let histogram = metrics
63109
.iter()
64-
.find(|metric| {
65-
metric.name() == "event_processing_time_seconds"
66-
&& has_latency_tags(metric, sink_id, source_id)
67-
})
68-
.expect("event_processing_time_seconds histogram missing");
110+
.find(|metric| metric.name() == metric_name && tags_match(metric))
111+
.unwrap_or_else(|| panic!("{metric_name} histogram missing"));
69112

70113
match histogram.value() {
71114
MetricValue::AggregatedHistogram { count, .. } => {
@@ -76,14 +119,18 @@ async fn sink_processing_time_metrics_emitted() {
76119
}
77120
other => panic!("expected aggregated histogram, got {other:?}"),
78121
}
122+
}
79123

124+
fn assert_gauge_range(
125+
metrics: &[Metric],
126+
metric_name: &str,
127+
tags_match: fn(&Metric) -> bool,
128+
elapsed_time: f64,
129+
) {
80130
let gauge = metrics
81131
.iter()
82-
.find(|metric| {
83-
metric.name() == "event_processing_time_mean_seconds"
84-
&& has_latency_tags(metric, sink_id, source_id)
85-
})
86-
.expect("event_processing_time_mean_seconds gauge missing");
132+
.find(|metric| metric.name() == metric_name && tags_match(metric))
133+
.unwrap_or_else(|| panic!("{metric_name} gauge missing"));
87134

88135
match gauge.value() {
89136
MetricValue::Gauge { value } => {
@@ -100,9 +147,17 @@ async fn sink_processing_time_metrics_emitted() {
100147
}
101148
}
102149

103-
fn has_latency_tags(metric: &Metric, sink: &str, source: &str) -> bool {
150+
fn has_latency_tags(metric: &Metric) -> bool {
151+
metric.tags().is_some_and(|tags| {
152+
tags.get("source_component_id") == Some(SOURCE_ID)
153+
&& tags.get("sink_component_id") == Some(SINK_ID)
154+
})
155+
}
156+
157+
fn has_component_tags(metric: &Metric) -> bool {
104158
metric.tags().is_some_and(|tags| {
105-
tags.get("source_component_id") == Some(source)
106-
&& tags.get("sink_component_id") == Some(sink)
159+
tags.get("component_id") == Some(TRANSFORM_ID)
160+
&& tags.get("component_type") == Some(TRANSFORM_TYPE)
161+
&& tags.get("component_kind") == Some(TRANSFORM_KIND)
107162
})
108163
}

0 commit comments

Comments
 (0)