Skip to content

Commit 70a2837

Browse files
authored
Remove push counter's give (#723)
Remove the `give` function from `pushers::counters::Counter` as it is a thin wrapper around `Message::push` and caused confusing bugs in the past. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent c0cdc02 commit 70a2837

File tree

4 files changed

+5
-13
lines changed

4 files changed

+5
-13
lines changed

timely/src/dataflow/channels/pushers/buffer.rs

Whitespace-only changes.

timely/src/dataflow/channels/pushers/counter.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,4 @@ impl<T, P> Counter<T, P> where T : Ord+Clone+'static {
4242
pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
4343
&self.produced
4444
}
45-
/// Ships a time and a container.
46-
///
47-
/// This is not a validated capability, and this method should not be used without great care.
48-
/// Ideally, users would not have direct access to a `Counter`, and preventing this is the way
49-
/// to uphold invariants.
50-
#[inline] pub fn give<C: crate::Container>(&mut self, time: T, container: &mut C) where P: Push<Message<T, C>> {
51-
if !container.is_empty() { Message::push_at(container, time, self); }
52-
}
5345
}

timely/src/dataflow/operators/core/capture/replay.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use crate::progress::Timestamp;
4646
use super::Event;
4747
use super::event::EventIterator;
4848
use crate::Container;
49+
use crate::dataflow::channels::Message;
4950

5051
/// Replay a capture stream into a scope with the same timestamp.
5152
pub trait Replay<T: Timestamp, C> : Sized {
@@ -99,14 +100,14 @@ where
99100
progress.internals[0].extend(vec.into_iter());
100101
},
101102
Owned(Event::Messages(time, mut data)) => {
102-
output.give(time.clone(), &mut data);
103+
Message::push_at(&mut data, time, &mut output);
103104
}
104105
Borrowed(Event::Progress(vec)) => {
105106
progress.internals[0].extend(vec.iter().cloned());
106107
},
107108
Borrowed(Event::Messages(time, data)) => {
108109
allocation.clone_from(data);
109-
output.give(time.clone(), &mut allocation);
110+
Message::push_at(&mut allocation, time.clone(), &mut output);
110111
}
111112
}
112113
}

timely/src/dataflow/operators/core/probe.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
1313

1414
use crate::dataflow::{StreamCore, Scope};
1515
use crate::Container;
16+
use crate::dataflow::channels::Message;
1617

1718
/// Monitors progress at a `Stream`.
1819
pub trait Probe<G: Scope, C: Container> {
@@ -112,9 +113,7 @@ impl<G: Scope, C: Container> Probe<G, C> for StreamCore<G, C> {
112113
}
113114

114115
while let Some(message) = input.next() {
115-
let time = &message.time;
116-
let data = &mut message.data;
117-
output.give(time.clone(), data);
116+
Message::push_at(&mut message.data, message.time.clone(), &mut output);
118117
}
119118
use timely_communication::Push;
120119
output.done();

0 commit comments

Comments
 (0)