Skip to content

Commit 59680ec

Browse files
committed
Rework Container::len to Container::update_count
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 13a2b43 commit 59680ec

File tree

7 files changed

+30
-42
lines changed

7 files changed

+30
-42
lines changed

container/src/lib.rs

Lines changed: 20 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,27 @@ use std::collections::VecDeque;
66

77
/// A container transferring data through dataflow edges
88
///
9-
/// A container stores a number of elements and thus is able to describe it length (`len()`) and
10-
/// whether it is empty (`is_empty()`).
9+
/// A container stores a number of updates and thus is able to describe it count
10+
/// (`update_count()`) and whether it is empty (`is_empty()`). It is empty if the
11+
/// update count is zero.
1112
///
1213
/// A container must implement default. The default implementation is not required to allocate
1314
/// memory for variable-length components.
14-
///
15-
/// We require the container to be cloneable to enable efficient copies when providing references
16-
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
17-
/// is efficient (which is not necessarily the case when deriving `Clone`.)
1815
// The container is `Default` because `CapacityContainerBuilder` only implements `ContainerBuilder`
1916
// for containers that implement `Default`, and we use the associated `::Container` all over Timely.
2017
// We can only access the type if all requirements for the `ContainerBuilder` implementation are
2118
// satisfied.
2219
pub trait Container: Default {
23-
/// The number of elements in this container
20+
/// The number of updates in this container
2421
///
2522
/// This number is used in progress tracking to confirm the receipt of some number
26-
/// of outstanding records, and it is highly load bearing. The main restriction is
23+
/// of outstanding updates, and it is highly load bearing. The main restriction is
2724
/// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
2825
/// must preserve the number of items.
29-
fn len(&self) -> usize;
26+
fn update_count(&self) -> i64;
3027

31-
/// Determine if the container contains any elements, corresponding to `len() == 0`.
32-
fn is_empty(&self) -> bool {
33-
self.len() == 0
34-
}
28+
/// Determine if the container contains any updates, corresponding to `update_count() == 0`.
29+
#[inline] fn is_empty(&self) -> bool { self.update_count() == 0 }
3530
}
3631

3732
/// TODO
@@ -198,26 +193,22 @@ impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuild
198193
impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
199194

200195
impl<T> Container for Vec<T> {
201-
fn len(&self) -> usize {
202-
Vec::len(self)
203-
}
204-
fn is_empty(&self) -> bool {
205-
Vec::is_empty(self)
206-
}
196+
#[inline] fn update_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }
197+
#[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
207198
}
208199

209200
impl<T> IterContainer for Vec<T> {
210201
type ItemRef<'a> = &'a T where T: 'a;
211202
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
212-
fn iter(&self) -> Self::Iter<'_> {
203+
#[inline] fn iter(&self) -> Self::Iter<'_> {
213204
self.as_slice().iter()
214205
}
215206
}
216207

217208
impl<T> DrainContainer for Vec<T> {
218209
type Item<'a> = T where T: 'a;
219210
type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
220-
fn drain(&mut self) -> Self::DrainIter<'_> {
211+
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
221212
self.drain(..)
222213
}
223214
}
@@ -267,22 +258,18 @@ mod rc {
267258
use crate::{Container, IterContainer, DrainContainer};
268259

269260
impl<T: Container> Container for Rc<T> {
270-
fn len(&self) -> usize {
271-
std::ops::Deref::deref(self).len()
272-
}
273-
fn is_empty(&self) -> bool {
274-
std::ops::Deref::deref(self).is_empty()
275-
}
261+
#[inline] fn update_count(&self) -> i64 { std::ops::Deref::deref(self).update_count() }
262+
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
276263
}
277264
impl<T: IterContainer> IterContainer for Rc<T> {
278265
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
279266
type Iter<'a> = T::Iter<'a> where Self: 'a;
280-
fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
267+
#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
281268
}
282269
impl<T: IterContainer> DrainContainer for Rc<T> {
283270
type Item<'a> = T::ItemRef<'a> where Self: 'a;
284271
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
285-
fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
272+
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
286273
}
287274
}
288275

@@ -293,18 +280,18 @@ mod arc {
293280
use crate::{Container, IterContainer, DrainContainer};
294281

295282
impl<T: Container> Container for Arc<T> {
296-
fn len(&self) -> usize { std::ops::Deref::deref(self).len() }
297-
fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
283+
#[inline] fn update_count(&self) -> i64 { std::ops::Deref::deref(self).update_count() }
284+
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
298285
}
299286
impl<T: IterContainer> IterContainer for Arc<T> {
300287
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
301288
type Iter<'a> = T::Iter<'a> where Self: 'a;
302-
fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
289+
#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
303290
}
304291
impl<T: IterContainer> DrainContainer for Arc<T> {
305292
type Item<'a> = T::ItemRef<'a> where Self: 'a;
306293
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
307-
fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
294+
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
308295
}
309296
}
310297

timely/examples/columnar.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ mod container {
177177
}
178178

179179
impl<C: columnar::ContainerBytes> timely::Container for Column<C> {
180-
fn len(&self) -> usize { self.borrow().len() }
180+
#[inline] fn update_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
181+
#[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
181182
}
182183
impl<C: columnar::ContainerBytes> timely::container::IterContainer for Column<C> {
183184
type ItemRef<'a> = C::Ref<'a>;

timely/src/dataflow/channels/pact.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<
148148
source: self.source,
149149
target: self.target,
150150
seq_no: self.counter - 1,
151-
length: bundle.data.len(),
151+
update_count: bundle.data.update_count(),
152152
})
153153
}
154154
}
@@ -193,7 +193,7 @@ impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<
193193
source: bundle.from,
194194
target,
195195
seq_no: bundle.seq,
196-
length: bundle.data.len(),
196+
update_count: bundle.data.update_count(),
197197
});
198198
}
199199
}

timely/src/dataflow/channels/pullers/counter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Message<T, C>>> {
1919
pub struct ConsumedGuard<T: Ord + Clone + 'static> {
2020
consumed: Rc<RefCell<ChangeBatch<T>>>,
2121
time: Option<T>,
22-
len: usize,
22+
update_count: i64,
2323
}
2424

2525
impl<T:Ord+Clone+'static> ConsumedGuard<T> {
@@ -32,7 +32,7 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
3232
fn drop(&mut self) {
3333
// SAFETY: we're in a Drop impl, so this runs at most once
3434
let time = self.time.take().unwrap();
35-
self.consumed.borrow_mut().update(time, self.len as i64);
35+
self.consumed.borrow_mut().update(time, self.update_count);
3636
}
3737
}
3838

@@ -49,7 +49,7 @@ impl<T:Ord+Clone+'static, C: Container, P: Pull<Message<T, C>>> Counter<T, C, P>
4949
let guard = ConsumedGuard {
5050
consumed: Rc::clone(&self.consumed),
5151
time: Some(message.time.clone()),
52-
len: message.data.len(),
52+
update_count: message.data.update_count(),
5353
};
5454
Some((guard, message))
5555
}

timely/src/dataflow/channels/pushers/counter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ impl<T: Timestamp, C: Container, P> Push<Message<T, C>> for Counter<T, C, P> whe
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2323
if let Some(message) = message {
24-
self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
24+
self.produced.borrow_mut().update(message.time.clone(), message.data.update_count());
2525
}
2626

2727
// only propagate `None` if dirty (indicates flush)

timely/src/dataflow/operators/core/enterleave.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ where
218218
source: self.index,
219219
target: self.index,
220220
seq_no: self.counter,
221-
length: bundle.data.len(),
221+
update_count: bundle.data.update_count(),
222222
};
223223
let recv_event = MessagesEvent {
224224
is_send: false,

timely/src/logging.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ pub struct MessagesEvent {
135135
/// Message sequence number.
136136
pub seq_no: usize,
137137
/// Number of typed records in the message.
138-
pub length: usize,
138+
pub update_count: i64,
139139
}
140140

141141
/// Records the starting and stopping of an operator.

0 commit comments

Comments
 (0)