Skip to content

Commit d5c66b5

Browse files
authored
Distributor trait (#700)
* Distributor trait Introduce a distributor that knows how to partition containers across multiple pushers. This moves the partition logic from container builders into a bespoke trait and implementation instead of mixing two different concepts, building containers, and partitioning them. It allows us to implement a distributor in the future that only partitions by key, for example when the container doesn't have easy access by row. Removes the old `partition` function from container builders. Signed-off-by: Moritz Hoffmann <[email protected]> * Address feedback Signed-off-by: Moritz Hoffmann <[email protected]> * Guard against `resize_with not being efficient. Signed-off-by: Moritz Hoffmann <[email protected]> * Remove unneeded type parameters and assert pusher length Signed-off-by: Moritz Hoffmann <[email protected]> * Pass number of peers at construction time Signed-off-by: Moritz Hoffmann <[email protected]> --------- Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 4c92cf5 commit d5c66b5

File tree

3 files changed

+116
-76
lines changed

3 files changed

+116
-76
lines changed

container/src/lib.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,6 @@ pub trait ContainerBuilder: Default + 'static {
113113
/// be called repeatedly until it returns `None`.
114114
#[must_use]
115115
fn finish(&mut self) -> Option<&mut Self::Container>;
116-
/// Partitions `container` among `builders`, using the function `index` to direct items.
117-
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
118-
where
119-
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
120-
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
121-
{
122-
for datum in container.drain() {
123-
let index = index(&datum);
124-
builders[index].push_into(datum);
125-
}
126-
container.clear();
127-
}
128-
129116
/// Indicates a good moment to release resources.
130117
///
131118
/// By default, does nothing. Callers first needs to drain the contents using [`Self::finish`]

timely/src/dataflow/channels/pact.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBu
1414
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
1515
use crate::communication::{Push, Pull};
1616
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
17+
use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
1718
use crate::dataflow::channels::Message;
1819
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
1920
use crate::progress::Timestamp;
@@ -79,20 +80,24 @@ where
7980
}
8081

8182
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
82-
impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
83+
impl<T: Timestamp, CB, H> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
8384
where
84-
CB: ContainerBuilder,
85-
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
85+
CB: ContainerBuilder + for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
8686
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
87-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
87+
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64 + 'static,
8888
{
89-
type Pusher = ExchangePusher<T, CB, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>;
89+
type Pusher = ExchangePusher<
90+
T,
91+
LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
92+
DrainContainerDistributor<CB, H>
93+
>;
9094
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
9195

9296
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
9397
let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
9498
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
95-
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
99+
let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers());
100+
(ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
96101
}
97102
}
98103

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 105 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,57 +3,114 @@
33
use crate::communication::Push;
44
use crate::container::{ContainerBuilder, PushInto};
55
use crate::dataflow::channels::Message;
6-
use crate::{Container, Data};
6+
use crate::Container;
77

8-
// TODO : Software write combining
9-
/// Distributes records among target pushees according to a distribution function.
10-
pub struct Exchange<T, CB, P, H>
11-
where
12-
CB: ContainerBuilder,
13-
P: Push<Message<T, CB::Container>>,
14-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
15-
{
16-
pushers: Vec<P>,
8+
/// Distribute containers to several pushers.
9+
///
10+
/// A distributor sits behind an exchange pusher, and partitions containers at a given time
11+
/// into several pushers. It can use [`Message::push_at`] to push its outputs at the desired
12+
/// pusher.
13+
///
14+
/// It needs to uphold progress tracking requirements. The count of the input container
15+
/// must be preserved across the output containers, from the first call to `partition` until the
16+
/// call to `flush` for a specific time stamp.
17+
pub trait Distributor<C> {
18+
/// Partition the contents of `container` at `time` into the `pushers`.
19+
fn partition<T: Clone, P: Push<Message<T, C>>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
20+
/// Flush any remaining contents into the `pushers` at time `time`.
21+
fn flush<T: Clone, P: Push<Message<T, C>>>(&mut self, time: &T, pushers: &mut [P]);
22+
/// Optionally release resources, such as memory.
23+
fn relax(&mut self);
24+
}
25+
26+
/// A distributor creating containers from a drainable container based
27+
/// on a hash function of the container's item.
28+
pub struct DrainContainerDistributor<CB, H> {
1729
builders: Vec<CB>,
18-
current: Option<T>,
1930
hash_func: H,
2031
}
2132

22-
impl<T: Clone, CB, P, H> Exchange<T, CB, P, H>
33+
impl<CB: Default, H> DrainContainerDistributor<CB, H> {
34+
/// Constructs a new `DrainContainerDistributor` with the given hash function for a number of
35+
/// peers.
36+
pub fn new(hash_func: H, peers: usize) -> Self {
37+
Self {
38+
builders: std::iter::repeat_with(Default::default).take(peers).collect(),
39+
hash_func,
40+
}
41+
}
42+
}
43+
44+
impl<CB, H> Distributor<CB::Container> for DrainContainerDistributor<CB, H>
2345
where
24-
CB: ContainerBuilder,
25-
P: Push<Message<T, CB::Container>>,
26-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
46+
CB: ContainerBuilder + for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
47+
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64,
2748
{
28-
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
29-
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, CB, P, H> {
30-
let builders = std::iter::repeat_with(Default::default).take(pushers.len()).collect();
31-
Exchange {
32-
pushers,
33-
hash_func: key,
34-
builders,
35-
current: None,
49+
fn partition<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
50+
debug_assert_eq!(self.builders.len(), pushers.len());
51+
if pushers.len().is_power_of_two() {
52+
let mask = (pushers.len() - 1) as u64;
53+
for datum in container.drain() {
54+
let index = ((self.hash_func)(&datum) & mask) as usize;
55+
self.builders[index].push_into(datum);
56+
while let Some(produced) = self.builders[index].extract() {
57+
Message::push_at(produced, time.clone(), &mut pushers[index]);
58+
}
59+
}
60+
}
61+
else {
62+
let num_pushers = pushers.len() as u64;
63+
for datum in container.drain() {
64+
let index = ((self.hash_func)(&datum) % num_pushers) as usize;
65+
self.builders[index].push_into(datum);
66+
while let Some(produced) = self.builders[index].extract() {
67+
Message::push_at(produced, time.clone(), &mut pushers[index]);
68+
}
69+
}
3670
}
3771
}
38-
#[inline]
39-
fn flush(&mut self, index: usize) {
40-
while let Some(container) = self.builders[index].finish() {
41-
if let Some(ref time) = self.current {
42-
Message::push_at(container, time.clone(), &mut self.pushers[index]);
72+
73+
fn flush<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, time: &T, pushers: &mut [P]) {
74+
for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
75+
while let Some(container) = builder.finish() {
76+
Message::push_at(container, time.clone(), pusher);
4377
}
4478
}
4579
}
80+
81+
fn relax(&mut self) {
82+
for builder in &mut self.builders {
83+
builder.relax();
84+
}
85+
}
4686
}
4787

48-
impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
88+
// TODO : Software write combining
89+
/// Distributes records among target pushees according to a distributor.
90+
pub struct Exchange<T, P, D> {
91+
pushers: Vec<P>,
92+
current: Option<T>,
93+
distributor: D,
94+
}
95+
96+
impl<T: Clone, P, D> Exchange<T, P, D> {
97+
/// Allocates a new `Exchange` from a supplied set of pushers and a distributor.
98+
pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, P, D> {
99+
Exchange {
100+
pushers,
101+
current: None,
102+
distributor,
103+
}
104+
}
105+
}
106+
107+
impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, P, D>
49108
where
50-
CB: ContainerBuilder,
51-
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
52-
P: Push<Message<T, CB::Container>>,
53-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
109+
P: Push<Message<T, C>>,
110+
D: Distributor<C>,
54111
{
55112
#[inline(never)]
56-
fn push(&mut self, message: &mut Option<Message<T, CB::Container>>) {
113+
fn push(&mut self, message: &mut Option<Message<T, C>>) {
57114
// if only one pusher, no exchange
58115
if self.pushers.len() == 1 {
59116
self.pushers[0].push(message);
@@ -64,36 +121,27 @@ where
64121
let data = &mut message.data;
65122

66123
// if the time isn't right, flush everything.
67-
if self.current.as_ref().is_some_and(|x| x != time) {
68-
for index in 0..self.pushers.len() {
69-
self.flush(index);
124+
match self.current.as_ref() {
125+
// We have a current time, and it is different from the new time.
126+
Some(current_time) if current_time != time => {
127+
self.distributor.flush(current_time, &mut self.pushers);
128+
self.current = Some(time.clone());
70129
}
130+
// We had no time before, or flushed.
131+
None => self.current = Some(time.clone()),
132+
// Time didn't change since last call.
133+
_ => {}
71134
}
72-
self.current = Some(time.clone());
73-
74-
let hash_func = &mut self.hash_func;
75135

76-
// if the number of pushers is a power of two, use a mask
77-
if self.pushers.len().is_power_of_two() {
78-
let mask = (self.pushers.len() - 1) as u64;
79-
CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) & mask) as usize);
80-
}
81-
// as a last resort, use mod (%)
82-
else {
83-
let num_pushers = self.pushers.len() as u64;
84-
CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) % num_pushers) as usize);
85-
}
86-
for (buffer, pusher) in self.builders.iter_mut().zip(self.pushers.iter_mut()) {
87-
while let Some(container) = buffer.extract() {
88-
Message::push_at(container, time.clone(), pusher);
89-
}
90-
}
136+
self.distributor.partition(data, time, &mut self.pushers);
91137
}
92138
else {
93139
// flush
140+
if let Some(time) = self.current.take() {
141+
self.distributor.flush(&time, &mut self.pushers);
142+
}
143+
self.distributor.relax();
94144
for index in 0..self.pushers.len() {
95-
self.flush(index);
96-
self.builders[index].relax();
97145
self.pushers[index].push(&mut None);
98146
}
99147
}

0 commit comments

Comments
 (0)