Skip to content

Commit eff40d5

Browse files
Merge pull request #668 from antiguru/exchange_flush
Optionally flush container builders
2 parents 7da5f68 + 8059744 commit eff40d5

File tree

4 files changed

+28
-8
lines changed

4 files changed

+28
-8
lines changed

container/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,14 @@ pub trait ContainerBuilder: Default + 'static {
125125
}
126126
container.clear();
127127
}
128+
129+
/// Indicates a good moment to release resources.
130+
///
131+
/// By default, does nothing. Callers first needs to drain the contents using [`Self::finish`]
132+
/// before calling this function. The implementation should not change the contents of the
133+
/// builder.
134+
#[inline]
135+
fn relax(&mut self) { }
128136
}
129137

130138
/// A wrapper trait indicating that the container building will preserve the number of records.

timely/examples/columnar.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ mod builder {
300300
}
301301

302302
impl<C: Columnar> Default for ColumnBuilder<C> {
303+
#[inline]
303304
fn default() -> Self {
304305
ColumnBuilder {
305306
current: Default::default(),
@@ -331,6 +332,15 @@ mod builder {
331332
self.empty = self.pending.pop_front();
332333
self.empty.as_mut()
333334
}
335+
336+
#[inline]
337+
fn relax(&mut self) {
338+
// The caller is responsible for draining all contents; assert that we are empty.
339+
// The assertion is not strictly necessary, but it helps catch bugs.
340+
assert!(self.current.is_empty());
341+
assert!(self.pending.is_empty());
342+
*self = Self::default();
343+
}
334344
}
335345

336346
impl<C: Columnar> LengthPreservingContainerBuilder for ColumnBuilder<C> where C::Container: Clone { }

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB,
8181
/// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush.
8282
pub fn cease(&mut self) {
8383
self.flush();
84+
self.builder.relax();
8485
self.pusher.push(&mut None);
8586
}
8687

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ where
1414
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
1515
{
1616
pushers: Vec<P>,
17-
buffers: Vec<CB>,
17+
builders: Vec<CB>,
1818
current: Option<T>,
1919
hash_func: H,
2020
}
@@ -27,20 +27,20 @@ where
2727
{
2828
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
2929
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, CB, P, H> {
30-
let mut buffers = vec![];
30+
let mut builders = vec![];
3131
for _ in 0..pushers.len() {
32-
buffers.push(Default::default());
32+
builders.push(Default::default());
3333
}
3434
Exchange {
3535
pushers,
3636
hash_func: key,
37-
buffers,
37+
builders,
3838
current: None,
3939
}
4040
}
4141
#[inline]
4242
fn flush(&mut self, index: usize) {
43-
while let Some(container) = self.buffers[index].finish() {
43+
while let Some(container) = self.builders[index].finish() {
4444
if let Some(ref time) = self.current {
4545
Message::push_at(container, time.clone(), &mut self.pushers[index]);
4646
}
@@ -79,14 +79,14 @@ where
7979
// if the number of pushers is a power of two, use a mask
8080
if self.pushers.len().is_power_of_two() {
8181
let mask = (self.pushers.len() - 1) as u64;
82-
CB::partition(data, &mut self.buffers, |datum| ((hash_func)(datum) & mask) as usize);
82+
CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) & mask) as usize);
8383
}
8484
// as a last resort, use mod (%)
8585
else {
8686
let num_pushers = self.pushers.len() as u64;
87-
CB::partition(data, &mut self.buffers, |datum| ((hash_func)(datum) % num_pushers) as usize);
87+
CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) % num_pushers) as usize);
8888
}
89-
for (buffer, pusher) in self.buffers.iter_mut().zip(self.pushers.iter_mut()) {
89+
for (buffer, pusher) in self.builders.iter_mut().zip(self.pushers.iter_mut()) {
9090
while let Some(container) = buffer.extract() {
9191
Message::push_at(container, time.clone(), pusher);
9292
}
@@ -96,6 +96,7 @@ where
9696
// flush
9797
for index in 0..self.pushers.len() {
9898
self.flush(index);
99+
self.builders[index].relax();
99100
self.pushers[index].push(&mut None);
100101
}
101102
}

0 commit comments

Comments
 (0)