Skip to content

Commit e89e020

Browse files
committed
Address my own feedback:
* Properly document where containers are left in an undefined state. * Change update count to record count. * Add some missing documentation and remove outdated comments Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent a8b41d9 commit e89e020

File tree

10 files changed

+37
-31
lines changed

10 files changed

+37
-31
lines changed

container/src/lib.rs

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,23 @@ use std::collections::VecDeque;
1212
///
1313
/// It must implement default for historic reason. The default implementation is not required
1414
/// to allocate memory for variable-length components.
15-
// TODO: Remove `Default` requirement in the future.
16-
// The container is `Default` because `CapacityContainerBuilder` only implements `ContainerBuilder`
17-
// for containers that implement `Default`, and we use the associated `::Container` all over Timely.
18-
// We can only access the type if all requirements for the `ContainerBuilder` implementation are
19-
// satisfied.
2015
pub trait WithProgress {
21-
/// The number of updates
16+
/// The number of records
2217
///
2318
/// This number is used in progress tracking to confirm the receipt of some number
24-
/// of outstanding updates, and it is highly load bearing. The main restriction is
19+
/// of outstanding records, and it is highly load bearing. The main restriction is
2520
/// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
26-
/// must preserve the number of items.
27-
fn update_count(&self) -> i64;
21+
/// must preserve the number of records.
22+
fn record_count(&self) -> i64;
2823

2924
/// Determine if this contains any updates, corresponding to `update_count() == 0`.
30-
#[inline] fn is_empty(&self) -> bool { self.update_count() == 0 }
25+
#[inline] fn is_empty(&self) -> bool { self.record_count() == 0 }
3126
}
3227

33-
/// TODO
28+
/// A container that allows iteration.
29+
///
30+
/// Iterating the container presents items in an implmentation-specific order.
31+
/// The container's contents are not changed.
3432
pub trait IterContainer {
3533
/// The type of elements when reading non-destructively from the container.
3634
type ItemRef<'a> where Self: 'a;
@@ -40,7 +38,11 @@ pub trait IterContainer {
4038
fn iter(&self) -> Self::Iter<'_>;
4139
}
4240

43-
/// TODO
41+
/// A container that can drain itself.
42+
///
43+
/// Draining the container presents items in an implementation-specific order.
44+
/// The container is in an undefined state after calling [`drain`]. Dropping
45+
/// the iterator also leaves the container in an undefined state.
4446
pub trait DrainContainer {
4547
/// The type of elements when draining the container.
4648
type Item<'a> where Self: 'a;
@@ -60,6 +62,9 @@ pub trait SizableContainer: Sized {
6062
/// The `stash` argument is available, and may have the intended capacity.
6163
/// However, it may be non-empty, and may be of the wrong capacity. The
6264
/// method should guard against these cases.
65+
///
66+
/// Assume that the `stash` is in an undefined state, and properly clear it
67+
/// before re-using it.
6368
fn ensure_capacity(&mut self, stash: &mut Option<Self>);
6469
}
6570

@@ -85,7 +90,8 @@ pub trait PushInto<T> {
8590
///
8691
/// The caller should consume the containers returned by [`Self::extract`] and
8792
/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
88-
/// any remaining elements.
93+
/// any remaining elements. It is up to the implementation of this trait to ensure that
94+
/// containers are properly cleared before recycling them.
8995
///
9096
/// For example, a consolidating builder can aggregate differences in-place, but it has
9197
/// to ensure that it preserves the intended information.
@@ -108,6 +114,7 @@ pub trait ContainerBuilder: Default + 'static {
108114
#[must_use]
109115
fn finish(&mut self) -> Option<&mut Self::Container>;
110116
/// Partitions `container` among `builders`, using the function `index` to direct items.
117+
/// Drains the container. The container is left in an undefined state.
111118
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
112119
where
113120
Self::Container: DrainContainer,
@@ -131,7 +138,7 @@ pub trait ContainerBuilder: Default + 'static {
131138

132139
/// A wrapper trait indicating that the container building will preserve the number of records.
133140
///
134-
/// Specifically, the sum of lengths of all extracted and finished containers must equal the
141+
/// Specifically, the sum of record counts of all extracted and finished containers must equal the
135142
/// number of times that `push_into` is called on the container builder.
136143
/// If you have any questions about this trait you are best off not implementing it.
137144
pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
@@ -194,7 +201,7 @@ impl<C: WithProgress + Default + Clone + 'static> ContainerBuilder for CapacityC
194201
impl<C: WithProgress + SizableContainer + Default + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
195202

196203
impl<T> WithProgress for Vec<T> {
197-
#[inline] fn update_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }
204+
#[inline] fn record_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }
198205
#[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
199206
}
200207

@@ -259,7 +266,7 @@ mod rc {
259266
use crate::{WithProgress, IterContainer, DrainContainer};
260267

261268
impl<T: WithProgress> WithProgress for Rc<T> {
262-
#[inline] fn update_count(&self) -> i64 { std::ops::Deref::deref(self).update_count() }
269+
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
263270
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
264271
}
265272
impl<T: IterContainer> IterContainer for Rc<T> {
@@ -281,7 +288,7 @@ mod arc {
281288
use crate::{WithProgress, IterContainer, DrainContainer};
282289

283290
impl<T: WithProgress> WithProgress for Arc<T> {
284-
#[inline] fn update_count(&self) -> i64 { std::ops::Deref::deref(self).update_count() }
291+
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
285292
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
286293
}
287294
impl<T: IterContainer> IterContainer for Arc<T> {

timely/examples/columnar.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ mod container {
177177
}
178178

179179
impl<C: columnar::ContainerBytes> timely::WithProgress for Column<C> {
180-
#[inline] fn update_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
180+
#[inline] fn record_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
181181
#[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
182182
}
183183
impl<C: columnar::ContainerBytes> timely::container::IterContainer for Column<C> {

timely/src/dataflow/channels/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl<T, C: Container> Message<T, C> {
3939
}
4040

4141
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
42-
/// leaves in place, or the container's default element. The buffer is cleared.
42+
/// leaves in place, or the container's default element. The buffer is left in an undefined state.
4343
#[inline]
4444
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
4545

timely/src/dataflow/channels/pact.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ impl<T, C: WithProgress, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPush
147147
source: self.source,
148148
target: self.target,
149149
seq_no: self.counter - 1,
150-
update_count: bundle.data.update_count(),
150+
record_count: bundle.data.record_count(),
151151
})
152152
}
153153
}
@@ -192,7 +192,7 @@ impl<T, C: WithProgress, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPull
192192
source: bundle.from,
193193
target,
194194
seq_no: bundle.seq,
195-
update_count: bundle.data.update_count(),
195+
record_count: bundle.data.record_count(),
196196
});
197197
}
198198
}

timely/src/dataflow/channels/pullers/counter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub struct Counter<T, C, P> {
1919
pub struct ConsumedGuard<T: Ord + Clone + 'static> {
2020
consumed: Rc<RefCell<ChangeBatch<T>>>,
2121
time: Option<T>,
22-
update_count: i64,
22+
record_count: i64,
2323
}
2424

2525
impl<T:Ord+Clone+'static> ConsumedGuard<T> {
@@ -32,7 +32,7 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
3232
fn drop(&mut self) {
3333
// SAFETY: we're in a Drop impl, so this runs at most once
3434
let time = self.time.take().unwrap();
35-
self.consumed.borrow_mut().update(time, self.update_count);
35+
self.consumed.borrow_mut().update(time, self.record_count);
3636
}
3737
}
3838

@@ -49,7 +49,7 @@ impl<T:Ord+Clone+'static, C: WithProgress, P: Pull<Message<T, C>>> Counter<T, C,
4949
let guard = ConsumedGuard {
5050
consumed: Rc::clone(&self.consumed),
5151
time: Some(message.time.clone()),
52-
update_count: message.data.update_count(),
52+
record_count: message.data.record_count(),
5353
};
5454
Some((guard, message))
5555
}

timely/src/dataflow/channels/pushers/counter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ impl<T: Timestamp, C: WithProgress, P> Push<Message<T, C>> for Counter<T, C, P>
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2323
if let Some(message) = message {
24-
self.produced.borrow_mut().update(message.time.clone(), message.data.update_count());
24+
self.produced.borrow_mut().update(message.time.clone(), message.data.record_count());
2525
}
2626

2727
// only propagate `None` if dirty (indicates flush)

timely/src/dataflow/operators/core/capture/extract.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,9 @@ pub trait Extract<T, C> {
4949
fn extract(self) -> Vec<(T, C)>;
5050
}
5151

52-
impl<T, C> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
52+
impl<T: Ord, C: SizableContainer> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
5353
where
54-
for<'a> C: Container + SizableContainer + DrainContainer<Item<'a>: Ord> + PushInto<C::Item<'a>>,
55-
T: Ord,
54+
for<'a> C: Container + DrainContainer<Item<'a>: Ord> + PushInto<C::Item<'a>>,
5655
{
5756
fn extract(self) -> Vec<(T, C)> {
5857
let mut staged = std::collections::BTreeMap::new();

timely/src/dataflow/operators/core/enterleave.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ where
218218
source: self.index,
219219
target: self.index,
220220
seq_no: self.counter,
221-
update_count: bundle.data.update_count(),
221+
record_count: bundle.data.record_count(),
222222
};
223223
let recv_event = MessagesEvent {
224224
is_send: false,

timely/src/dataflow/operators/core/input.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
374374

375375
/// Sends a container at each of the destinations. There can be more than one; clone if needed.
376376
/// Does not take `self` because `flush` and `extract` borrow `self` mutably.
377-
/// Clears the container.
377+
/// Leaves the container in an undefined state.
378378
// TODO: Find a better name for this function.
379379
#[inline]
380380
fn send_container(

timely/src/logging.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ pub struct MessagesEvent {
135135
/// Message sequence number.
136136
pub seq_no: usize,
137137
/// Number of typed records in the message.
138-
pub update_count: i64,
138+
pub record_count: i64,
139139
}
140140

141141
/// Records the starting and stopping of an operator.

0 commit comments

Comments
 (0)