Skip to content

Commit 391fc39

Browse files
committed
Distributor trait
Introduce a distributor that knows how to partition containers across multiple pushers. This moves the partition logic from container builders into a bespoke trait and implementation, instead of mixing two different concepts: building containers and partitioning them. It allows us to implement a distributor in the future that only partitions by key, for example when the container doesn't have easy access by row. Removes the old `partition` function from container builders. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent c06a057 commit 391fc39

File tree

3 files changed

+115
-71
lines changed

3 files changed

+115
-71
lines changed

container/src/lib.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,6 @@ pub trait ContainerBuilder: Default + 'static {
113113
/// be called repeatedly until it returns `None`.
114114
#[must_use]
115115
fn finish(&mut self) -> Option<&mut Self::Container>;
116-
/// Partitions `container` among `builders`, using the function `index` to direct items.
117-
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
118-
where
119-
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
120-
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
121-
{
122-
for datum in container.drain() {
123-
let index = index(&datum);
124-
builders[index].push_into(datum);
125-
}
126-
container.clear();
127-
}
128-
129116
/// Indicates a good moment to release resources.
130117
///
131118
/// By default, does nothing. Callers first need to drain the contents using [`Self::finish`]

timely/src/dataflow/channels/pact.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBu
1414
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
1515
use crate::communication::{Push, Pull};
1616
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
17+
use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
1718
use crate::dataflow::channels::Message;
1819
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
1920
use crate::progress::Timestamp;
@@ -79,20 +80,25 @@ where
7980
}
8081

8182
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
82-
impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
83+
impl<T: Timestamp, CB, H> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
8384
where
84-
CB: ContainerBuilder,
85-
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
85+
CB: ContainerBuilder + for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
8686
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
87-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
87+
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64 + 'static,
8888
{
89-
type Pusher = ExchangePusher<T, CB, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>;
89+
type Pusher = ExchangePusher<
90+
T,
91+
CB::Container,
92+
LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
93+
DrainContainerDistributor<CB, T, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>
94+
>;
9095
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
9196

9297
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
9398
let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
9499
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
95-
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
100+
let distributor = DrainContainerDistributor::new(self.hash_func);
101+
(ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
96102
}
97103
}
98104

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 103 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,55 +5,122 @@ use crate::container::{ContainerBuilder, PushInto};
55
use crate::dataflow::channels::Message;
66
use crate::{Container, Data};
77

8-
// TODO : Software write combining
9-
/// Distributes records among target pushees according to a distribution function.
10-
pub struct Exchange<T, CB, P, H>
8+
/// Distribute containers to several pushers.
9+
///
10+
/// A distributor sits behind an exchange pusher, and partitions containers at a given time
11+
/// into several pushers. It can use [`Message::push_at`] to push its outputs to the desired
12+
/// pusher.
13+
///
14+
/// It needs to uphold progress tracking requirements. The count of the input container
15+
/// must be preserved across the output containers, from the first call to `partition` until the
16+
/// call to `flush` for a specific time stamp.
17+
pub trait Distributor<T, C, P> {
18+
/// Partition the contents of `container` at `time` into the `pushers`.
19+
fn partition(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
20+
/// Flush any remaining contents into the `pushers` at time `time`.
21+
fn flush(&mut self, time: &T, pushers: &mut [P]);
22+
/// Optionally release resources, such as memory.
23+
fn relax(&mut self);
24+
}
25+
26+
/// A distributor creating containers from a drainable container based
27+
/// on a hash function of the container's item.
28+
pub struct DrainContainerDistributor<CB, T, P, H> {
29+
builders: Vec<CB>,
30+
hash_func: H,
31+
_phantom: std::marker::PhantomData<(T, P)>,
32+
}
33+
34+
impl<CB: Default, T, P, H> DrainContainerDistributor<CB, T, P, H> {
35+
/// Allocates a new `DrainContainerDistributor` with the given hash function.
36+
pub fn new(hash_func: H) -> Self {
37+
DrainContainerDistributor {
38+
builders: Vec::new(),
39+
hash_func,
40+
_phantom: std::marker::PhantomData,
41+
}
42+
}
43+
}
44+
45+
impl<CB, T, P, H> Distributor<T, CB::Container, P> for DrainContainerDistributor<CB, T, P, H>
1146
where
12-
CB: ContainerBuilder,
47+
CB: ContainerBuilder + for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
48+
CB::Container: Container,
49+
T: Clone,
1350
P: Push<Message<T, CB::Container>>,
14-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
51+
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64,
1552
{
53+
fn partition(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
54+
self.builders.resize_with(pushers.len(), Default::default);
55+
if pushers.len().is_power_of_two() {
56+
let mask = (pushers.len() - 1) as u64;
57+
for datum in container.drain() {
58+
let index = ((self.hash_func)(&datum) & mask) as usize;
59+
self.builders[index].push_into(datum);
60+
while let Some(produced) = self.builders[index].extract() {
61+
Message::push_at(produced, time.clone(), &mut pushers[index]);
62+
}
63+
}
64+
}
65+
else {
66+
let num_pushers = pushers.len() as u64;
67+
for datum in container.drain() {
68+
let index = ((self.hash_func)(&datum) % num_pushers) as usize;
69+
self.builders[index].push_into(datum);
70+
while let Some(produced) = self.builders[index].extract() {
71+
Message::push_at(produced, time.clone(), &mut pushers[index]);
72+
}
73+
}
74+
}
75+
}
76+
77+
fn flush(&mut self, time: &T, pushers: &mut [P]) {
78+
for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
79+
while let Some(container) = builder.finish() {
80+
Message::push_at(container, time.clone(), pusher);
81+
}
82+
}
83+
}
84+
85+
fn relax(&mut self) {
86+
for builder in &mut self.builders {
87+
builder.relax();
88+
}
89+
}
90+
}
91+
92+
// TODO : Software write combining
93+
/// Distributes records among target pushees according to a distributor.
94+
pub struct Exchange<T, C, P, D> {
1695
pushers: Vec<P>,
17-
builders: Vec<CB>,
1896
current: Option<T>,
19-
hash_func: H,
97+
distributor: D,
98+
_phantom: std::marker::PhantomData<C>,
2099
}
21100

22-
impl<T: Clone, CB, P, H> Exchange<T, CB, P, H>
101+
impl<T: Clone, C, P, D> Exchange<T, C, P, D>
23102
where
24-
CB: ContainerBuilder,
25-
P: Push<Message<T, CB::Container>>,
26-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
103+
P: Push<Message<T, C>>,
104+
D: Distributor<T, C, P>,
27105
{
28-
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
29-
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, CB, P, H> {
30-
let builders = std::iter::repeat_with(Default::default).take(pushers.len()).collect();
106+
/// Allocates a new `Exchange` from a supplied set of pushers and a distributor.
107+
pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, C, P, D> {
31108
Exchange {
32109
pushers,
33-
hash_func: key,
34-
builders,
35110
current: None,
36-
}
37-
}
38-
#[inline]
39-
fn flush(&mut self, index: usize) {
40-
while let Some(container) = self.builders[index].finish() {
41-
if let Some(ref time) = self.current {
42-
Message::push_at(container, time.clone(), &mut self.pushers[index]);
43-
}
111+
distributor,
112+
_phantom: std::marker::PhantomData,
44113
}
45114
}
46115
}
47116

48-
impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
117+
impl<T: Eq+Data, C, P, D> Push<Message<T, C>> for Exchange<T, C, P, D>
49118
where
50-
CB: ContainerBuilder,
51-
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
52-
P: Push<Message<T, CB::Container>>,
53-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
119+
P: Push<Message<T, C>>,
120+
D: Distributor<T, C, P>,
54121
{
55122
#[inline(never)]
56-
fn push(&mut self, message: &mut Option<Message<T, CB::Container>>) {
123+
fn push(&mut self, message: &mut Option<Message<T, C>>) {
57124
// if only one pusher, no exchange
58125
if self.pushers.len() == 1 {
59126
self.pushers[0].push(message);
@@ -65,35 +132,19 @@ where
65132

66133
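// if the time isn't right, flush everything.
// NOTE(review): buffered contents were partitioned at `self.current`, but the flush below is handed the incoming `time`; the removed code flushed at `self.current` — confirm which time is intended.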
67134
if self.current.as_ref().is_some_and(|x| x != time) {
68-
for index in 0..self.pushers.len() {
69-
self.flush(index);
70-
}
135+
self.distributor.flush(time, &mut self.pushers);
71136
}
72137
self.current = Some(time.clone());
73138

74-
let hash_func = &mut self.hash_func;
75-
76-
// if the number of pushers is a power of two, use a mask
77-
if self.pushers.len().is_power_of_two() {
78-
let mask = (self.pushers.len() - 1) as u64;
79-
CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) & mask) as usize);
80-
}
81-
// as a last resort, use mod (%)
82-
else {
83-
let num_pushers = self.pushers.len() as u64;
84-
CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) % num_pushers) as usize);
85-
}
86-
for (buffer, pusher) in self.builders.iter_mut().zip(self.pushers.iter_mut()) {
87-
while let Some(container) = buffer.extract() {
88-
Message::push_at(container, time.clone(), pusher);
89-
}
90-
}
139+
self.distributor.partition(data, time, &mut self.pushers);
91140
}
92141
else {
93142
// flush
143+
if let Some(time) = self.current.as_ref() {
144+
self.distributor.flush(time, &mut self.pushers);
145+
}
146+
self.distributor.relax();
94147
for index in 0..self.pushers.len() {
95-
self.flush(index);
96-
self.builders[index].relax();
97148
self.pushers[index].push(&mut None);
98149
}
99150
}

0 commit comments

Comments
 (0)