Skip to content

Commit d3e47a9

Browse files
committed
Address feedback
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 391fc39 commit d3e47a9

File tree

1 file changed

+17
-14
lines changed

1 file changed

+17
-14
lines changed

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use crate::communication::Push;
44
use crate::container::{ContainerBuilder, PushInto};
55
use crate::dataflow::channels::Message;
6-
use crate::{Container, Data};
6+
use crate::Container;
77

88
/// Distribute containers to several pushers.
99
///
@@ -32,9 +32,9 @@ pub struct DrainContainerDistributor<CB, T, P, H> {
3232
}
3333

3434
impl<CB: Default, T, P, H> DrainContainerDistributor<CB, T, P, H> {
35-
/// Allocates a new `DrainContainerPartitioner` with the given pusher count and hash function.
35+
/// Constructs a new `DrainContainerDistributor` with the given hash function.
3636
pub fn new(hash_func: H) -> Self {
37-
DrainContainerDistributor {
37+
Self {
3838
builders: Vec::new(),
3939
hash_func,
4040
_phantom: std::marker::PhantomData,
@@ -98,11 +98,7 @@ pub struct Exchange<T, C, P, D> {
9898
_phantom: std::marker::PhantomData<C>,
9999
}
100100

101-
impl<T: Clone, C, P, D> Exchange<T, C, P, D>
102-
where
103-
P: Push<Message<T, C>>,
104-
D: Distributor<T, C, P>,
105-
{
101+
impl<T: Clone, C, P, D> Exchange<T, C, P, D> {
106102
/// Allocates a new `Exchange` from a supplied set of pushers and a distributor.
107103
pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, C, P, D> {
108104
Exchange {
@@ -114,7 +110,7 @@ where
114110
}
115111
}
116112

117-
impl<T: Eq+Data, C, P, D> Push<Message<T, C>> for Exchange<T, C, P, D>
113+
impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, C, P, D>
118114
where
119115
P: Push<Message<T, C>>,
120116
D: Distributor<T, C, P>,
@@ -131,17 +127,24 @@ where
131127
let data = &mut message.data;
132128

133129
// if the time isn't right, flush everything.
134-
if self.current.as_ref().is_some_and(|x| x != time) {
135-
self.distributor.flush(time, &mut self.pushers);
130+
match self.current.as_ref() {
131+
// We have a current time, and it is different from the new time.
132+
Some(current_time) if current_time != time => {
133+
self.distributor.flush(current_time, &mut self.pushers);
134+
self.current = Some(time.clone());
135+
}
136+
// We had no time before, or flushed.
137+
None => self.current = Some(time.clone()),
138+
// Time didn't change since last call.
139+
_ => {}
136140
}
137-
self.current = Some(time.clone());
138141

139142
self.distributor.partition(data, time, &mut self.pushers);
140143
}
141144
else {
142145
// flush
143-
if let Some(time) = self.current.as_ref() {
144-
self.distributor.flush(time, &mut self.pushers);
146+
if let Some(time) = self.current.take() {
147+
self.distributor.flush(&time, &mut self.pushers);
145148
}
146149
self.distributor.relax();
147150
for index in 0..self.pushers.len() {

0 commit comments

Comments
 (0)