From 391fc39645fc7903ea82dd5db8125bee83b26e89 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sat, 16 Aug 2025 21:16:04 +0200 Subject: [PATCH 1/5] Distributor trait Introduce a distributor that knows how to partition containers across multiple pushers. This moves the partition logic from container builders into a bespoke trait and implementation instead of mixing two different concepts, building containers, and partitioning them. It allows us to implement a distributor in the future that only partitions by key, for example when the container doesn't have easy access by row. Removes the old `partition` function from container builders. Signed-off-by: Moritz Hoffmann --- container/src/lib.rs | 13 -- timely/src/dataflow/channels/pact.rs | 18 +- .../src/dataflow/channels/pushers/exchange.rs | 155 ++++++++++++------ 3 files changed, 115 insertions(+), 71 deletions(-) diff --git a/container/src/lib.rs b/container/src/lib.rs index 839f80167..2ff6589fe 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -113,19 +113,6 @@ pub trait ContainerBuilder: Default + 'static { /// be called repeatedly until it returns `None`. #[must_use] fn finish(&mut self) -> Option<&mut Self::Container>; - /// Partitions `container` among `builders`, using the function `index` to direct items. - fn partition(container: &mut Self::Container, builders: &mut [Self], mut index: I) - where - Self: for<'a> PushInto<::Item<'a>>, - I: for<'a> FnMut(&::Item<'a>) -> usize, - { - for datum in container.drain() { - let index = index(&datum); - builders[index].push_into(datum); - } - container.clear(); - } - /// Indicates a good moment to release resources. /// /// By default, does nothing. 
Callers first needs to drain the contents using [`Self::finish`] diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index fb2f95e7d..a88c6336a 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -14,6 +14,7 @@ use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBu use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::communication::{Push, Pull}; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; +use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor; use crate::dataflow::channels::Message; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; use crate::progress::Timestamp; @@ -79,20 +80,25 @@ where } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -impl ParallelizationContract for ExchangeCore +impl ParallelizationContract for ExchangeCore where - CB: ContainerBuilder, - CB: for<'a> PushInto<::Item<'a>>, + CB: ContainerBuilder + for<'a> PushInto<::Item<'a>>, CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes, - for<'a> H: FnMut(&::Item<'a>) -> u64 + for<'a> H: FnMut(&::Item<'a>) -> u64 + 'static, { - type Pusher = ExchangePusher>>>, H>; + type Pusher = ExchangePusher< + T, + CB::Container, + LogPusher>>>, + DrainContainerDistributor>>>, H> + >; type Puller = LogPuller>>>; fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) + let distributor = DrainContainerDistributor::new(self.hash_func); + 
(ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) } } diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 9b7baeece..a1a25de05 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -5,55 +5,122 @@ use crate::container::{ContainerBuilder, PushInto}; use crate::dataflow::channels::Message; use crate::{Container, Data}; -// TODO : Software write combining -/// Distributes records among target pushees according to a distribution function. -pub struct Exchange +/// Distribute containers to several pushers. +/// +/// A distributor sits behind an exchange pusher, and partitions containers at a given time +/// into several pushers. It can use [`Message::push_at`] to push its outputs at the desired +/// pusher. +/// +/// It needs to uphold progress tracking requirements. The count of the input container +/// must be preserved across the output containers, from the first call to `partition` until the +/// call to `flush` for a specific time stamp. +pub trait Distributor { + /// Partition the contents of `container` at `time` into the `pushers`. + fn partition(&mut self, container: &mut C, time: &T, pushers: &mut [P]); + /// Flush any remaining contents into the `pushers` at time `time`. + fn flush(&mut self, time: &T, pushers: &mut [P]); + /// Optionally release resources, such as memory. + fn relax(&mut self); +} + +/// A distributor creating containers from a drainable container based +/// on a hash function of the container's item. +pub struct DrainContainerDistributor { + builders: Vec, + hash_func: H, + _phantom: std::marker::PhantomData<(T, P)>, +} + +impl DrainContainerDistributor { + /// Allocates a new `DrainContainerPartitioner` with the given pusher count and hash function. 
+ pub fn new(hash_func: H) -> Self { + DrainContainerDistributor { + builders: Vec::new(), + hash_func, + _phantom: std::marker::PhantomData, + } + } +} + +impl Distributor for DrainContainerDistributor where - CB: ContainerBuilder, + CB: ContainerBuilder + for<'a> PushInto<::Item<'a>>, + CB::Container: Container, + T: Clone, P: Push>, - for<'a> H: FnMut(&::Item<'a>) -> u64 + for<'a> H: FnMut(&::Item<'a>) -> u64, { + fn partition(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) { + self.builders.resize_with(pushers.len(), Default::default); + if pushers.len().is_power_of_two() { + let mask = (pushers.len() - 1) as u64; + for datum in container.drain() { + let index = ((self.hash_func)(&datum) & mask) as usize; + self.builders[index].push_into(datum); + while let Some(produced) = self.builders[index].extract() { + Message::push_at(produced, time.clone(), &mut pushers[index]); + } + } + } + else { + let num_pushers = pushers.len() as u64; + for datum in container.drain() { + let index = ((self.hash_func)(&datum) % num_pushers) as usize; + self.builders[index].push_into(datum); + while let Some(produced) = self.builders[index].extract() { + Message::push_at(produced, time.clone(), &mut pushers[index]); + } + } + } + } + + fn flush(&mut self, time: &T, pushers: &mut [P]) { + for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) { + while let Some(container) = builder.finish() { + Message::push_at(container, time.clone(), pusher); + } + } + } + + fn relax(&mut self) { + for builder in &mut self.builders { + builder.relax(); + } + } +} + +// TODO : Software write combining +/// Distributes records among target pushees according to a distributor. +pub struct Exchange { pushers: Vec

, - builders: Vec, current: Option, - hash_func: H, + distributor: D, + _phantom: std::marker::PhantomData, } -impl Exchange +impl Exchange where - CB: ContainerBuilder, - P: Push>, - for<'a> H: FnMut(&::Item<'a>) -> u64 + P: Push>, + D: Distributor, { - /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. - pub fn new(pushers: Vec

, key: H) -> Exchange { - let builders = std::iter::repeat_with(Default::default).take(pushers.len()).collect(); + /// Allocates a new `Exchange` from a supplied set of pushers and a distributor. + pub fn new(pushers: Vec

, distributor: D) -> Exchange { Exchange { pushers, - hash_func: key, - builders, current: None, - } - } - #[inline] - fn flush(&mut self, index: usize) { - while let Some(container) = self.builders[index].finish() { - if let Some(ref time) = self.current { - Message::push_at(container, time.clone(), &mut self.pushers[index]); - } + distributor, + _phantom: std::marker::PhantomData, } } } -impl Push> for Exchange +impl Push> for Exchange where - CB: ContainerBuilder, - CB: for<'a> PushInto<::Item<'a>>, - P: Push>, - for<'a> H: FnMut(&::Item<'a>) -> u64 + P: Push>, + D: Distributor, { #[inline(never)] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange if self.pushers.len() == 1 { self.pushers[0].push(message); @@ -65,35 +132,19 @@ where // if the time isn't right, flush everything. if self.current.as_ref().is_some_and(|x| x != time) { - for index in 0..self.pushers.len() { - self.flush(index); - } + self.distributor.flush(time, &mut self.pushers); } self.current = Some(time.clone()); - let hash_func = &mut self.hash_func; - - // if the number of pushers is a power of two, use a mask - if self.pushers.len().is_power_of_two() { - let mask = (self.pushers.len() - 1) as u64; - CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) & mask) as usize); - } - // as a last resort, use mod (%) - else { - let num_pushers = self.pushers.len() as u64; - CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) % num_pushers) as usize); - } - for (buffer, pusher) in self.builders.iter_mut().zip(self.pushers.iter_mut()) { - while let Some(container) = buffer.extract() { - Message::push_at(container, time.clone(), pusher); - } - } + self.distributor.partition(data, time, &mut self.pushers); } else { // flush + if let Some(time) = self.current.as_ref() { + self.distributor.flush(time, &mut self.pushers); + } + self.distributor.relax(); for index in 0..self.pushers.len() { - 
self.flush(index); - self.builders[index].relax(); self.pushers[index].push(&mut None); } } From d3e47a9ed416404e4525b3e86a9b9be59a5aa60b Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sat, 16 Aug 2025 22:13:58 +0200 Subject: [PATCH 2/5] Address feedback Signed-off-by: Moritz Hoffmann --- .../src/dataflow/channels/pushers/exchange.rs | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index a1a25de05..158aedc1c 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -3,7 +3,7 @@ use crate::communication::Push; use crate::container::{ContainerBuilder, PushInto}; use crate::dataflow::channels::Message; -use crate::{Container, Data}; +use crate::Container; /// Distribute containers to several pushers. /// @@ -32,9 +32,9 @@ pub struct DrainContainerDistributor { } impl DrainContainerDistributor { - /// Allocates a new `DrainContainerPartitioner` with the given pusher count and hash function. + /// Constructs a new `DrainContainerDistributor` with the given hash function. pub fn new(hash_func: H) -> Self { - DrainContainerDistributor { + Self { builders: Vec::new(), hash_func, _phantom: std::marker::PhantomData, @@ -98,11 +98,7 @@ pub struct Exchange { _phantom: std::marker::PhantomData, } -impl Exchange -where - P: Push>, - D: Distributor, -{ +impl Exchange { /// Allocates a new `Exchange` from a supplied set of pushers and a distributor. pub fn new(pushers: Vec

, distributor: D) -> Exchange { Exchange { @@ -114,7 +110,7 @@ where } } -impl Push> for Exchange +impl Push> for Exchange where P: Push>, D: Distributor, @@ -131,17 +127,24 @@ where let data = &mut message.data; // if the time isn't right, flush everything. - if self.current.as_ref().is_some_and(|x| x != time) { - self.distributor.flush(time, &mut self.pushers); + match self.current.as_ref() { + // We have a current time, and it is different from the new time. + Some(current_time) if current_time != time => { + self.distributor.flush(current_time, &mut self.pushers); + self.current = Some(time.clone()); + } + // We had no time before, or flushed. + None => self.current = Some(time.clone()), + // Time didn't change since last call. + _ => {} } - self.current = Some(time.clone()); self.distributor.partition(data, time, &mut self.pushers); } else { // flush - if let Some(time) = self.current.as_ref() { - self.distributor.flush(time, &mut self.pushers); + if let Some(time) = self.current.take() { + self.distributor.flush(&time, &mut self.pushers); } self.distributor.relax(); for index in 0..self.pushers.len() { From 730fcfcb77bbab62a8032b47797efbf70d46cff2 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sat, 16 Aug 2025 22:28:50 +0200 Subject: [PATCH 3/5] Guard against `resize_with` not being efficient. 
Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/channels/pushers/exchange.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 158aedc1c..9dde61e95 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -51,7 +51,9 @@ where for<'a> H: FnMut(&::Item<'a>) -> u64, { fn partition(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) { - self.builders.resize_with(pushers.len(), Default::default); + if self.builders.len() != pushers.len() { + self.builders.resize_with(pushers.len(), Default::default); + } if pushers.len().is_power_of_two() { let mask = (pushers.len() - 1) as u64; for datum in container.drain() { From b392c87cb3037521445db87199947e7966d61f2f Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 28 Aug 2025 21:05:35 +0200 Subject: [PATCH 4/5] Remove unneeded type parameters and assert pusher length Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/channels/pact.rs | 3 +- .../src/dataflow/channels/pushers/exchange.rs | 38 +++++++++---------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index a88c6336a..5d04d867f 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -88,9 +88,8 @@ where { type Pusher = ExchangePusher< T, - CB::Container, LogPusher>>>, - DrainContainerDistributor>>>, H> + DrainContainerDistributor >; type Puller = LogPuller>>>; diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 9dde61e95..d607b8add 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -14,46 +14,44 @@ use crate::Container; /// It needs to uphold progress tracking requirements. 
The count of the input container /// must be preserved across the output containers, from the first call to `partition` until the /// call to `flush` for a specific time stamp. -pub trait Distributor { +pub trait Distributor { /// Partition the contents of `container` at `time` into the `pushers`. - fn partition(&mut self, container: &mut C, time: &T, pushers: &mut [P]); + fn partition>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]); /// Flush any remaining contents into the `pushers` at time `time`. - fn flush(&mut self, time: &T, pushers: &mut [P]); + fn flush>>(&mut self, time: &T, pushers: &mut [P]); /// Optionally release resources, such as memory. fn relax(&mut self); } /// A distributor creating containers from a drainable container based /// on a hash function of the container's item. -pub struct DrainContainerDistributor { +pub struct DrainContainerDistributor { builders: Vec, hash_func: H, - _phantom: std::marker::PhantomData<(T, P)>, } -impl DrainContainerDistributor { +impl DrainContainerDistributor { /// Constructs a new `DrainContainerDistributor` with the given hash function. 
pub fn new(hash_func: H) -> Self { Self { builders: Vec::new(), hash_func, - _phantom: std::marker::PhantomData, } } } -impl Distributor for DrainContainerDistributor +impl Distributor for DrainContainerDistributor where CB: ContainerBuilder + for<'a> PushInto<::Item<'a>>, - CB::Container: Container, - T: Clone, - P: Push>, for<'a> H: FnMut(&::Item<'a>) -> u64, { - fn partition(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) { - if self.builders.len() != pushers.len() { + fn partition>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) { + if self.builders.len() <= pushers.len() { self.builders.resize_with(pushers.len(), Default::default); } + else { + debug_assert_eq!(self.builders.len(), pushers.len()); + } if pushers.len().is_power_of_two() { let mask = (pushers.len() - 1) as u64; for datum in container.drain() { @@ -76,7 +74,7 @@ where } } - fn flush(&mut self, time: &T, pushers: &mut [P]) { + fn flush>>(&mut self, time: &T, pushers: &mut [P]) { for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) { while let Some(container) = builder.finish() { Message::push_at(container, time.clone(), pusher); @@ -93,29 +91,27 @@ where // TODO : Software write combining /// Distributes records among target pushees according to a distributor. -pub struct Exchange { +pub struct Exchange { pushers: Vec

, current: Option, distributor: D, - _phantom: std::marker::PhantomData, } -impl Exchange { +impl Exchange { /// Allocates a new `Exchange` from a supplied set of pushers and a distributor. - pub fn new(pushers: Vec

, distributor: D) -> Exchange { + pub fn new(pushers: Vec

, distributor: D) -> Exchange { Exchange { pushers, current: None, distributor, - _phantom: std::marker::PhantomData, } } } -impl Push> for Exchange +impl Push> for Exchange where P: Push>, - D: Distributor, + D: Distributor, { #[inline(never)] fn push(&mut self, message: &mut Option>) { From 31045e6b94a4a1751d15b1278bf9f60ac1e6c881 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 28 Aug 2025 21:10:02 +0200 Subject: [PATCH 5/5] Pass number of peers at construction time Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/channels/pact.rs | 2 +- timely/src/dataflow/channels/pushers/exchange.rs | 14 +++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 5d04d867f..bb99cd277 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -96,7 +96,7 @@ where fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - let distributor = DrainContainerDistributor::new(self.hash_func); + let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers()); (ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) } } diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index d607b8add..e72085b4f 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -31,10 +31,11 @@ pub struct DrainContainerDistributor { } impl DrainContainerDistributor { - /// Constructs a new `DrainContainerDistributor` with the given hash function. 
- pub fn new(hash_func: H) -> Self { + /// Constructs a new `DrainContainerDistributor` with the given hash function for a number of + /// peers. + pub fn new(hash_func: H, peers: usize) -> Self { Self { - builders: Vec::new(), + builders: std::iter::repeat_with(Default::default).take(peers).collect(), hash_func, } } @@ -46,12 +47,7 @@ where for<'a> H: FnMut(&::Item<'a>) -> u64, { fn partition>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) { - if self.builders.len() <= pushers.len() { - self.builders.resize_with(pushers.len(), Default::default); - } - else { - debug_assert_eq!(self.builders.len(), pushers.len()); - } + debug_assert_eq!(self.builders.len(), pushers.len()); if pushers.len().is_power_of_two() { let mask = (pushers.len() - 1) as u64; for datum in container.drain() {