3 changes: 3 additions & 0 deletions changelog.d/reduce_max_bytes.enhancement.md
@@ -0,0 +1,3 @@
Added a new `max_bytes` configuration option to the `reduce` transform. This option triggers a flush when the accumulated byte size of a reduced event group would exceed the configured threshold. This complements the existing `max_events` option and provides more granular control over memory usage.

authors: https://github.com/PGBI
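
For context, a minimal sketch of how the new option could appear in a Vector pipeline (the transform and source names here are hypothetical):

```toml
# Cap each reduced group at roughly 1 MiB of accumulated event data.
[transforms.my_reduce]
type       = "reduce"
inputs     = ["my_source"]
group_by   = ["request_id"]
max_events = 500      # existing cap on the number of grouped events
max_bytes  = 1048576  # new cap on the accumulated byte size
```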
7 changes: 7 additions & 0 deletions src/transforms/reduce/config.rs
@@ -55,6 +55,13 @@ pub struct ReduceConfig {
/// The maximum number of events to group together.
pub max_events: Option<NonZeroUsize>,

/// The maximum size of a reduced event, in bytes.
///
/// If adding an event would cause the reduced event to exceed this size,
/// the current reduced event is flushed first.
#[configurable(metadata(docs::type_unit = "bytes"))]
pub max_bytes: Option<NonZeroUsize>,

/// An ordered list of fields by which to group events.
///
/// Each group with matching values for the specified keys is reduced independently, allowing
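
The doc comment above describes a "flush first, then accumulate" rule: an incoming event is never merged into a group it would push past the limit. A standalone sketch of that predicate, with illustrative names that are not part of the diff:

```rust
// Returns true when the current group must be flushed before
// the incoming event is accumulated.
fn should_flush(accumulated: usize, incoming: usize, max_bytes: Option<usize>) -> bool {
    max_bytes.is_some_and(|max| accumulated + incoming > max)
}

fn main() {
    assert!(should_flush(900, 200, Some(1024)));  // 1100 > 1024: flush first
    assert!(!should_flush(900, 100, Some(1024))); // at or under the limit: keep merging
    assert!(!should_flush(900, 200, None));       // no limit configured
}
```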
284 changes: 284 additions & 0 deletions src/transforms/reduce/transform.rs
@@ -6,6 +6,7 @@ use std::{

use futures::Stream;
use indexmap::IndexMap;
use vector_lib::byte_size_of::ByteSizeOf;
use vector_lib::stream::expiration_map::{Emitter, map_with_expiration};
use vector_vrl_metrics::MetricsStorage;
use vrl::{
@@ -29,6 +30,7 @@ use crate::{
#[derive(Clone, Debug)]
struct ReduceState {
events: usize,
accumulated_bytes: usize,
fields: HashMap<OwnedTargetPath, Box<dyn ReduceValueMerger>>,
stale_since: Instant,
creation: Instant,
@@ -53,6 +55,7 @@ impl ReduceState {
fn new() -> Self {
Self {
events: 0,
accumulated_bytes: 0,
stale_since: Instant::now(),
creation: Instant::now(),
fields: HashMap::new(),
@@ -61,6 +64,7 @@ }
}

fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) {
self.accumulated_bytes += e.size_of();
self.metadata.merge(e.metadata().clone());

for (path, strategy) in strategies {
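
One note on the accounting above: `size_of()` comes from the `ByteSizeOf` trait imported at the top of this file, which estimates the event's in-memory footprint (the value plus the heap bytes it owns) rather than its serialized length, so `max_bytes` bounds memory per group, not output size. A minimal sketch, assuming the re-exported paths used elsewhere in this diff:

```rust
use vector_lib::byte_size_of::ByteSizeOf;
use vector_lib::event::LogEvent;

fn main() {
    let event = LogEvent::from("hello world");
    // Estimated in-memory size of the event, not its encoded length.
    println!("estimated bytes: {}", event.size_of());
}
```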
@@ -150,6 +154,7 @@ pub struct Reduce {
ends_when: Option<Condition>,
starts_when: Option<Condition>,
max_events: Option<usize>,
max_bytes: Option<usize>,
}

fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
@@ -193,6 +198,7 @@ impl Reduce {
.transpose()?;
let group_by = config.group_by.clone().into_iter().collect();
let max_events = config.max_events.map(|max| max.into());
let max_bytes = config.max_bytes.map(|max| max.into());

validate_merge_strategies(config.merge_strategies.clone())?;

@@ -219,6 +225,7 @@ impl Reduce {
ends_when,
starts_when,
max_events,
max_bytes,
})
}

@@ -275,6 +282,7 @@ };
};

let event = event.into_log();
let incoming_event_size = event.size_of();
let discriminant = Discriminant::from_log_event(&event, &self.group_by);

if let Some(max_events) = self.max_events {
@@ -288,6 +296,15 @@ }
}
}

// Flush existing state if adding this event would exceed max_bytes
if let Some(max_bytes) = self.max_bytes
&& let Some(entry) = self.reduce_merge_states.get(&discriminant)
&& entry.accumulated_bytes + incoming_event_size > max_bytes
&& let Some(state) = self.reduce_merge_states.remove(&discriminant)
{
emitter.emit(state.flush().into());
}

if starts_here {
if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
emitter.emit(state.flush().into());
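
Worth noting from the logic above: the check only fires when a state already exists for the group, so a single event larger than `max_bytes` still starts a group and is flushed by the next event, by `ends_when`, or by expiration. A self-contained simulation of that behavior (illustrative only, not part of the diff):

```rust
// Simulates the flush decision for a stream of event sizes in one group.
fn simulate(sizes: &[usize], max_bytes: usize) -> Vec<Vec<usize>> {
    let mut groups = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut accumulated = 0;
    for &size in sizes {
        // Mirrors the transform: flush only if state already exists and
        // adding this event would exceed the limit.
        if !current.is_empty() && accumulated + size > max_bytes {
            groups.push(std::mem::take(&mut current));
            accumulated = 0;
        }
        current.push(size);
        accumulated += size;
    }
    if !current.is_empty() {
        groups.push(current); // end-of-stream flush
    }
    groups
}

fn main() {
    // Three 60-byte events with max_bytes = 100: each flush happens when
    // the next event arrives.
    assert_eq!(simulate(&[60, 60, 60], 100), vec![vec![60], vec![60], vec![60]]);
    // An oversized single event still forms a group of its own.
    assert_eq!(simulate(&[150, 10], 100), vec![vec![150], vec![10]]);
}
```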
@@ -1047,4 +1064,271 @@ merge_strategies.bar = "concat"
})
.await
}

#[tokio::test]
async fn max_bytes_0() {
let reduce_config = toml::from_str::<ReduceConfig>(
r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_bytes = 0
"#,
);

match reduce_config {
Ok(_conf) => unreachable!("max_bytes=0 should be rejected."),
Err(err) => assert!(
err.to_string()
.contains("invalid value: integer `0`, expected a nonzero usize")
),
}
}

#[tokio::test]
async fn max_bytes_basic() {
// Use max_bytes=1 to force flush before each new event
let reduce_config = toml::from_str::<ReduceConfig>(
r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_bytes = 1

[ends_when]
type = "vrl"
source = "exists(.test_end)"
"#,
)
.unwrap();

assert_transform_compliance(async move {
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

// With max_bytes=1, each new event triggers flush of previous state
let mut e_1 = LogEvent::from("test message 1");
e_1.insert("id", "1");

let mut e_2 = LogEvent::from("test message 2");
e_2.insert("id", "1");

let mut e_3 = LogEvent::from("test message 3");
e_3.insert("id", "1");
e_3.insert("test_end", "yep");

for event in [e_1.into(), e_2.into(), e_3.into()] {
tx.send(event).await.unwrap();
}

// e_1 flushed when e_2 arrives (accumulated_bytes + e_2.size > 1)
let output_1 = out.recv().await.unwrap().into_log();
assert_eq!(output_1["message"], vec!["test message 1"].into());

// e_2 flushed when e_3 arrives
let output_2 = out.recv().await.unwrap().into_log();
assert_eq!(output_2["message"], vec!["test message 2"].into());

// e_3 flushed due to ends_when
let output_3 = out.recv().await.unwrap().into_log();
assert_eq!(output_3["message"], vec!["test message 3"].into());

drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}

#[tokio::test]
async fn max_bytes_flushes_after_multiple_events() {
let mut e_1 = LogEvent::from("msg1");
e_1.insert("id", "1");
let mut e_2 = LogEvent::from("msg2");
e_2.insert("id", "1");
let mut e_3 = LogEvent::from("msg3");
e_3.insert("id", "1");
let mut e_4 = LogEvent::from("msg4");
e_4.insert("id", "1");

let size_1 = e_1.size_of();
let size_2 = e_2.size_of();
let size_3 = e_3.size_of();
let size_4 = e_4.size_of();

// Set max_bytes to allow e_1 + e_2 + e_3, but adding e_4 exceeds the limit
let max_bytes = size_1 + size_2 + size_3 + (size_4 / 2);

let config_str = format!(
r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_bytes = {}

[ends_when]
type = "vrl"
source = "exists(.test_end)"
"#,
max_bytes
);
let reduce_config = toml::from_str::<ReduceConfig>(&config_str).unwrap();

assert_transform_compliance(async move {
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

// Recreate the events inside the task (the originals above were only used to measure sizes)
let mut e_1 = LogEvent::from("msg1");
e_1.insert("id", "1");

let mut e_2 = LogEvent::from("msg2");
e_2.insert("id", "1");

let mut e_3 = LogEvent::from("msg3");
e_3.insert("id", "1");

let mut e_4 = LogEvent::from("msg4");
e_4.insert("id", "1");

let mut e_5 = LogEvent::from("msg5");
e_5.insert("id", "1");
e_5.insert("test_end", "yep");

for event in [e_1.into(), e_2.into(), e_3.into(), e_4.into(), e_5.into()] {
tx.send(event).await.unwrap();
}

// First output: events 1+2+3 flushed when event 4 arrives
let output_1 = out.recv().await.unwrap().into_log();
assert_eq!(output_1["message"], vec!["msg1", "msg2", "msg3"].into());

// Second output: events 4+5 flushed due to ends_when
let output_2 = out.recv().await.unwrap().into_log();
assert_eq!(output_2["message"], vec!["msg4", "msg5"].into());

drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}

#[tokio::test]
async fn max_bytes_with_max_events() {
// Set both max_bytes and max_events; max_events should trigger first
let reduce_config = toml::from_str::<ReduceConfig>(
r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_events = 2
max_bytes = 100000
"#,
)
.unwrap();

assert_transform_compliance(async move {
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

let mut e_1 = LogEvent::from("test 1");
e_1.insert("id", "1");

let mut e_2 = LogEvent::from("test 2");
e_2.insert("id", "1");

let mut e_3 = LogEvent::from("test 3");
e_3.insert("id", "1");

let mut e_4 = LogEvent::from("test 4");
e_4.insert("id", "1");

for event in [e_1.into(), e_2.into(), e_3.into(), e_4.into()] {
tx.send(event).await.unwrap();
}

// max_events=2 should cause flush after every 2 events
let output_1 = out.recv().await.unwrap().into_log();
assert_eq!(output_1["message"], vec!["test 1", "test 2"].into());

let output_2 = out.recv().await.unwrap().into_log();
assert_eq!(output_2["message"], vec!["test 3", "test 4"].into());

drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}

#[tokio::test]
async fn max_bytes_with_group_by() {
// Verify byte tracking is per-group
let reduce_config = toml::from_str::<ReduceConfig>(
r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_bytes = 1

[ends_when]
type = "vrl"
source = "exists(.test_end)"
"#,
)
.unwrap();

assert_transform_compliance(async move {
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

// Events for group "1"
let mut e_1a = LogEvent::from("group1 msg1");
e_1a.insert("id", "1");

// Events for group "2"
let mut e_2a = LogEvent::from("group2 msg1");
e_2a.insert("id", "2");

// Second event for group "1" - should trigger flush of first
let mut e_1b = LogEvent::from("group1 msg2");
e_1b.insert("id", "1");
e_1b.insert("test_end", "yep");

// Second event for group "2" - should trigger flush of first
let mut e_2b = LogEvent::from("group2 msg2");
e_2b.insert("id", "2");
e_2b.insert("test_end", "yep");

for event in [e_1a.into(), e_2a.into(), e_1b.into(), e_2b.into()] {
tx.send(event).await.unwrap();
}

// Group 1 first event flushed when e_1b arrives
let output_1 = out.recv().await.unwrap().into_log();
assert_eq!(output_1["id"], "1".into());
assert_eq!(output_1["message"], vec!["group1 msg1"].into());

// Group 1 second event flushed due to ends_when
let output_2 = out.recv().await.unwrap().into_log();
assert_eq!(output_2["id"], "1".into());
assert_eq!(output_2["message"], vec!["group1 msg2"].into());

// Group 2 first event flushed when e_2b arrives
let output_3 = out.recv().await.unwrap().into_log();
assert_eq!(output_3["id"], "2".into());
assert_eq!(output_3["message"], vec!["group2 msg1"].into());

// Group 2 second event flushed due to ends_when
let output_4 = out.recv().await.unwrap().into_log();
assert_eq!(output_4["id"], "2".into());
assert_eq!(output_4["message"], vec!["group2 msg2"].into());

drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}
}