diff --git a/Cargo.toml b/Cargo.toml index ebbe292270a8b..d86224c4b7f81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -375,7 +375,7 @@ itertools.workspace = true k8s-openapi = { version = "0.22.0", default-features = false, features = ["v1_26"], optional = true } kube = { version = "0.93.0", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true } listenfd = { version = "1.0.2", default-features = false, optional = true } -lru = { version = "0.16.0", default-features = false, optional = true } +lru = { version = "0.16.0", default-features = false } maxminddb = { version = "0.26.0", default-features = false, optional = true, features = ["simdutf8"] } md-5 = { version = "0.10", default-features = false, optional = true } mongodb = { version = "2.8.2", default-features = false, features = ["tokio-runtime"], optional = true } @@ -711,6 +711,7 @@ transforms-logs = [ transforms-metrics = [ "transforms-aggregate", "transforms-filter", + "transforms-incremental_to_absolute", "transforms-log_to_metric", "transforms-lua", "transforms-metric_to_log", @@ -723,6 +724,7 @@ transforms-aggregate = [] transforms-aws_ec2_metadata = ["dep:arc-swap"] transforms-dedupe = ["transforms-impl-dedupe"] transforms-filter = [] +transforms-incremental_to_absolute = [] transforms-window = [] transforms-log_to_metric = [] transforms-lua = ["dep:mlua", "vector-lib/lua"] @@ -737,7 +739,7 @@ transforms-throttle = ["dep:governor"] # Implementations of transforms transforms-impl-sample = [] -transforms-impl-dedupe = ["dep:lru"] +transforms-impl-dedupe = [] transforms-impl-reduce = [] # Sinks @@ -851,7 +853,7 @@ sinks-opentelemetry = ["sinks-http"] sinks-papertrail = ["dep:syslog"] sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"] sinks-postgres = ["dep:sqlx"] -sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"] +sinks-pulsar = ["dep:apache-avro", "dep:pulsar"] sinks-redis = ["dep:redis"] sinks-sematext = ["sinks-elasticsearch", 
"sinks-influxdb"] sinks-socket = ["sinks-utils-udp"] diff --git a/changelog.d/incremental_to_absolute_transform.feature.md b/changelog.d/incremental_to_absolute_transform.feature.md new file mode 100644 index 0000000000000..623ee48bab019 --- /dev/null +++ b/changelog.d/incremental_to_absolute_transform.feature.md @@ -0,0 +1,6 @@ +Add a new `incremental_to_absolute` transform which converts incremental metrics to absolute metrics. This is useful for +use cases when sending metrics to a sink is lossy or you want to get a historical record of metrics, in which case +incremental metrics may be inaccurate since any gaps in metrics sent will result in an inaccurate reading of the ending +value. + +authors: GreyLilac09 diff --git a/lib/vector-core/src/event/metric/data.rs b/lib/vector-core/src/event/metric/data.rs index 44823d0b81199..adef5b3876169 100644 --- a/lib/vector-core/src/event/metric/data.rs +++ b/lib/vector-core/src/event/metric/data.rs @@ -58,11 +58,14 @@ impl MetricData { /// Consumes this metric, returning it as an absolute metric. /// - /// If the metric was already absolute, nothing is changed. + /// The `interval_ms` is set to `None`. If the metric was already absolute, nothing else is changed. 
#[must_use] pub fn into_absolute(self) -> Self { Self { - time: self.time, + time: MetricTime { + timestamp: self.time.timestamp, + interval_ms: None, + }, kind: MetricKind::Absolute, value: self.value, } diff --git a/src/sinks/util/buffer/metrics/mod.rs b/src/sinks/util/buffer/metrics/mod.rs index 146eac28ad561..37acaf458a49a 100644 --- a/src/sinks/util/buffer/metrics/mod.rs +++ b/src/sinks/util/buffer/metrics/mod.rs @@ -57,7 +57,12 @@ impl Batch for MetricsBuffer { } else { let max_events = self.max_events; self.metrics - .get_or_insert_with(|| MetricSet::with_capacity(max_events)) + .get_or_insert_with(|| { + MetricSet::new(MetricSetSettings { + max_events: Some(max_events), + ..Default::default() + }) + }) .insert_update(item); PushResult::Ok(self.num_items() >= self.max_events) } diff --git a/src/sinks/util/buffer/metrics/normalize.rs b/src/sinks/util/buffer/metrics/normalize.rs index 11580a74db1db..b3cdc7a84f4b8 100644 --- a/src/sinks/util/buffer/metrics/normalize.rs +++ b/src/sinks/util/buffer/metrics/normalize.rs @@ -1,5 +1,5 @@ -use indexmap::IndexMap; - +use lru::LruCache; +use std::marker::PhantomData; use std::time::{Duration, Instant}; use vector_lib::event::{ @@ -7,6 +7,99 @@ use vector_lib::event::{ metric::{MetricData, MetricSeries}, }; +use serde_with::serde_as; +use snafu::Snafu; +use vector_config_macros::configurable_component; +use vector_lib::ByteSizeOf; + +#[derive(Debug, Snafu, PartialEq, Eq)] +pub enum NormalizerError { + #[snafu(display("`max_bytes` must be greater than zero"))] + InvalidMaxBytes, + #[snafu(display("`max_events` must be greater than zero"))] + InvalidMaxEvents, + #[snafu(display("`time_to_live` must be greater than zero"))] + InvalidTimeToLive, +} + +/// Defines behavior for creating the MetricNormalizer +#[serde_as] +#[configurable_component] +#[configurable(metadata(docs::advanced))] +#[derive(Clone, Copy, Debug, Default)] +pub struct NormalizerConfig { + /// The maximum size in bytes of the events in the metrics 
normalizer cache, excluding cache overhead. + #[serde(default = "default_max_bytes::")] + #[configurable(metadata(docs::type_unit = "bytes"))] + pub max_bytes: Option, + + /// The maximum number of events of the metrics normalizer cache + #[serde(default = "default_max_events::")] + #[configurable(metadata(docs::type_unit = "events"))] + pub max_events: Option, + + /// The maximum age of a metric not being updated before it is evicted from the metrics normalizer cache. + #[serde(default = "default_time_to_live::")] + #[configurable(metadata(docs::type_unit = "seconds"))] + #[configurable(metadata(docs::human_name = "Time To Live"))] + pub time_to_live: Option, + + #[serde(skip)] + pub _d: PhantomData, +} + +const fn default_max_bytes() -> Option { + D::MAX_BYTES +} + +const fn default_max_events() -> Option { + D::MAX_EVENTS +} + +const fn default_time_to_live() -> Option { + D::TIME_TO_LIVE +} + +impl NormalizerConfig { + pub fn validate(&self) -> Result, NormalizerError> { + let config = NormalizerConfig:: { + max_bytes: self.max_bytes.or(D::MAX_BYTES), + max_events: self.max_events.or(D::MAX_EVENTS), + time_to_live: self.time_to_live.or(D::TIME_TO_LIVE), + _d: Default::default(), + }; + match (config.max_bytes, config.max_events, config.time_to_live) { + (Some(0), _, _) => Err(NormalizerError::InvalidMaxBytes), + (_, Some(0), _) => Err(NormalizerError::InvalidMaxEvents), + (_, _, Some(0)) => Err(NormalizerError::InvalidTimeToLive), + _ => Ok(config), + } + } + + pub const fn into_settings(self) -> MetricSetSettings { + MetricSetSettings { + max_bytes: self.max_bytes, + max_events: self.max_events, + time_to_live: self.time_to_live, + } + } +} + +pub trait NormalizerSettings { + const MAX_EVENTS: Option; + const MAX_BYTES: Option; + const TIME_TO_LIVE: Option; +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct DefaultNormalizerSettings; + +impl NormalizerSettings for DefaultNormalizerSettings { + const MAX_EVENTS: Option = None; + const MAX_BYTES: Option = 
None; + const TIME_TO_LIVE: Option = None; +} + /// Normalizes metrics according to a set of rules. /// /// Depending on the system in which they are being sent to, metrics may have to be modified in order to fit the data @@ -48,10 +141,17 @@ pub struct MetricNormalizer { } impl MetricNormalizer { - /// Creates a new normalizer with TTL policy. - pub fn with_ttl(normalizer: N, ttl: TtlPolicy) -> Self { + /// Creates a new normalizer with the given configuration. + pub fn with_config( + normalizer: N, + config: NormalizerConfig, + ) -> Self { + let settings = config + .validate() + .unwrap_or_else(|e| panic!("Invalid cache settings: {e:?}")) + .into_settings(); Self { - state: MetricSet::with_ttl_policy(ttl), + state: MetricSet::new(settings), normalizer, } } @@ -89,7 +189,7 @@ impl From for MetricNormalizer { } } -/// Represents a stored metric entry with its data, metadata, and optional timestamp. +/// Represents a stored metric entry with its data, metadata, and timestamp. #[derive(Clone, Debug)] pub struct MetricEntry { /// The metric data containing the value and kind @@ -100,6 +200,12 @@ pub struct MetricEntry { pub timestamp: Option, } +impl ByteSizeOf for MetricEntry { + fn allocated_bytes(&self) -> usize { + self.data.allocated_bytes() + self.metadata.allocated_bytes() + } +} + impl MetricEntry { /// Creates a new MetricEntry with the given data, metadata, and timestamp. pub const fn new( @@ -114,7 +220,7 @@ impl MetricEntry { } } - /// Creates a new MetricEntry from a Metric and optional timestamp. + /// Creates a new MetricEntry from a Metric. pub fn from_metric(metric: Metric, timestamp: Option) -> (MetricSeries, Self) { let (series, data, metadata) = metric.into_parts(); let entry = Self::new(data, metadata, timestamp); @@ -130,9 +236,95 @@ impl MetricEntry { pub const fn update_timestamp(&mut self, timestamp: Option) { self.timestamp = timestamp; } + + /// Checks if this entry has expired based on the given TTL and reference time. 
+ /// + /// Using a provided reference time ensures consistency across multiple expiration checks. + pub fn is_expired(&self, ttl: Duration, reference_time: Instant) -> bool { + match self.timestamp { + Some(ts) => reference_time.duration_since(ts) >= ttl, + None => false, + } + } +} + +/// Configuration for capacity-based eviction (memory and/or entry count limits). +#[derive(Clone, Debug)] +pub struct CapacityPolicy { + /// Maximum memory usage in bytes + pub max_bytes: Option, + /// Maximum number of entries + pub max_events: Option, + /// Current memory usage tracking + current_memory: usize, +} + +impl CapacityPolicy { + /// Creates a new capacity policy with both memory and entry limits. + pub const fn new(max_bytes: Option, max_events: Option) -> Self { + Self { + max_bytes, + max_events, + current_memory: 0, + } + } + + /// Gets the current memory usage. + pub const fn current_memory(&self) -> usize { + self.current_memory + } + + /// Updates memory tracking when an entry is removed. + const fn remove_memory(&mut self, bytes: usize) { + self.current_memory = self.current_memory.saturating_sub(bytes); + } + + /// Frees the memory for an item if max_bytes is set. + /// Only calculates and tracks memory when max_bytes is specified. + pub fn free_item(&mut self, series: &MetricSeries, entry: &MetricEntry) { + if self.max_bytes.is_some() { + let freed_memory = self.item_size(series, entry); + self.remove_memory(freed_memory); + } + } + + /// Updates memory tracking. + const fn replace_memory(&mut self, old_bytes: usize, new_bytes: usize) { + self.current_memory = self + .current_memory + .saturating_sub(old_bytes) + .saturating_add(new_bytes); + } + + /// Checks if the current state exceeds memory limits. + const fn exceeds_memory_limit(&self) -> bool { + if let Some(max_bytes) = self.max_bytes { + self.current_memory > max_bytes + } else { + false + } + } + + /// Checks if the given entry count exceeds entry limits. 
+ const fn exceeds_entry_limit(&self, entry_count: usize) -> bool { + if let Some(max_events) = self.max_events { + entry_count > max_events + } else { + false + } + } + + /// Returns true if any limits are currently exceeded. + const fn needs_eviction(&self, entry_count: usize) -> bool { + self.exceeds_memory_limit() || self.exceeds_entry_limit(entry_count) + } + + /// Gets the total memory size of entry/series, excluding LRU cache overhead. + pub fn item_size(&self, series: &MetricSeries, entry: &MetricEntry) -> usize { + entry.allocated_bytes() + series.allocated_bytes() + } } -/// Configuration for automatic cleanup of expired entries. #[derive(Clone, Debug)] pub struct TtlPolicy { /// Time-to-live for entries @@ -143,8 +335,9 @@ pub struct TtlPolicy { pub(crate) last_cleanup: Instant, } +/// Configuration for automatic cleanup of expired entries. impl TtlPolicy { - /// Creates a new cleanup configuration with TTL. + /// Creates a new TTL policy with the given duration. /// Cleanup interval defaults to TTL/10 with a 10-second minimum. pub fn new(ttl: Duration) -> Self { Self { @@ -155,46 +348,81 @@ impl TtlPolicy { } /// Checks if it's time to run cleanup. - pub fn should_cleanup(&self) -> bool { - Instant::now().duration_since(self.last_cleanup) >= self.cleanup_interval + /// + /// Returns Some(current_time) if cleanup should be performed, None otherwise. + pub fn should_cleanup(&self) -> Option { + let now = Instant::now(); + if now.duration_since(self.last_cleanup) >= self.cleanup_interval { + Some(now) + } else { + None + } } - /// Marks cleanup as having been performed. - pub fn mark_cleanup_done(&mut self) { - self.last_cleanup = Instant::now(); + /// Marks cleanup as having been performed with the provided timestamp. + pub const fn mark_cleanup_done(&mut self, now: Instant) { + self.last_cleanup = now; } } -/// Metric storage for use with normalization. 
+#[derive(Debug, Clone, Copy, Default)] +pub struct MetricSetSettings { + pub max_bytes: Option, + pub max_events: Option, + pub time_to_live: Option, +} + +/// Dual-limit cache using standard LRU with optional capacity and TTL policies. /// -/// This is primarily a wrapper around [`IndexMap`] (to ensure insertion order -/// is maintained) with convenience methods to make it easier to perform -/// normalization-specific operations. It also includes an optional TTL policy -/// to automatically expire old entries. -#[derive(Clone, Debug, Default)] +/// This implementation uses the standard LRU crate with optional enforcement of both +/// memory and entry count limits via CapacityPolicy, plus optional TTL via TtlPolicy. +#[derive(Clone, Debug)] pub struct MetricSet { - inner: IndexMap, + /// LRU cache for storing metric entries + inner: LruCache, + /// Optional capacity policy for memory and/or entry count limits + capacity_policy: Option, + /// Optional TTL policy for time-based expiration ttl_policy: Option, } impl MetricSet { - /// Creates an empty MetricSet with the specified capacity. - pub fn with_capacity(capacity: usize) -> Self { - Self { - inner: IndexMap::with_capacity(capacity), - ttl_policy: None, - } + /// Creates a new MetricSet with the given settings. + pub fn new(settings: MetricSetSettings) -> Self { + // Create capacity policy if any capacity limit is set + let capacity_policy = match (settings.max_bytes, settings.max_events) { + (None, None) => None, + (max_bytes, max_events) => Some(CapacityPolicy::new(max_bytes, max_events)), + }; + + // Create TTL policy if time-to-live is set + let ttl_policy = settings + .time_to_live + .map(|ttl| TtlPolicy::new(Duration::from_secs(ttl))); + + Self::with_policies(capacity_policy, ttl_policy) } - /// Creates a MetricSet with custom cleanup configuration. - pub fn with_ttl_policy(ttl_policy: TtlPolicy) -> Self { + /// Creates a new MetricSet with the given policies. 
+ pub fn with_policies( + capacity_policy: Option, + ttl_policy: Option, + ) -> Self { + // Always use an unbounded cache since we manually track limits + // This ensures our capacity policy can properly track memory for all evicted entries Self { - inner: IndexMap::default(), - ttl_policy: Some(ttl_policy), + inner: LruCache::unbounded(), + capacity_policy, + ttl_policy, } } - /// Gets a reference to the TTL policy configuration. + /// Gets the current capacity policy. + pub const fn capacity_policy(&self) -> Option<&CapacityPolicy> { + self.capacity_policy.as_ref() + } + + /// Gets the current TTL policy. pub const fn ttl_policy(&self) -> Option<&TtlPolicy> { self.ttl_policy.as_ref() } @@ -204,57 +432,125 @@ impl MetricSet { self.ttl_policy.as_mut() } - /// Perform periodic cleanup if enough time has passed since the last cleanup - fn maybe_cleanup(&mut self) { - // Return early if no cleanup is needed - if !self - .ttl_policy() - .is_some_and(|config| config.should_cleanup()) - { - return; + /// Gets the current number of entries in the cache. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns true if the cache contains no entries. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Gets the current memory usage in bytes. + pub fn weighted_size(&self) -> u64 { + self.capacity_policy + .as_ref() + .map_or(0, |cp| cp.current_memory() as u64) + } + + /// Creates a timestamp if TTL is enabled. + fn create_timestamp(&self) -> Option { + self.ttl_policy.as_ref().map(|_| Instant::now()) + } + + /// Enforce memory and entry limits by evicting LRU entries. 
+ fn enforce_capacity_policy(&mut self) { + let Some(ref mut capacity_policy) = self.capacity_policy else { + return; // No capacity limits configured + }; + + // Keep evicting until we're within limits + while capacity_policy.needs_eviction(self.inner.len()) { + if let Some((series, entry)) = self.inner.pop_lru() { + capacity_policy.free_item(&series, &entry); + } else { + break; // No more entries to evict + } } - self.cleanup_expired(); + } + + /// Perform TTL cleanup if configured and needed. + fn maybe_cleanup(&mut self) { + // Check if cleanup is needed and get the current timestamp in one operation + let now = match self.ttl_policy().and_then(|config| config.should_cleanup()) { + Some(timestamp) => timestamp, + None => return, // No cleanup needed + }; + + // Perform the cleanup using the same timestamp + self.cleanup_expired(now); + + // Mark cleanup as done with the same timestamp if let Some(config) = self.ttl_policy_mut() { - config.mark_cleanup_done(); + config.mark_cleanup_done(now); } } - /// Removes expired entries based on TTL if configured. - fn cleanup_expired(&mut self) { - let now = Instant::now(); - if let Some(config) = &self.ttl_policy { - self.inner.retain(|_, entry| match entry.timestamp { - Some(ts) => now.duration_since(ts) < config.ttl, - None => true, - }); + /// Remove expired entries based on TTL using the provided timestamp. + fn cleanup_expired(&mut self, now: Instant) { + // Get the TTL from the policy + let Some(ttl) = self.ttl_policy().map(|policy| policy.ttl) else { + return; // No TTL policy, nothing to do + }; + + let mut expired_keys = Vec::new(); + + // Collect expired keys using the provided timestamp + for (series, entry) in self.inner.iter() { + if entry.is_expired(ttl, now) { + expired_keys.push(series.clone()); + } } - } - /// Returns the number of elements in the set. 
- pub fn len(&self) -> usize { - self.inner.len() + // Remove expired entries and update memory tracking (if max_bytes is set) + for series in expired_keys { + if let Some(entry) = self.inner.pop(&series) { + if let Some(ref mut capacity_policy) = self.capacity_policy { + capacity_policy.free_item(&series, &entry); + } + } + } } - fn create_timestamp(&self) -> Option { - match self.ttl_policy() { - Some(_) => Some(Instant::now()), - _ => None, + /// Internal insert that updates memory tracking and enforces limits. + fn insert_with_tracking(&mut self, series: MetricSeries, entry: MetricEntry) { + let Some(ref mut capacity_policy) = self.capacity_policy else { + self.inner.put(series, entry); + return; // No capacity limits configured, return immediately + }; + + // Handle differently based on whether we need to track memory + if capacity_policy.max_bytes.is_some() { + // When tracking memory, we need to calculate sizes before and after + let entry_size = capacity_policy.item_size(&series, &entry); + + if let Some(existing_entry) = self.inner.put(series.clone(), entry) { + // If we had an existing entry, calculate its size and adjust memory tracking + let existing_size = capacity_policy.item_size(&series, &existing_entry); + capacity_policy.replace_memory(existing_size, entry_size); + } else { + // No existing entry, just add the new entry's size + capacity_policy.replace_memory(0, entry_size); + } + } else { + // When not tracking memory (only entry count limits), just put directly + self.inner.put(series, entry); } - } - /// Returns true if the set contains no elements. - pub fn is_empty(&self) -> bool { - self.inner.is_empty() + // Enforce limits after insertion + self.enforce_capacity_policy(); } /// Consumes this MetricSet and returns a vector of Metric. 
pub fn into_metrics(mut self) -> Vec { - // Always cleanup on final consumption - self.cleanup_expired(); - self.inner - .into_iter() - .map(|(series, entry)| entry.into_metric(series)) - .collect() + // Clean up expired entries first (using current time) + self.cleanup_expired(Instant::now()); + let mut metrics = Vec::new(); + while let Some((series, entry)) = self.inner.pop_lru() { + metrics.push(entry.into_metric(series)); + } + metrics } /// Either pass the metric through as-is if absolute, or convert it @@ -282,20 +578,19 @@ impl MetricSet { /// application uptime. fn incremental_to_absolute(&mut self, mut metric: Metric) -> Metric { let timestamp = self.create_timestamp(); + // We always call insert() to track memory usage match self.inner.get_mut(metric.series()) { Some(existing) => { - if existing.data.value.add(metric.value()) { - metric = metric.with_value(existing.data.value.clone()); - existing.update_timestamp(timestamp); - } else { - // Metric changed type, store this as the new reference value - let (series, entry) = MetricEntry::from_metric(metric.clone(), timestamp); - self.inner.insert(series, entry); + let mut new_value = existing.data.value().clone(); + if new_value.add(metric.value()) { + // Update the stored value + metric = metric.with_value(new_value); } + // Insert the updated stored value, or as store a new reference value (if the Metric changed type) + self.insert(metric.clone(), timestamp); } None => { - let (series, entry) = MetricEntry::from_metric(metric.clone(), timestamp); - self.inner.insert(series, entry); + self.insert(metric.clone(), timestamp); } } metric.into_absolute() @@ -324,13 +619,18 @@ impl MetricSet { // again, but this is a behavior we have to observe for sinks that can only handle // incremental updates. 
let timestamp = self.create_timestamp(); + // We always call insert() to track memory usage match self.inner.get_mut(metric.series()) { Some(reference) => { let new_value = metric.value().clone(); + // Create a copy of the reference so we can insert and + // replace the existing entry, tracking memory usage + let mut new_reference = reference.clone(); // From the stored reference value, emit an increment if metric.subtract(&reference.data) { - reference.data.value = new_value; - reference.update_timestamp(timestamp); + new_reference.data.value = new_value; + new_reference.timestamp = timestamp; + self.insert_with_tracking(metric.series().clone(), new_reference); Some(metric.into_incremental()) } else { // Metric changed type, store this and emit nothing @@ -348,7 +648,7 @@ impl MetricSet { fn insert(&mut self, metric: Metric, timestamp: Option) { let (series, entry) = MetricEntry::from_metric(metric, timestamp); - self.inner.insert(series, entry); + self.insert_with_tracking(series, entry); } pub fn insert_update(&mut self, metric: Metric) { @@ -360,10 +660,14 @@ impl MetricSet { // Incremental metrics update existing entries, if present match self.inner.get_mut(metric.series()) { Some(existing) => { + // Create a copy of the reference so we can insert and + // replace the existing entry, tracking memory usage + let mut new_existing = existing.clone(); let (series, data, metadata) = metric.into_parts(); - if existing.data.update(&data) { - existing.metadata.merge(metadata); - existing.update_timestamp(timestamp); + if new_existing.data.update(&data) { + new_existing.metadata.merge(metadata); + new_existing.update_timestamp(timestamp); + self.insert_with_tracking(series, new_existing); None } else { warn!(message = "Metric changed type, dropping old value.", %series); @@ -379,11 +683,22 @@ impl MetricSet { } } - /// Removes a series from the set. + /// Removes a series from the cache. /// - /// If the series existed and was removed, returns `true`. 
Otherwise, `false`. + /// If the series existed and was removed, returns true. Otherwise, false. pub fn remove(&mut self, series: &MetricSeries) -> bool { - self.maybe_cleanup(); - self.inner.shift_remove(series).is_some() + if let Some(entry) = self.inner.pop(series) { + if let Some(ref mut capacity_policy) = self.capacity_policy { + capacity_policy.free_item(series, &entry); + } + return true; + } + false + } +} + +impl Default for MetricSet { + fn default() -> Self { + Self::new(MetricSetSettings::default()) } } diff --git a/src/sinks/util/normalizer.rs b/src/sinks/util/normalizer.rs index 4ef398a3f9578..03a0c82242207 100644 --- a/src/sinks/util/normalizer.rs +++ b/src/sinks/util/normalizer.rs @@ -8,7 +8,9 @@ use futures_util::{Stream, StreamExt, stream::Fuse}; use pin_project::pin_project; use vector_lib::event::Metric; -use super::buffer::metrics::{MetricNormalize, MetricNormalizer, TtlPolicy}; +use crate::sinks::util::buffer::metrics::{ + DefaultNormalizerSettings, MetricNormalize, MetricNormalizer, NormalizerConfig, +}; #[pin_project] pub struct Normalizer @@ -32,9 +34,16 @@ where } pub fn new_with_ttl(stream: St, normalizer: N, ttl: Duration) -> Self { + // Create a new MetricSetConfig with the proper settings type + let config = NormalizerConfig:: { + time_to_live: Some(ttl.as_secs()), + max_bytes: None, + max_events: None, + _d: std::marker::PhantomData, + }; Self { stream: stream.fuse(), - normalizer: MetricNormalizer::with_ttl(normalizer, TtlPolicy::new(ttl)), + normalizer: MetricNormalizer::with_config(normalizer, config), } } } diff --git a/src/transforms/incremental_to_absolute.rs b/src/transforms/incremental_to_absolute.rs new file mode 100644 index 0000000000000..9628d3450031b --- /dev/null +++ b/src/transforms/incremental_to_absolute.rs @@ -0,0 +1,231 @@ +use std::{collections::HashMap, future::ready, pin::Pin, time::Duration}; + +use futures::{Stream, StreamExt}; +use vector_lib::config::LogNamespace; +use 
vector_lib::configurable::configurable_component; + +use crate::sinks::util::buffer::metrics::{MetricSet, NormalizerConfig, NormalizerSettings}; +use crate::{ + config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput}, + event::Event, + schema, + transforms::{TaskTransform, Transform}, +}; + +/// Configuration for the `incremental_to_absolute` transform. +#[configurable_component(transform( + "incremental_to_absolute", + "Convert incremental metrics to absolute." +))] +#[derive(Clone, Debug, Default)] +#[serde(deny_unknown_fields)] +pub struct IncrementalToAbsoluteConfig { + /// Configuration for the internal metrics cache used to normalize a stream of incremental + /// metrics into absolute metrics. + /// + /// By default, incremental metrics are evicted after 5 minutes of not being updated. The next + /// incremental value will be reset. + #[configurable(derived)] + #[serde(default)] + pub cache: NormalizerConfig, +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct IncrementalToAbsoluteDefaultNormalizerSettings; + +impl NormalizerSettings for IncrementalToAbsoluteDefaultNormalizerSettings { + const MAX_EVENTS: Option = None; + const MAX_BYTES: Option = None; + const TIME_TO_LIVE: Option = Some(300); +} + +pub const fn default_expire_metrics_secs() -> Duration { + Duration::from_secs(120) +} + +impl_generate_config_from_default!(IncrementalToAbsoluteConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "incremental_to_absolute")] +impl TransformConfig for IncrementalToAbsoluteConfig { + async fn build(&self, _context: &TransformContext) -> crate::Result { + IncrementalToAbsolute::new(self).map(Transform::event_task) + } + + fn input(&self) -> Input { + Input::metric() + } + + fn outputs( + &self, + _: vector_lib::enrichment::TableRegistry, + _: &[(OutputId, schema::Definition)], + _: LogNamespace, + ) -> Vec { + vec![TransformOutput::new(DataType::Metric, HashMap::new())] + } +} +#[derive(Debug)] +pub struct 
IncrementalToAbsolute { + data: MetricSet, +} + +impl IncrementalToAbsolute { + pub fn new(config: &IncrementalToAbsoluteConfig) -> crate::Result { + // Create a new MetricSet with the proper cache settings + Ok(Self { + data: MetricSet::new(config.cache.validate()?.into_settings()), + }) + } + pub fn transform_one(&mut self, event: Event) -> Option { + self.data + .make_absolute(event.as_metric().clone()) + .map(Event::Metric) + } +} + +impl TaskTransform for IncrementalToAbsolute { + fn transform( + self: Box, + task: Pin + Send>>, + ) -> Pin + Send>> + where + Self: 'static, + { + let mut inner = self; + Box::pin(task.filter_map(move |v| ready(inner.transform_one(v)))) + } +} + +#[cfg(test)] +mod tests { + use futures_util::SinkExt; + use similar_asserts::assert_eq; + use std::sync::Arc; + use vector_lib::config::ComponentKey; + + use super::*; + use crate::event::{ + Metric, + metric::{MetricKind, MetricValue}, + }; + + fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event { + let mut event = Event::Metric(Metric::new(name, kind, value)) + .with_source_id(Arc::new(ComponentKey::from("in"))) + .with_upstream_id(Arc::new(OutputId::from("transform"))); + + event.metadata_mut().set_source_type("unit_test_stream"); + + event + } + + async fn assert_metric_eq( + tx: &mut futures::channel::mpsc::Sender, + mut out_stream: impl Stream + Unpin, + metric: Event, + expected_metric: Event, + ) { + tx.send(metric).await.unwrap(); + if let Some(out_event) = out_stream.next().await { + let result = out_event; + assert_eq!(result, expected_metric); + } else { + panic!("Unexpectedly received None in output stream"); + } + } + + #[tokio::test] + async fn test_incremental_to_absolute() { + let config = toml::from_str::( + r#" +[cache] +max_events = 100 +"#, + ) + .unwrap(); + let incremental_to_absolute = IncrementalToAbsolute::new(&config) + .map(Transform::event_task) + .unwrap(); + let incremental_to_absolute = incremental_to_absolute.into_task(); + 
let (mut tx, rx) = futures::channel::mpsc::channel(10); + let mut out_stream = incremental_to_absolute.transform_events(Box::pin(rx)); + + let inc_counter_1 = make_metric( + "incremental_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 10.0 }, + ); + let expected_inc_counter_1 = make_metric( + "incremental_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 10.0 }, + ); + assert_metric_eq( + &mut tx, + &mut out_stream, + inc_counter_1, + expected_inc_counter_1, + ) + .await; + + let inc_counter_2 = make_metric( + "incremental_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 10.0 }, + ); + let expected_inc_counter_2 = make_metric( + "incremental_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 20.0 }, + ); + assert_metric_eq( + &mut tx, + &mut out_stream, + inc_counter_2, + expected_inc_counter_2, + ) + .await; + + let inc_counter_3 = make_metric( + "incremental_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 10.0 }, + ); + let expected_inc_counter_3 = make_metric( + "incremental_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 30.0 }, + ); + assert_metric_eq( + &mut tx, + &mut out_stream, + inc_counter_3, + expected_inc_counter_3, + ) + .await; + + // Absolute counters and gauges are emitted unchanged + let gauge = make_metric( + "gauge", + MetricKind::Absolute, + MetricValue::Gauge { value: 42.0 }, + ); + let expected_gauge = gauge.clone(); + assert_metric_eq(&mut tx, &mut out_stream, gauge, expected_gauge).await; + + let absolute_counter = make_metric( + "absolute_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 42.0 }, + ); + let absolute_counter_expected = absolute_counter.clone(); + assert_metric_eq( + &mut tx, + &mut out_stream, + absolute_counter, + absolute_counter_expected, + ) + .await; + } +} diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index 6dd7e74a1e873..3d5bd83a7a75b 100644 --- a/src/transforms/mod.rs +++ 
b/src/transforms/mod.rs @@ -15,6 +15,8 @@ pub mod aws_ec2_metadata; mod exclusive_route; #[cfg(feature = "transforms-filter")] pub mod filter; +#[cfg(feature = "transforms-incremental_to_absolute")] +pub mod incremental_to_absolute; #[cfg(feature = "transforms-log_to_metric")] pub mod log_to_metric; #[cfg(feature = "transforms-lua")] diff --git a/website/cue/reference/components.cue b/website/cue/reference/components.cue index f72bc26395805..7d264e557ac23 100644 --- a/website/cue/reference/components.cue +++ b/website/cue/reference/components.cue @@ -192,19 +192,20 @@ components: { } if Args.kind == "transform" { - aggregate?: #FeaturesAggregate - convert?: #FeaturesConvert - enrich?: #FeaturesEnrich - filter?: #FeaturesFilter - parse?: #FeaturesParse - program?: #FeaturesProgram - proxy?: #FeaturesProxy - reduce?: #FeaturesReduce - route?: #FeaturesRoute - exclusive_route?: #FeaturesExclusiveRoute - sanitize?: #FeaturesSanitize - shape?: #FeaturesShape - window?: #FeaturesWindow + aggregate?: #FeaturesAggregate + convert?: #FeaturesConvert + enrich?: #FeaturesEnrich + filter?: #FeaturesFilter + parse?: #FeaturesParse + program?: #FeaturesProgram + proxy?: #FeaturesProxy + reduce?: #FeaturesReduce + route?: #FeaturesRoute + exclusive_route?: #FeaturesExclusiveRoute + sanitize?: #FeaturesSanitize + shape?: #FeaturesShape + window?: #FeaturesWindow + incremental_to_absolute?: #FeaturesIncrementalToAbsolute } if Args.kind == "sink" { @@ -338,6 +339,8 @@ components: { #FeaturesWindow: {} + #FeaturesIncrementalToAbsolute: {} + #FeaturesSend: { _args: { egress_method: string diff --git a/website/cue/reference/components/transforms/generated/incremental_to_absolute.cue b/website/cue/reference/components/transforms/generated/incremental_to_absolute.cue new file mode 100644 index 0000000000000..3182d466fbbf2 --- /dev/null +++ b/website/cue/reference/components/transforms/generated/incremental_to_absolute.cue @@ -0,0 +1,32 @@ +package metadata + +generated: components: 
transforms: incremental_to_absolute: configuration: cache: { + description: """ + Configuration for the internal metrics cache used to normalize a stream of incremental + metrics into absolute metrics. + + By default, incremental metrics are evicted after 5 minutes of not being updated. The next + incremental value will be reset. + """ + required: false + type: object: options: { + max_bytes: { + description: "The maximum size in bytes of the events in the metrics normalizer cache, excluding cache overhead." + required: false + type: uint: unit: "bytes" + } + max_events: { + description: "The maximum number of events of the metrics normalizer cache" + required: false + type: uint: unit: "events" + } + time_to_live: { + description: "The maximum age of a metric not being updated before it is evicted from the metrics normalizer cache." + required: false + type: uint: { + default: 300 + unit: "seconds" + } + } + } +} diff --git a/website/cue/reference/components/transforms/incremental_to_absolute.cue b/website/cue/reference/components/transforms/incremental_to_absolute.cue new file mode 100644 index 0000000000000..0c3e655ed2938 --- /dev/null +++ b/website/cue/reference/components/transforms/incremental_to_absolute.cue @@ -0,0 +1,182 @@ +package metadata + +components: transforms: incremental_to_absolute: { + title: "Incremental To Absolute" + + description: """ + Converts incremental metrics to absolute. Absolute metrics are emitted unchanged to downstream components. 
+ """ + + classes: { + commonly_used: false + development: "beta" + egress_method: "stream" + stateful: true + } + + features: { + convert: {} + } + + support: { + requirements: [] + warnings: [] + notices: [] + } + + configuration: generated.components.transforms.incremental_to_absolute.configuration + + input: { + logs: false + metrics: { + counter: true + distribution: true + gauge: true + histogram: true + set: true + summary: true + } + traces: false + } + + output: { + metrics: "": { + description: "The modified input `metric` event." + } + } + + examples: [ + { + title: "Convert incremental metrics to absolute" + input: [ + { + metric: { + kind: "incremental" + name: "counter.1" + timestamp: "2021-07-12T07:58:44.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 1.1 + } + } + }, + { + metric: { + kind: "incremental" + name: "counter.1" + timestamp: "2021-07-12T07:58:45.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 1.1 + } + } + }, + { + metric: { + kind: "incremental" + name: "counter.1" + timestamp: "2021-07-12T07:58:46.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 1.1 + } + } + }, + { + metric: { + kind: "incremental" + name: "counter.1" + timestamp: "2021-07-12T08:59:45.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 1.1 + } + } + }, + ] + configuration: { + cache: time_to_live: 10 + } + output: [ + { + metric: { + kind: "absolute" + name: "counter.1" + timestamp: "2021-07-12T07:58:44.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 1.1 + } + } + }, + { + metric: { + kind: "absolute" + name: "counter.1" + timestamp: "2021-07-12T07:58:45.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 2.2 + } + } + }, + { + metric: { + kind: "absolute" + name: "counter.1" + timestamp: "2021-07-12T07:58:46.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 3.3 + } + } + }, + { + metric: { + kind: "absolute" + name: "counter.1" + timestamp: 
"2021-07-12T08:59:45.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 1.1 + } + } + }, + ] + }, + ] + + how_it_works: { + advantages: { + title: "Advantages of Use" + body: """ + Converting incremental metrics to absolute metrics has two major benefits. First, incremental metrics require + the entire history to determine the current state, as they depend on previous values to calculate changes. + Each absolute metric represents a complete state, making it easier to view historical data accurately for + components like the File sink, where some files might end up missing or out of order. Second, it can reduce + overhead for downstream components like Prometheus Remote Write, which internally converts + incremental to absolute metrics. Converting to absolute metrics with this transform prevents the + creation of duplicate caches when sending to multiple Prometheus Remote Write sinks. + + The conversion is performed based on the order in which incremental metrics are received, not their timestamps. + Moreover, absolute metrics received by this transform are emitted unchanged. + """ + } + } +}