Skip to content

Commit 31045e6

Browse files
committed
Pass number of peers at construction time
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent b392c87 commit 31045e6

File tree

2 files changed

+6
-10
lines changed

2 files changed

+6
-10
lines changed

timely/src/dataflow/channels/pact.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ where
9696
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
9797
let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
9898
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
99-
let distributor = DrainContainerDistributor::new(self.hash_func);
99+
let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers());
100100
(ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
101101
}
102102
}

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ pub struct DrainContainerDistributor<CB, H> {
3131
}
3232

3333
impl<CB: Default, H> DrainContainerDistributor<CB, H> {
34-
/// Constructs a new `DrainContainerDistributor` with the given hash function.
35-
pub fn new(hash_func: H) -> Self {
34+
/// Constructs a new `DrainContainerDistributor` with the given hash function for a number of
35+
/// peers.
36+
pub fn new(hash_func: H, peers: usize) -> Self {
3637
Self {
37-
builders: Vec::new(),
38+
builders: std::iter::repeat_with(Default::default).take(peers).collect(),
3839
hash_func,
3940
}
4041
}
@@ -46,12 +47,7 @@ where
4647
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64,
4748
{
4849
fn partition<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
49-
if self.builders.len() <= pushers.len() {
50-
self.builders.resize_with(pushers.len(), Default::default);
51-
}
52-
else {
53-
debug_assert_eq!(self.builders.len(), pushers.len());
54-
}
50+
debug_assert_eq!(self.builders.len(), pushers.len());
5551
if pushers.len().is_power_of_two() {
5652
let mask = (pushers.len() - 1) as u64;
5753
for datum in container.drain() {

0 commit comments

Comments
 (0)