From 60cb0c1e1198131b511a1dcb2163ec39ee431a17 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 16 Jun 2025 10:10:54 +0200 Subject: [PATCH 1/6] Flush container builders in exchange Gives container builders the option to drop their allocations in flush in exchange. Other places might benefit from this, too, but exchange has a quadratic number of builders, so not dropping large allocations can cause a memory regression. Signed-off-by: Moritz Hoffmann --- container/src/lib.rs | 6 ++++++ timely/examples/columnar.rs | 6 ++++++ .../src/dataflow/channels/pushers/exchange.rs | 17 +++++++++-------- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/container/src/lib.rs b/container/src/lib.rs index e9aec213a..11fdc710b 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -125,6 +125,12 @@ pub trait ContainerBuilder: Default + 'static { } container.clear(); } + + /// Indicates a good moment to release resources. By default, does nothing. Callers should + /// not rely on this method releasing any internal state though, i.e., the caller first + /// needs to drain the contents using [`Self::finish`]. + #[inline] + fn flush(&mut self) { } } /// A wrapper trait indicating that the container building will preserve the number of records. 
diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index b59ddcae6..247c28718 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -300,6 +300,7 @@ mod builder { } impl Default for ColumnBuilder { + #[inline] fn default() -> Self { ColumnBuilder { current: Default::default(), @@ -331,6 +332,11 @@ mod builder { self.empty = self.pending.pop_front(); self.empty.as_mut() } + + #[inline] + fn flush(&mut self) { + *self = Self::default(); + } } impl LengthPreservingContainerBuilder for ColumnBuilder where C::Container: Clone { } diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index f0de36274..b8868b24f 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -14,7 +14,7 @@ where for<'a> H: FnMut(&::Item<'a>) -> u64 { pushers: Vec

, - buffers: Vec, + builders: Vec, current: Option, hash_func: H, } @@ -27,20 +27,20 @@ where { /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. pub fn new(pushers: Vec

, key: H) -> Exchange { - let mut buffers = vec![]; + let mut builders = vec![]; for _ in 0..pushers.len() { - buffers.push(Default::default()); + builders.push(Default::default()); } Exchange { pushers, hash_func: key, - buffers, + builders, current: None, } } #[inline] fn flush(&mut self, index: usize) { - while let Some(container) = self.buffers[index].finish() { + while let Some(container) = self.builders[index].finish() { if let Some(ref time) = self.current { Message::push_at(container, time.clone(), &mut self.pushers[index]); } @@ -79,14 +79,14 @@ where // if the number of pushers is a power of two, use a mask if self.pushers.len().is_power_of_two() { let mask = (self.pushers.len() - 1) as u64; - CB::partition(data, &mut self.buffers, |datum| ((hash_func)(datum) & mask) as usize); + CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) & mask) as usize); } // as a last resort, use mod (%) else { let num_pushers = self.pushers.len() as u64; - CB::partition(data, &mut self.buffers, |datum| ((hash_func)(datum) % num_pushers) as usize); + CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) % num_pushers) as usize); } - for (buffer, pusher) in self.buffers.iter_mut().zip(self.pushers.iter_mut()) { + for (buffer, pusher) in self.builders.iter_mut().zip(self.pushers.iter_mut()) { while let Some(container) = buffer.extract() { Message::push_at(container, time.clone(), pusher); } @@ -96,6 +96,7 @@ where // flush for index in 0..self.pushers.len() { self.flush(index); + self.builders[index].flush(); self.pushers[index].push(&mut None); } } From 67385debce2bf79af708615073fc59990db16966 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 16 Jun 2025 14:51:11 +0200 Subject: [PATCH 2/6] Also flush session buffer Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/channels/pushers/buffer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs 
index af90f955b..ff0eadd52 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -81,6 +81,7 @@ impl>> Buffer Date: Tue, 17 Jun 2025 09:58:35 +0200 Subject: [PATCH 3/6] Rename to relax Signed-off-by: Moritz Hoffmann --- container/src/lib.rs | 2 +- timely/examples/columnar.rs | 2 +- timely/src/dataflow/channels/pushers/buffer.rs | 2 +- timely/src/dataflow/channels/pushers/exchange.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/container/src/lib.rs b/container/src/lib.rs index 11fdc710b..25f7321d3 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -130,7 +130,7 @@ pub trait ContainerBuilder: Default + 'static { /// not rely on this method releasing any internal state though, i.e., the caller first /// needs to drain the contents using [`Self::finish`]. #[inline] - fn flush(&mut self) { } + fn relax(&mut self) { } } /// A wrapper trait indicating that the container building will preserve the number of records. 
diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 247c28718..5f658822f 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -334,7 +334,7 @@ mod builder { } #[inline] - fn flush(&mut self) { + fn relax(&mut self) { *self = Self::default(); } } diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index ff0eadd52..be579936c 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -81,7 +81,7 @@ impl>> Buffer Date: Tue, 24 Jun 2025 15:18:19 +0200 Subject: [PATCH 4/6] Guard against data loss Signed-off-by: Moritz Hoffmann --- timely/examples/columnar.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 5f658822f..305cf5b1c 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -335,6 +335,10 @@ mod builder { #[inline] fn relax(&mut self) { + /// The caller is responsible for draining all contents; assert that we are empty. + /// The assertion is not strictly necessary, but it helps catch bugs. + debug_assert!(self.current.is_empty()); + debug_assert!(self.pending.is_empty()); *self = Self::default(); } } From 0e1049bca26c26da0fe9096ead29076a6f94957c Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 24 Jun 2025 15:21:19 +0200 Subject: [PATCH 5/6] Always assert Signed-off-by: Moritz Hoffmann --- timely/examples/columnar.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 305cf5b1c..2f7e8eef7 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -337,8 +337,8 @@ mod builder { fn relax(&mut self) { /// The caller is responsible for draining all contents; assert that we are empty. /// The assertion is not strictly necessary, but it helps catch bugs. 
- debug_assert!(self.current.is_empty()); - debug_assert!(self.pending.is_empty()); + assert!(self.current.is_empty()); + assert!(self.pending.is_empty()); *self = Self::default(); } } From 8059744dd2b0572366652e76baf8c4e0d9837ea8 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 24 Jun 2025 15:31:54 +0200 Subject: [PATCH 6/6] Improve doc Signed-off-by: Moritz Hoffmann --- container/src/lib.rs | 8 +++++--- timely/examples/columnar.rs | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/container/src/lib.rs b/container/src/lib.rs index 25f7321d3..839f80167 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -126,9 +126,11 @@ pub trait ContainerBuilder: Default + 'static { container.clear(); } - /// Indicates a good moment to release resources. By default, does nothing. Callers should - /// not rely on this method releasing any internal state though, i.e., the caller first - /// needs to drain the contents using [`Self::finish`]. + /// Indicates a good moment to release resources. + /// + /// By default, does nothing. Callers first need to drain the contents using [`Self::finish`] + /// before calling this function. The implementation should not change the contents of the + /// builder. #[inline] fn relax(&mut self) { } } diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 2f7e8eef7..7271ccc76 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -335,8 +335,8 @@ mod builder { #[inline] fn relax(&mut self) { - /// The caller is responsible for draining all contents; assert that we are empty. - /// The assertion is not strictly necessary, but it helps catch bugs. + // The caller is responsible for draining all contents; assert that we are empty. + // The assertion is not strictly necessary, but it helps catch bugs. assert!(self.current.is_empty()); assert!(self.pending.is_empty()); *self = Self::default();