Skip to content

Commit 9548bf9

Browse files
authored
updated consumed counts after capabilityrefs are dropped (#429)
CapabilityRefs are valid to exist as long as the data in the input are not marked as consumed. This change makes sure that this is the case by including an extra drop guard in the capability ref. Signed-off-by: Petros Angelatos <[email protected]> Signed-off-by: Petros Angelatos <[email protected]>
1 parent 13a1415 commit 9548bf9

File tree

3 files changed

+37
-20
lines changed

3 files changed

+37
-20
lines changed

timely/src/dataflow/channels/pullers/counter.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,36 @@ pub struct Counter<T: Ord+Clone+'static, D, P: Pull<BundleCore<T, D>>> {
1515
phantom: ::std::marker::PhantomData<D>,
1616
}
1717

18+
/// A guard type that updates the change batch counts on drop
19+
pub struct ConsumedGuard<'a, T: Ord + Clone + 'static> {
20+
consumed: &'a Rc<RefCell<ChangeBatch<T>>>,
21+
time: Option<T>,
22+
len: usize,
23+
}
24+
25+
impl<'a, T:Ord+Clone+'static> Drop for ConsumedGuard<'a, T> {
26+
fn drop(&mut self) {
27+
self.consumed.borrow_mut().update(self.time.take().unwrap(), self.len as i64);
28+
}
29+
}
30+
1831
impl<T:Ord+Clone+'static, D: Container, P: Pull<BundleCore<T, D>>> Counter<T, D, P> {
1932
/// Retrieves the next timestamp and batch of data.
2033
#[inline]
2134
pub fn next(&mut self) -> Option<&mut BundleCore<T, D>> {
35+
self.next_guarded().map(|(_guard, bundle)| bundle)
36+
}
37+
38+
#[inline]
39+
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<'_, T>, &mut BundleCore<T, D>)> {
2240
if let Some(message) = self.pullable.pull() {
2341
if message.data.len() > 0 {
24-
self.consumed.borrow_mut().update(message.time.clone(), message.data.len() as i64);
25-
Some(message)
42+
let guard = ConsumedGuard {
43+
consumed: &self.consumed,
44+
time: Some(message.time.clone()),
45+
len: message.data.len(),
46+
};
47+
Some((guard, message))
2648
}
2749
else { None }
2850
}

timely/src/dataflow/operators/capability.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::order::PartialOrder;
3030
use crate::progress::Timestamp;
3131
use crate::progress::ChangeBatch;
3232
use crate::scheduling::Activations;
33+
use crate::dataflow::channels::pullers::counter::ConsumedGuard;
3334

3435
/// An internal trait expressing the capability to send messages with a given timestamp.
3536
pub trait CapabilityTrait<T: Timestamp> {
@@ -231,6 +232,8 @@ type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
231232
pub struct CapabilityRef<'cap, T: Timestamp+'cap> {
232233
time: &'cap T,
233234
internal: CapabilityUpdates<T>,
235+
/// A drop guard that updates the consumed capability this CapabilityRef refers to on drop
236+
_consumed_guard: ConsumedGuard<'cap, T>,
234237
}
235238

236239
impl<'cap, T: Timestamp+'cap> CapabilityTrait<T> for CapabilityRef<'cap, T> {
@@ -244,10 +247,11 @@ impl<'cap, T: Timestamp+'cap> CapabilityTrait<T> for CapabilityRef<'cap, T> {
244247
impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> {
245248
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
246249
/// the provided [`ChangeBatch`].
247-
pub(crate) fn new(time: &'cap T, internal: CapabilityUpdates<T>) -> Self {
250+
pub(crate) fn new(time: &'cap T, internal: CapabilityUpdates<T>, guard: ConsumedGuard<'cap, T>) -> Self {
248251
CapabilityRef {
249252
time,
250253
internal,
254+
_consumed_guard: guard,
251255
}
252256
}
253257

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> InputHandleCore<
4949
#[inline]
5050
pub fn next(&mut self) -> Option<(CapabilityRef<T>, RefOrMut<D>)> {
5151
let internal = &self.internal;
52-
self.pull_counter.next().map(|bundle| {
52+
self.pull_counter.next_guarded().map(|(guard, bundle)| {
5353
match bundle.as_ref_or_mut() {
5454
RefOrMut::Ref(bundle) => {
55-
(CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Ref(&bundle.data))
55+
(CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Ref(&bundle.data))
5656
},
5757
RefOrMut::Mut(bundle) => {
58-
(CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Mut(&mut bundle.data))
58+
(CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Mut(&mut bundle.data))
5959
},
6060
}
6161
})
@@ -81,22 +81,13 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> InputHandleCore<
8181
/// ```
8282
#[inline]
8383
pub fn for_each<F: FnMut(CapabilityRef<T>, RefOrMut<D>)>(&mut self, mut logic: F) {
84-
// We inline `next()` so that we can use `self.logging` without cloning (and dropping) the logger.
85-
let internal = &self.internal;
86-
while let Some((cap, data)) = self.pull_counter.next().map(|bundle| {
87-
match bundle.as_ref_or_mut() {
88-
RefOrMut::Ref(bundle) => {
89-
(CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Ref(&bundle.data))
90-
},
91-
RefOrMut::Mut(bundle) => {
92-
(CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Mut(&mut bundle.data))
93-
},
94-
}
95-
}) {
96-
self.logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
84+
let mut logging = self.logging.take();
85+
while let Some((cap, data)) = self.next() {
86+
logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
9787
logic(cap, data);
98-
self.logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false }));
88+
logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false }));
9989
}
90+
self.logging = logging;
10091
}
10192

10293
}

0 commit comments

Comments
 (0)