Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
5616c4e
add incremental_to_absolute
GreyLilac09 Jul 14, 2025
9c6e6dc
fix docs
GreyLilac09 Jul 14, 2025
239f7f3
fix tpos
GreyLilac09 Jul 14, 2025
284d9db
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Jul 15, 2025
0d6c9f1
use sync
GreyLilac09 Jul 15, 2025
9d55415
fix extra aggregate
GreyLilac09 Jul 16, 2025
8dedf1c
remove interval_ms
GreyLilac09 Jul 16, 2025
0ee9cb1
address comments
GreyLilac09 Jul 16, 2025
3cc9d8b
Merge branch 'master' into greylilac09/add-incremental-to-absolute
thomasqueirozb Jul 18, 2025
b1a18aa
use lru cache
GreyLilac09 Jul 24, 2025
0761542
use max_bytes or max_events
GreyLilac09 Jul 24, 2025
7c6fa3e
add debug print
GreyLilac09 Jul 24, 2025
bbe41f4
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Jul 24, 2025
5596b9c
Merge branch 'greylilac09/add-incremental-to-absolute' of github.com:…
GreyLilac09 Jul 24, 2025
07db954
remove debug print
GreyLilac09 Jul 24, 2025
a1ce197
use original cache impl
GreyLilac09 Jul 25, 2025
51d959a
restore original formatting
GreyLilac09 Jul 25, 2025
00c2388
use lrucache
GreyLilac09 Jul 26, 2025
c8f73d1
make some optimizations and fix clippy
GreyLilac09 Jul 26, 2025
f832a4e
fix incr_to_absolute and absolute_to_incr
GreyLilac09 Jul 26, 2025
7c692d5
fix
GreyLilac09 Jul 26, 2025
72f86a0
add overhead
GreyLilac09 Jul 29, 2025
3cc49f1
simplify free_items
GreyLilac09 Jul 31, 2025
33680bd
Update lib/vector-core/src/event/metric/data.rs
GreyLilac09 Aug 1, 2025
9a94c3f
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 4, 2025
72df8b0
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 5, 2025
024ae54
Make whitespace consistent
thomasqueirozb Aug 6, 2025
00b3714
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 6, 2025
dbf0297
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 6, 2025
7a9eea7
fix
GreyLilac09 Aug 6, 2025
a62402c
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 7, 2025
7ca83cb
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 7, 2025
1640d9d
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 7, 2025
9510c02
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 8, 2025
ff2ccd4
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 11, 2025
ca1d3a3
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 13, 2025
0e620a5
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 13, 2025
76595bc
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 15, 2025
8676451
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 19, 2025
8db2a10
fmt
GreyLilac09 Aug 19, 2025
f596006
add more examples
GreyLilac09 Aug 21, 2025
c8c743c
exclude cache overhead
GreyLilac09 Aug 21, 2025
0d9e9a2
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 22, 2025
59814cb
fix fmt
GreyLilac09 Aug 22, 2025
c1f7988
Merge remote-tracking branch 'origin/master' into greylilac09/add-inc…
pront Aug 26, 2025
1c07bee
add .
pront Aug 26, 2025
69ab4bc
Merge branch 'master' into greylilac09/add-incremental-to-absolute
GreyLilac09 Aug 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ dnstap-parser = { path = "lib/dnstap-parser", optional = true }
fakedata = { path = "lib/fakedata", optional = true }
portpicker = { path = "lib/portpicker" }
tracing-limit = { path = "lib/tracing-limit" }
vector-common = { path = "lib/vector-common", default-features = false}
vector-common = { path = "lib/vector-common", default-features = false }
vector-lib.workspace = true
vector-config.workspace = true
vector-config-common.workspace = true
Expand Down Expand Up @@ -384,7 +384,7 @@ seahash = { version = "4.1.0", default-features = false }
smallvec = { version = "1", default-features = false, features = ["union", "serde"] }
snap = { version = "1.1.1", default-features = false }
socket2 = { version = "0.5.10", default-features = false }
sqlx = { version = "0.8.6", default-features = false, features = ["derive", "postgres", "chrono", "runtime-tokio"], optional=true }
sqlx = { version = "0.8.6", default-features = false, features = ["derive", "postgres", "chrono", "runtime-tokio"], optional = true }
stream-cancel = { version = "0.8.2", default-features = false }
strip-ansi-escapes = { version = "0.2.1", default-features = false }
syslog = { version = "6.1.1", default-features = false, optional = true }
Expand Down Expand Up @@ -691,6 +691,7 @@ transforms-logs = [
transforms-metrics = [
"transforms-aggregate",
"transforms-filter",
"transforms-incremental_to_absolute",
"transforms-log_to_metric",
"transforms-lua",
"transforms-metric_to_log",
Expand All @@ -703,6 +704,7 @@ transforms-aggregate = []
transforms-aws_ec2_metadata = ["dep:arc-swap"]
transforms-dedupe = ["transforms-impl-dedupe"]
transforms-filter = []
transforms-incremental_to_absolute = []
transforms-window = []
transforms-log_to_metric = []
transforms-lua = ["dep:mlua", "vector-lib/lua"]
Expand Down
6 changes: 6 additions & 0 deletions changelog.d/incremental_to_absolute_transform.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Add a new `incremental_to_absolute` transform which converts incremental metrics to absolute metrics. This is useful for
use cases when sending metrics to a sink is lossy or you want to get a historical record of metrics, in which case
incremental metrics may be inaccurate since any gaps in metrics sent will result in an inaccurate reading of the ending
value.

authors: GreyLilac09
227 changes: 227 additions & 0 deletions src/transforms/incremental_to_absolute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
use std::{collections::HashMap, pin::Pin, time::Duration};

use async_stream::stream;
use futures::{Stream, StreamExt};
use vector_lib::config::LogNamespace;
use vector_lib::configurable::configurable_component;

use crate::sinks::util::buffer::metrics::{MetricSet, TtlPolicy};
use crate::{
config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
event::Event,
schema,
transforms::{TaskTransform, Transform},
};

/// Configuration for the `incremental_to_absolute` transform.
#[configurable_component(transform(
"incremental_to_absolute",
"Convert incremental metrics to absolute."
))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct IncrementalToAbsoluteConfig {
/// The amount of time, in seconds, that incremental metrics will persist in the internal metrics cache
/// after having not been updated before they expire and are removed.
#[serde(skip_serializing_if = "crate::serde::is_default")]
#[configurable(metadata(docs::common = false, docs::required = true))]
pub expire_metrics_secs: u64,
}

impl_generate_config_from_default!(IncrementalToAbsoluteConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "aggregate")]
impl TransformConfig for IncrementalToAbsoluteConfig {
async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
IncrementalToAbsolute::new(self).map(Transform::event_task)
}

fn input(&self) -> Input {
Input::metric()
}

fn outputs(
&self,
_: vector_lib::enrichment::TableRegistry,
_: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
vec![TransformOutput::new(DataType::Metric, HashMap::new())]
}
}
#[derive(Debug)]
pub struct IncrementalToAbsolute {
data: MetricSet,
}

impl IncrementalToAbsolute {
pub fn new(config: &IncrementalToAbsoluteConfig) -> crate::Result<Self> {
Ok(Self {
data: MetricSet::with_ttl_policy(TtlPolicy::new(Duration::from_secs(
config.expire_metrics_secs,
))),
})
}
}

impl TaskTransform<Event> for IncrementalToAbsolute {
fn transform(
mut self: Box<Self>,
mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
) -> Pin<Box<dyn Stream<Item = Event> + Send>>
where
Self: 'static,
{
Box::pin(stream! {
let mut done = false;
while !done {
tokio::select! {
maybe_event = input_rx.next() => {
match maybe_event {
None => done = true,
Some(event) => {
if let Some(metric) = self.data.make_absolute(event.as_metric().clone()) {
// Create a new Event from the metric and yield it
yield Event::Metric(metric);
}
}
}
}
}
}
})
}
}

#[cfg(test)]
mod tests {
use futures_util::SinkExt;
use similar_asserts::assert_eq;
use std::sync::Arc;
use vector_lib::config::ComponentKey;

use super::*;
use crate::event::{
metric::{MetricKind, MetricValue},
Metric,
};

fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
let mut event = Event::Metric(Metric::new(name, kind, value))
.with_source_id(Arc::new(ComponentKey::from("in")))
.with_upstream_id(Arc::new(OutputId::from("transform")));

event.metadata_mut().set_source_type("unit_test_stream");

event
}

async fn assert_metric_eq(
tx: &mut futures::channel::mpsc::Sender<Event>,
mut out_stream: impl Stream<Item = Event> + Unpin,
metric: Event,
expected_metric: Event,
) {
tx.send(metric).await.unwrap();
if let Some(out_event) = out_stream.next().await {
let result = out_event;
assert_eq!(result, expected_metric);
} else {
panic!("Unexpectedly received None in output stream");
}
}

#[tokio::test]
async fn test_incremental_to_absolute() {
let config = toml::from_str::<IncrementalToAbsoluteConfig>(
r#"
expire_metrics_secs = 9999
"#,
)
.unwrap();
let incremental_to_absolute = IncrementalToAbsolute::new(&config)
.map(Transform::event_task)
.unwrap();
let incremental_to_absolute = incremental_to_absolute.into_task();
let (mut tx, rx) = futures::channel::mpsc::channel(10);
let mut out_stream = incremental_to_absolute.transform_events(Box::pin(rx));

let inc_counter_1 = make_metric(
"incremental_counter",
MetricKind::Incremental,
MetricValue::Counter { value: 10.0 },
);
let expected_inc_counter_1 = make_metric(
"incremental_counter",
MetricKind::Absolute,
MetricValue::Counter { value: 10.0 },
);
assert_metric_eq(
&mut tx,
&mut out_stream,
inc_counter_1,
expected_inc_counter_1,
)
.await;

let inc_counter_2 = make_metric(
"incremental_counter",
MetricKind::Incremental,
MetricValue::Counter { value: 10.0 },
);
let expected_inc_counter_2 = make_metric(
"incremental_counter",
MetricKind::Absolute,
MetricValue::Counter { value: 20.0 },
);
assert_metric_eq(
&mut tx,
&mut out_stream,
inc_counter_2,
expected_inc_counter_2,
)
.await;

let inc_counter_3 = make_metric(
"incremental_counter",
MetricKind::Incremental,
MetricValue::Counter { value: 10.0 },
);
let expected_inc_counter_3 = make_metric(
"incremental_counter",
MetricKind::Absolute,
MetricValue::Counter { value: 30.0 },
);
assert_metric_eq(
&mut tx,
&mut out_stream,
inc_counter_3,
expected_inc_counter_3,
)
.await;

// Absolute counters and gauges are emitted unchanged
let gauge = make_metric(
"gauge",
MetricKind::Absolute,
MetricValue::Gauge { value: 42.0 },
);
let expected_gauge = gauge.clone();
assert_metric_eq(&mut tx, &mut out_stream, gauge, expected_gauge).await;

let absolute_counter = make_metric(
"absolute_counter",
MetricKind::Absolute,
MetricValue::Counter { value: 42.0 },
);
let absolute_counter_expected = absolute_counter.clone();
assert_metric_eq(
&mut tx,
&mut out_stream,
absolute_counter,
absolute_counter_expected,
)
.await;
}
}
2 changes: 2 additions & 0 deletions src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub mod aws_ec2_metadata;
mod exclusive_route;
#[cfg(feature = "transforms-filter")]
pub mod filter;
#[cfg(feature = "transforms-incremental_to_absolute")]
pub mod incremental_to_absolute;
#[cfg(feature = "transforms-log_to_metric")]
pub mod log_to_metric;
#[cfg(feature = "transforms-lua")]
Expand Down
29 changes: 16 additions & 13 deletions website/cue/reference/components.cue
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,20 @@ components: {
}

if Args.kind == "transform" {
aggregate?: #FeaturesAggregate
convert?: #FeaturesConvert
enrich?: #FeaturesEnrich
filter?: #FeaturesFilter
parse?: #FeaturesParse
program?: #FeaturesProgram
proxy?: #FeaturesProxy
reduce?: #FeaturesReduce
route?: #FeaturesRoute
exclusive_route?: #FeaturesExclusiveRoute
sanitize?: #FeaturesSanitize
shape?: #FeaturesShape
window?: #FeaturesWindow
aggregate?: #FeaturesAggregate
convert?: #FeaturesConvert
enrich?: #FeaturesEnrich
filter?: #FeaturesFilter
parse?: #FeaturesParse
program?: #FeaturesProgram
proxy?: #FeaturesProxy
reduce?: #FeaturesReduce
route?: #FeaturesRoute
exclusive_route?: #FeaturesExclusiveRoute
sanitize?: #FeaturesSanitize
shape?: #FeaturesShape
window?: #FeaturesWindow
incremental_to_absolute?: #FeaturesIncrementalToAbsolute
}

if Args.kind == "sink" {
Expand Down Expand Up @@ -338,6 +339,8 @@ components: {

#FeaturesWindow: {}

#FeaturesIncrementalToAbsolute: {}

#FeaturesSend: {
_args: {
egress_method: string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package metadata

generated: components: transforms: incremental_to_absolute: configuration: expire_metrics_secs: {
common: false
description: """
The amount of time, in seconds, that incremental metrics will persist in the internal metrics cache
after having not been updated before they expire and are removed.
"""
required: true
type: uint: {}
}
Loading