Skip to content

Commit d9c32a7

Browse files
committed
Record progress updates for direct container sends
1 parent 32c8048 commit d9c32a7

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

timely/src/dataflow/channels/pushers/counter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::rc::Rc;
44
use std::cell::RefCell;
55

6-
use crate::progress::{ChangeBatch, Timestamp};
6+
use crate::progress::ChangeBatch;
77
use crate::dataflow::channels::Message;
88
use crate::communication::Push;
99
use crate::Accountable;
@@ -15,7 +15,7 @@ pub struct Counter<T, P> {
1515
produced: Rc<RefCell<ChangeBatch<T>>>,
1616
}
1717

18-
impl<T: Timestamp, C: Accountable, P> Push<Message<T, C>> for Counter<T, P> where P: Push<Message<T, C>> {
18+
impl<T: Clone+Ord, C: Accountable, P> Push<Message<T, C>> for Counter<T, P> where P: Push<Message<T, C>> {
1919
#[inline]
2020
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2121
if let Some(message) = message {
@@ -48,6 +48,6 @@ impl<T, P> Counter<T, P> where T : Ord+Clone+'static {
4848
/// Ideally, users would not have direct access to a `Counter`, and preventing this is the way
4949
/// to uphold invariants.
5050
#[inline] pub fn give<C: crate::Container>(&mut self, time: T, container: &mut C) where P: Push<Message<T, C>> {
51-
if !container.is_empty() { Message::push_at(container, time, &mut self.pushee); }
51+
if !container.is_empty() { Message::push_at(container, time, self); }
5252
}
5353
}

0 commit comments

Comments
 (0)