diff --git a/changelog.d/14015_aggregation_by_event_time.feature.md b/changelog.d/14015_aggregation_by_event_time.feature.md new file mode 100644 index 0000000000000..9f90ccade5521 --- /dev/null +++ b/changelog.d/14015_aggregation_by_event_time.feature.md @@ -0,0 +1,3 @@ +Added support for aggregation driven by **event time**, allowing buckets to be aligned by metric timestamps instead of processing time. + +authors: adiwab diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 9c3f0b20afaa0..9af7b971117dd 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -20,6 +20,8 @@ use crate::{ transforms::{TaskTransform, Transform}, }; +use chrono::{DateTime, Utc}; + /// Configuration for the `aggregate` transform. #[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))] #[derive(Clone, Debug, Default)] @@ -31,12 +33,34 @@ pub struct AggregateConfig { #[serde(default = "default_interval_ms")] #[configurable(metadata(docs::human_name = "Flush Interval"))] pub interval_ms: u64, + /// Function to use for aggregation. /// /// Some of the functions may only function on incremental and some only on absolute metrics. #[serde(default = "default_mode")] #[configurable(derived)] pub mode: AggregationMode, + + /// Aggregation clock. + /// + /// Defines whether buckets are created based on processing time (default) or event time. + #[serde(default = "default_clock")] + #[configurable(derived)] + pub clock: AggregationClock, + + /// Allowed lateness for event-time processing. + /// + /// Specifies how long to wait for late or out-of-order samples before closing an event-time bucket. + #[serde(default = "default_allowed_lateness_ms")] + #[configurable(derived)] + pub allowed_lateness_ms: u64, + + /// Output timestamp mode. + /// + /// Determines whether the output timestamp should be set to the start or the end of the bucket. + #[serde(default)] + #[configurable(derived)] + pub emit_ts: EmitTimestamp, } #[configurable_component] @@ -72,6 +96,28 @@ pub enum AggregationMode { Stdev, } +#[configurable_component] +#[derive(Clone, Debug, Default)] +#[configurable(description = "Specifies which clock source drives the aggregation buckets.")] +pub enum AggregationClock { + /// Buckets are driven by Vector's wall clock (processing time). + #[default] + Processing, + /// Buckets are driven by each event's own timestamp (event time). + Event, +} + +#[configurable_component] +#[derive(Clone, Debug, Default)] +#[configurable(description = "Specifies which timestamp is applied to the emitted output.")] +pub enum EmitTimestamp { + /// Stamp the output at the start of the bucket window. + #[default] + BucketStart, + /// Stamp the output at the end of the bucket window.
+ BucketEnd, +} + const fn default_mode() -> AggregationMode { AggregationMode::Auto } @@ -80,6 +126,14 @@ const fn default_interval_ms() -> u64 { 10 * 1000 } +const fn default_allowed_lateness_ms() -> u64 { + 120_000 +} + +const fn default_clock() -> AggregationClock { + AggregationClock::Processing +} + impl_generate_config_from_default!(AggregateConfig); #[async_trait::async_trait] @@ -105,13 +159,24 @@ impl TransformConfig for AggregateConfig { type MetricEntry = (MetricData, EventMetadata); +/// Key for event-time buckets: (series, bucket_start_epoch_ms) +type SeriesBucketKey = (MetricSeries, i64); + #[derive(Debug)] pub struct Aggregate { + // processing-time state interval: Duration, map: HashMap<MetricSeries, MetricEntry>, prev_map: HashMap<MetricSeries, MetricEntry>, multi_map: HashMap<MetricSeries, Vec<MetricEntry>>, mode: AggregationMode, + + // event-time controls & state + clock: AggregationClock, + allowed_lateness: Duration, + emit_ts: EmitTimestamp, + event_map: HashMap<SeriesBucketKey, MetricEntry>, + event_multi_map: HashMap<SeriesBucketKey, Vec<MetricEntry>>, } impl Aggregate { @@ -122,12 +187,115 @@ impl Aggregate { prev_map: Default::default(), multi_map: Default::default(), mode: config.mode.clone(), + + clock: config.clock.clone(), + allowed_lateness: Duration::from_millis(config.allowed_lateness_ms), + emit_ts: config.emit_ts.clone(), + event_map: Default::default(), + event_multi_map: Default::default(), }) } + /// Extract timestamp (ms) from MetricData; fall back to "now" if missing. + #[inline] + fn ts_ms_from_data(&self, data: &MetricData) -> i64 { + if let Some(ts) = data.timestamp() { + ts.timestamp_millis() + } else { + Utc::now().timestamp_millis() + } + } + + /// Floor metric timestamp to window start. + #[inline] + fn bucket_epoch_ms(&self, data: &MetricData) -> i64 { + let ts = self.ts_ms_from_data(data); + let w = self.interval.as_millis() as i64; + ts - (ts % w) + } + + /// Convert bucket start epoch (ms) into the output timestamp (bucket start or end). + #[inline] + fn bucket_ts_to_datetime(&self, bucket_ms: i64) -> DateTime<Utc> { + let interval_ms = i64::try_from(self.interval.as_millis()) + .expect("interval fits into i64 milliseconds"); + + let end_ms = bucket_ms + interval_ms - 1; + let ms = match self.emit_ts { + EmitTimestamp::BucketStart => bucket_ms, + EmitTimestamp::BucketEnd => end_ms, + }; + + DateTime::<Utc>::from_timestamp_millis(ms) + .expect("valid bucket ms") + } + + /// Write the computed timestamp onto the outgoing Metric.
+ #[inline] + fn set_metric_timestamp(metric: &mut Metric, ts: DateTime<Utc>) { + metric.data_mut().time.timestamp = Some(ts); + } + fn record(&mut self, event: Event) { let (series, data, metadata) = event.into_metric().into_parts(); + // --- Event-time path (new): bucket by the event's own timestamp --- + if matches!(self.clock, AggregationClock::Event) { + let bucket_ms = self.bucket_epoch_ms(&data); + let key: SeriesBucketKey = (series, bucket_ms); + + match self.mode { + AggregationMode::Auto => match data.kind { + MetricKind::Incremental => self.record_sum_event(key, data, metadata), + MetricKind::Absolute => { + // Latest by event timestamp within the same bucket + let new_ts = self.ts_ms_from_data(&data); + match self.event_map.entry(key) { + Entry::Occupied(mut e) => { + let existing_ts = self.ts_ms_from_data(&e.get().0); + if new_ts >= existing_ts { + e.insert((data, metadata)); + } // otherwise drop the older sample + } + Entry::Vacant(e) => { e.insert((data, metadata)); } + } + } + }, + AggregationMode::Sum => self.record_sum_event(key, data, metadata), + AggregationMode::Latest | AggregationMode::Diff => match data.kind { + MetricKind::Incremental => (), + MetricKind::Absolute => { + // Latest by event timestamp within the same bucket + let new_ts = self.ts_ms_from_data(&data); + match self.event_map.entry(key) { + Entry::Occupied(mut e) => { + let existing_ts = self.ts_ms_from_data(&e.get().0); + if new_ts >= existing_ts { + e.insert((data, metadata)); + } + } + Entry::Vacant(e) => { e.insert((data, metadata)); } + } + } + }, + AggregationMode::Count => self.record_count_event(key, data, metadata), + AggregationMode::Max | AggregationMode::Min => { + self.record_comparison_event(key, data, metadata) + } + AggregationMode::Mean | AggregationMode::Stdev => match data.kind { + MetricKind::Incremental => (), + MetricKind::Absolute => { + if matches!(data.value, MetricValue::Gauge { ..
}) { + self.event_multi_map.entry(key).or_default().push((data, metadata)); + } + } + }, + } + + emit!(AggregateEventRecorded); + return; + } + match self.mode { AggregationMode::Auto => match data.kind { MetricKind::Incremental => self.record_sum(series, data, metadata), @@ -246,22 +414,208 @@ impl Aggregate { } } + fn record_count_event( + &mut self, + key: SeriesBucketKey, + mut data: MetricData, + metadata: EventMetadata, + ) { + let mut count_data = data.clone(); + let existing = self.event_map.entry(key).or_insert_with(|| { + *data.value_mut() = MetricValue::Counter { value: 0.0 }; + (data.clone(), metadata.clone()) + }); + *count_data.value_mut() = MetricValue::Counter { value: 1.0 }; + if existing.0.kind == data.kind && existing.0.update(&count_data) { + existing.1.merge(metadata); + } else { + emit!(AggregateUpdateFailed); + } + } + + fn record_sum_event(&mut self, key: SeriesBucketKey, data: MetricData, metadata: EventMetadata) { + match data.kind { + MetricKind::Incremental => match self.event_map.entry(key) { + Entry::Occupied(mut entry) => { + let existing = entry.get_mut(); + if existing.0.kind == data.kind && existing.0.update(&data) { + existing.1.merge(metadata); + } else { + emit!(AggregateUpdateFailed); + *existing = (data, metadata); + } + } + Entry::Vacant(entry) => { entry.insert((data, metadata)); } + }, + MetricKind::Absolute => {} + } + } + + fn record_comparison_event( + &mut self, + key: SeriesBucketKey, + data: MetricData, + metadata: EventMetadata, + ) { + match data.kind { + MetricKind::Incremental => (), + MetricKind::Absolute => match self.event_map.entry(key) { + Entry::Occupied(mut entry) => { + let existing = entry.get_mut(); + if existing.0.kind == data.kind { + if let MetricValue::Gauge { value: ev } = existing.0.value() { + if let MetricValue::Gauge { value: nv } = data.value() { + let should_update = match self.mode { + AggregationMode::Max => nv > ev, + AggregationMode::Min => nv < ev, + _ => false, + }; + if should_update { *existing = (data, metadata); } + } + } + } else { + emit!(AggregateUpdateFailed); + *existing = (data, metadata); + } + } + Entry::Vacant(entry) => { entry.insert((data, metadata)); } + }, + } + } + + fn flush_into(&mut self, output: &mut Vec<Event>) { - let map = std::mem::take(&mut self.map); - for (series, entry) in map.clone().into_iter() { - let mut metric = Metric::from_parts(series, entry.0, entry.1); - if matches!(self.mode, AggregationMode::Diff) - && let Some(prev_entry) = self.prev_map.get(metric.series()) - && metric.data().kind == prev_entry.0.kind - && !metric.subtract(&prev_entry.0) - { - emit!(AggregateUpdateFailed); + // --- Processing-time path (existing behavior) --- + if matches!(self.clock, AggregationClock::Processing) { + let map = std::mem::take(&mut self.map); + for (series, (data, meta)) in map.iter() { + let mut metric = Metric::from_parts(series.clone(), data.clone(), meta.clone()); + if matches!(self.mode, AggregationMode::Diff) + && let Some(prev_entry) = self.prev_map.get(metric.series()) + && metric.data().kind == prev_entry.0.kind + && !metric.subtract(&prev_entry.0) + { + emit!(AggregateUpdateFailed); + } + output.push(Event::Metric(metric)); } - output.push(Event::Metric(metric)); + + let multi_map = std::mem::take(&mut self.multi_map); + 'outer: for (series, entries) in multi_map.into_iter() { + if entries.is_empty() { + continue; + } + + let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone(); + for (data, metadata) in entries.iter().skip(1) { + if !final_sum.update(data) { + //
Incompatible types, skip this metric + emit!(AggregateUpdateFailed); + continue 'outer; + } + final_metadata.merge(metadata.clone()); + } + + let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() { + *value /= entries.len() as f64; + *value + } else { + 0.0 + }; + + let final_mean = final_sum.clone(); + match self.mode { + AggregationMode::Mean => { + let metric = Metric::from_parts(series, final_mean, final_metadata); + output.push(Event::Metric(metric)); + } + AggregationMode::Stdev => { + let variance = entries + .iter() + .filter_map(|(data, _)| { + if let MetricValue::Gauge { value } = data.value() { + let diff = final_mean_value - value; + Some(diff * diff) + } else { + None + } + }) + .sum::<f64>() + / entries.len() as f64; + let mut final_stdev = final_mean; + if let MetricValue::Gauge { value } = final_stdev.value_mut() { + *value = variance.sqrt() + } + let metric = Metric::from_parts(series, final_stdev, final_metadata); + output.push(Event::Metric(metric)); + } + _ => (), + } + } + + self.prev_map = map; + emit!(AggregateFlushed); + return; } - let multi_map = std::mem::take(&mut self.multi_map); - 'outer: for (series, entries) in multi_map.into_iter() { + // --- Event-time path (new behavior) --- + let now_ms = Utc::now().timestamp_millis(); + let window_ms = self.interval.as_millis() as i64; + let lateness_ms = self.allowed_lateness.as_millis() as i64; + + // Single-value aggregates per (series, bucket) + let mut matured: Vec<(SeriesBucketKey, MetricEntry)> = Vec::new(); + let mut keep_event_map: HashMap<SeriesBucketKey, MetricEntry> = HashMap::new(); + + for (key, entry) in std::mem::take(&mut self.event_map).into_iter() { + let bucket_ms = key.1; + let mature = now_ms >= bucket_ms + window_ms + lateness_ms; + if mature { + matured.push((key, entry)); + } else { + keep_event_map.insert(key, entry); + } + } + + // For Diff: sort matured buckets chronologically + matured.sort_by_key(|(k, _)| k.1); + + for (key, (data, meta)) in matured.into_iter() { + let bucket_ms = key.1; + + // Remember the aggregate value before the (possible) Diff subtraction below
+ let prev_candidate = (data.clone(), meta.clone()); + + let mut metric = Metric::from_parts(key.0.clone(), data, meta); + + if matches!(self.mode, AggregationMode::Diff) { + if let Some(prev_entry) = self.prev_map.get(metric.series()) { + if metric.data().kind == prev_entry.0.kind && !metric.subtract(&prev_entry.0) { + emit!(AggregateUpdateFailed); + } + } + } + + let ts = self.bucket_ts_to_datetime(bucket_ms); + Self::set_metric_timestamp(&mut metric, ts); + output.push(Event::Metric(metric)); + + // Diff only: update prev_map with the just-aggregated (pre-Diff) value + if matches!(self.mode, AggregationMode::Diff) { + self.prev_map.insert(key.0, prev_candidate); + } + } + self.event_map = keep_event_map; + + // Multi-point aggregates (Mean/Stdev) per (series, bucket) + let mut keep_event_multi: HashMap<SeriesBucketKey, Vec<MetricEntry>> = HashMap::new(); + 'buckets: for (key, entries) in std::mem::take(&mut self.event_multi_map).into_iter() { + let bucket_ms = key.1; + let mature = now_ms >= bucket_ms + window_ms + lateness_ms; + if !mature { + keep_event_multi.insert(key, entries); + continue; + } if entries.is_empty() { continue; } @@ -269,52 +623,53 @@ impl Aggregate { let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone(); for (data, metadata) in entries.iter().skip(1) { if !final_sum.update(data) { // Incompatible types, skip this metric emit!(AggregateUpdateFailed); - continue 'outer; + continue 'buckets; } final_metadata.merge(metadata.clone()); } - let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() { - // Entries are not empty so this is safe. + // Form the mean from the summed value + let mut final_mean = final_sum.clone(); + if let MetricValue::Gauge { value } = final_mean.value_mut() { *value /= entries.len() as f64; - *value - } else { - 0.0 - }; + } - let final_mean = final_sum.clone(); match self.mode { AggregationMode::Mean => { - let metric = Metric::from_parts(series, final_mean, final_metadata); + let mut metric = Metric::from_parts(key.0, final_mean, final_metadata); + let ts = self.bucket_ts_to_datetime(bucket_ms); + Self::set_metric_timestamp(&mut metric, ts); output.push(Event::Metric(metric)); } AggregationMode::Stdev => { + let mean_val = if let MetricValue::Gauge { value } = final_mean.value() { *value } else { 0.0 }; let variance = entries .iter() - .filter_map(|(data, _)| { - if let MetricValue::Gauge { value } = data.value() { - let diff = final_mean_value - value; + .filter_map(|(d, _)| { + if let MetricValue::Gauge { value } = d.value() { + let diff = mean_val - value; Some(diff * diff) } else { None } }) - .sum::<f64>() - / entries.len() as f64; + .sum::<f64>() / entries.len() as f64; + let mut final_stdev = final_mean; if let MetricValue::Gauge { value } = final_stdev.value_mut() { - *value = variance.sqrt() + *value = variance.sqrt(); } - let metric = Metric::from_parts(series, final_stdev, final_metadata); + let mut metric = Metric::from_parts(key.0, final_stdev, final_metadata); + let ts = self.bucket_ts_to_datetime(bucket_ms); + Self::set_metric_timestamp(&mut metric, ts); output.push(Event::Metric(metric)); } _ => (), } } + self.event_multi_map = keep_event_multi; - self.prev_map = map; emit!(AggregateFlushed); } } @@ -399,6 +754,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Auto, + ..Default::default() }) .unwrap(); @@ -470,6 +826,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Auto, +
..Default::default() }) .unwrap(); @@ -536,6 +893,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Count, + ..Default::default() }) .unwrap(); @@ -592,6 +950,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Max, + ..Default::default() }) .unwrap(); @@ -638,6 +997,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Min, + ..Default::default() }) .unwrap(); @@ -684,6 +1044,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Diff, + ..Default::default() }) .unwrap(); @@ -740,6 +1101,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Diff, + ..Default::default() }) .unwrap(); @@ -775,6 +1137,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Mean, + ..Default::default() }) .unwrap(); @@ -832,6 +1195,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Stdev, + ..Default::default() }) .unwrap(); @@ -892,6 +1256,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Auto, + ..Default::default() }) .unwrap(); @@ -950,6 +1315,7 @@ mod tests { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64, mode: AggregationMode::Auto, + ..Default::default() }) .unwrap(); diff --git a/website/cue/reference/components/transforms/generated/aggregate.cue b/website/cue/reference/components/transforms/generated/aggregate.cue index 725285a35281b..caac63bc788f5 100644 --- a/website/cue/reference/components/transforms/generated/aggregate.cue +++ b/website/cue/reference/components/transforms/generated/aggregate.cue @@ -32,4 +32,45 @@ generated: components: transforms: aggregate: configuration: { } } } + clock: { + description: """ + Aggregation clock. + + Defines whether buckets are created based on processing time (default) or event time. + """ + required: false + type: string: { + default: "Processing" + enum: { + Processing: "Buckets are driven by Vector's wall clock (processing time)." + Event: "Buckets are driven by each event's own timestamp (event time)." + } + } + } + allowed_lateness_ms: { + description: """ + Allowed lateness for event-time processing. + + Specifies how long to wait for late or out-of-order samples before closing an event-time bucket. + """ + required: false + type: uint: default: 120000 + } + emit_ts: { + description: """ + Output timestamp mode. + + Determines whether the output timestamp should be set to the start or the end of the bucket. + """ + required: false + type: string: { + default: "BucketStart" + enum: { + BucketStart: "Stamp the output at the start of the bucket window." + BucketEnd: "Stamp the output at the end of the bucket window." + } + } + } }
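Reviewer note: the windowing rules are easy to lose inside the diff, so here is a minimal, self-contained sketch of the arithmetic the event-time path implements — bucket alignment by flooring, the maturity check driven by `allowed_lateness_ms`, and the `BucketStart`/`BucketEnd` stamping choice. This is illustrative standalone code, not part of the patch; the function names are hypothetical and only the constants mirror the defaults above.

```rust
// Standalone sketch of the event-time windowing rules in this diff.
// Not Vector code; names are illustrative.

/// Floor an event timestamp (ms) to the start of its bucket window.
fn bucket_start_ms(event_ts_ms: i64, window_ms: i64) -> i64 {
    event_ts_ms - (event_ts_ms % window_ms)
}

/// A bucket only closes once the wall clock has passed the window end
/// plus the allowed lateness, giving out-of-order samples time to arrive.
fn is_mature(bucket_start: i64, window_ms: i64, lateness_ms: i64, now_ms: i64) -> bool {
    now_ms >= bucket_start + window_ms + lateness_ms
}

/// `BucketEnd` stamps the last millisecond inside the window, mirroring
/// the `bucket_ms + interval_ms - 1` computation in `bucket_ts_to_datetime`.
fn emit_ts_ms(bucket_start: i64, window_ms: i64, stamp_end: bool) -> i64 {
    if stamp_end { bucket_start + window_ms - 1 } else { bucket_start }
}

fn main() {
    let window_ms = 10_000; // the 10 s default interval
    let lateness_ms = 120_000; // default_allowed_lateness_ms

    // A sample stamped 2025-01-01T00:00:07.500Z falls in the 00:00:00 bucket.
    let event_ts = 1_735_689_607_500_i64;
    let start = bucket_start_ms(event_ts, window_ms);
    assert_eq!(start, 1_735_689_600_000);

    // The bucket stays open until window end + lateness has passed.
    assert!(!is_mature(start, window_ms, lateness_ms, start + 60_000));
    assert!(is_mature(start, window_ms, lateness_ms, start + 130_000));

    // BucketStart vs. BucketEnd output stamping.
    assert_eq!(emit_ts_ms(start, window_ms, false), start);
    assert_eq!(emit_ts_ms(start, window_ms, true), start + 9_999);
}
```

The design split this encodes: the event's own timestamp decides which bucket a sample joins, while Vector's wall clock only decides when a bucket may close (window end plus allowed lateness). That separation is what lets late or out-of-order samples still land in the correct bucket instead of the bucket that happened to be open when they arrived.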