Skip to content

Commit 153f3d8

Browse files
committed
Address feedback
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent e89e020 commit 153f3d8

File tree

10 files changed

+41
-41
lines changed

10 files changed

+41
-41
lines changed

container/src/lib.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,12 @@
44

55
use std::collections::VecDeque;
66

7-
/// An object with effects on progress
7+
/// An type containing a number of records accounted for by progress tracking.
88
///
99
/// The object stores a number of updates and thus is able to describe it count
1010
/// (`update_count()`) and whether it is empty (`is_empty()`). It is empty if the
1111
/// update count is zero.
12-
///
13-
/// It must implement default for historic reason. The default implementation is not required
14-
/// to allocate memory for variable-length components.
15-
pub trait WithProgress {
12+
pub trait Accountable {
1613
/// The number of records
1714
///
1815
/// This number is used in progress tracking to confirm the receipt of some number
@@ -22,12 +19,13 @@ pub trait WithProgress {
2219
fn record_count(&self) -> i64;
2320

2421
/// Determine if this contains any updates, corresponding to `update_count() == 0`.
22+
/// It is a correctness error for this to by anything other than `self.record_count() == 0`.
2523
#[inline] fn is_empty(&self) -> bool { self.record_count() == 0 }
2624
}
2725

28-
/// A container that allows iteration.
26+
/// A container that allows iteration morally equivalent to [`IntoIterator`].
2927
///
30-
/// Iterating the container presents items in an implmentation-specific order.
28+
/// Iterating the container presents items in an implementation-specific order.
3129
/// The container's contents are not changed.
3230
pub trait IterContainer {
3331
/// The type of elements when reading non-destructively from the container.
@@ -54,7 +52,7 @@ pub trait DrainContainer {
5452
}
5553

5654
/// A container that can be sized and reveals its capacity.
57-
pub trait SizableContainer: Sized {
55+
pub trait SizableContainer {
5856
/// Indicates that the container is "full" and should be shipped.
5957
fn at_capacity(&self) -> bool;
6058
/// Restores `self` to its desired capacity, if it has one.
@@ -65,7 +63,7 @@ pub trait SizableContainer: Sized {
6563
///
6664
/// Assume that the `stash` is in an undefined state, and properly clear it
6765
/// before re-using it.
68-
fn ensure_capacity(&mut self, stash: &mut Option<Self>);
66+
fn ensure_capacity(&mut self, stash: &mut Option<Self>) where Self: Sized;
6967
}
7068

7169
/// A container that can absorb items of a specific type.
@@ -90,8 +88,10 @@ pub trait PushInto<T> {
9088
///
9189
/// The caller should consume the containers returned by [`Self::extract`] and
9290
/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
93-
/// any remaining elements. It is up to the implementation of this trait to ensure that
94-
/// containers are properly cleared before recycling them.
91+
/// any remaining elements.
92+
///
93+
/// Implementations are allowed to re-use the contents of the mutable references left by the caller,
94+
/// but they should ensure that they clear the contents before doing so.
9595
///
9696
/// For example, a consolidating builder can aggregate differences in-place, but it has
9797
/// to ensure that it preserves the intended information.
@@ -102,7 +102,7 @@ pub trait ContainerBuilder: Default + 'static {
102102
/// The container type we're building.
103103
// The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
104104
// all over Timely. `'static` because we don't want lifetimes everywhere.
105-
type Container: WithProgress + Default + Clone + 'static;
105+
type Container: Accountable + Default + Clone + 'static;
106106
/// Extract assembled containers, potentially leaving unfinished data behind. Can
107107
/// be called repeatedly, for example while the caller can send data.
108108
///
@@ -139,7 +139,7 @@ pub trait ContainerBuilder: Default + 'static {
139139
/// A wrapper trait indicating that the container building will preserve the number of records.
140140
///
141141
/// Specifically, the sum of record counts of all extracted and finished containers must equal the
142-
/// number of times that `push_into` is called on the container builder.
142+
/// number of accounted records that are pushed into the container builder.
143143
/// If you have any questions about this trait you are best off not implementing it.
144144
pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
145145

@@ -175,7 +175,7 @@ impl<T, C: SizableContainer + Default + PushInto<T>> PushInto<T> for CapacityCon
175175
}
176176
}
177177

178-
impl<C: WithProgress + Default + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
178+
impl<C: Accountable + Default + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
179179
type Container = C;
180180

181181
#[inline]
@@ -198,9 +198,9 @@ impl<C: WithProgress + Default + Clone + 'static> ContainerBuilder for CapacityC
198198
}
199199
}
200200

201-
impl<C: WithProgress + SizableContainer + Default + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
201+
impl<C: Accountable + SizableContainer + Default + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
202202

203-
impl<T> WithProgress for Vec<T> {
203+
impl<T> Accountable for Vec<T> {
204204
#[inline] fn record_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }
205205
#[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
206206
}
@@ -263,9 +263,9 @@ mod rc {
263263
use std::ops::Deref;
264264
use std::rc::Rc;
265265

266-
use crate::{WithProgress, IterContainer, DrainContainer};
266+
use crate::{IterContainer, DrainContainer};
267267

268-
impl<T: WithProgress> WithProgress for Rc<T> {
268+
impl<T: crate::Accountable> crate::Accountable for Rc<T> {
269269
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
270270
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
271271
}
@@ -285,9 +285,9 @@ mod arc {
285285
use std::ops::Deref;
286286
use std::sync::Arc;
287287

288-
use crate::{WithProgress, IterContainer, DrainContainer};
288+
use crate::{IterContainer, DrainContainer};
289289

290-
impl<T: WithProgress> WithProgress for Arc<T> {
290+
impl<T: crate::Accountable> crate::Accountable for Arc<T> {
291291
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
292292
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
293293
}

timely/examples/columnar.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ mod container {
176176
}
177177
}
178178

179-
impl<C: columnar::ContainerBytes> timely::WithProgress for Column<C> {
179+
impl<C: columnar::ContainerBytes> timely::Accountable for Column<C> {
180180
#[inline] fn record_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
181181
#[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
182182
}

timely/src/dataflow/channels/pact.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use std::{fmt::{self, Debug}, marker::PhantomData};
1111
use std::rc::Rc;
1212

13-
use crate::WithProgress;
13+
use crate::Accountable;
1414
use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto};
1515
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
1616
use crate::communication::{Push, Pull};
@@ -34,7 +34,7 @@ pub trait ParallelizationContract<T, C> {
3434
#[derive(Debug)]
3535
pub struct Pipeline;
3636

37-
impl<T: 'static, C: WithProgress + 'static> ParallelizationContract<T, C> for Pipeline {
37+
impl<T: 'static, C: Accountable + 'static> ParallelizationContract<T, C> for Pipeline {
3838
type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
3939
type Puller = LogPuller<ThreadPuller<Message<T, C>>>;
4040
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
@@ -129,7 +129,7 @@ impl<P> LogPusher<P> {
129129
}
130130
}
131131

132-
impl<T, C: WithProgress, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
132+
impl<T, C: Accountable, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
133133
#[inline]
134134
fn push(&mut self, pair: &mut Option<Message<T, C>>) {
135135
if let Some(bundle) = pair {
@@ -177,7 +177,7 @@ impl<P> LogPuller<P> {
177177
}
178178
}
179179

180-
impl<T, C: WithProgress, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
180+
impl<T, C: Accountable, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
181181
#[inline]
182182
fn pull(&mut self) -> &mut Option<Message<T, C>> {
183183
let result = self.puller.pull();

timely/src/dataflow/channels/pullers/counter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::cell::RefCell;
66
use crate::dataflow::channels::Message;
77
use crate::progress::ChangeBatch;
88
use crate::communication::Pull;
9-
use crate::WithProgress;
9+
use crate::Accountable;
1010

1111
/// A wrapper which accounts records pulled past in a shared count map.
1212
pub struct Counter<T, C, P> {
@@ -36,7 +36,7 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
3636
}
3737
}
3838

39-
impl<T:Ord+Clone+'static, C: WithProgress, P: Pull<Message<T, C>>> Counter<T, C, P> {
39+
impl<T:Ord+Clone+'static, C: Accountable, P: Pull<Message<T, C>>> Counter<T, C, P> {
4040
/// Retrieves the next timestamp and batch of data.
4141
#[inline]
4242
pub fn next(&mut self) -> Option<&mut Message<T, C>> {

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
66
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;
9-
use crate::{Container, WithProgress};
9+
use crate::{Container, Accountable};
1010

1111
/// Buffers data sent at the same time, for efficient communication.
1212
///

timely/src/dataflow/channels/pushers/counter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::cell::RefCell;
77
use crate::progress::{ChangeBatch, Timestamp};
88
use crate::dataflow::channels::Message;
99
use crate::communication::Push;
10-
use crate::WithProgress;
10+
use crate::Accountable;
1111

1212
/// A wrapper which updates shared `produced` based on the number of records pushed.
1313
#[derive(Debug)]
@@ -17,7 +17,7 @@ pub struct Counter<T, C, P: Push<Message<T, C>>> {
1717
phantom: PhantomData<C>,
1818
}
1919

20-
impl<T: Timestamp, C: WithProgress, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
20+
impl<T: Timestamp, C: Accountable, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2323
if let Some(message) = message {

timely/src/dataflow/operators/core/enterleave.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::logging::{TimelyLogger, MessagesEvent};
2626
use crate::progress::Timestamp;
2727
use crate::progress::timestamp::Refines;
2828
use crate::progress::{Source, Target};
29-
use crate::{WithProgress, Container};
29+
use crate::{Accountable, Container};
3030
use crate::communication::Push;
3131
use crate::dataflow::channels::pushers::{Counter, Tee};
3232
use crate::dataflow::channels::Message;
@@ -207,7 +207,7 @@ impl<P> LogPusher<P> {
207207

208208
impl<T, C, P> Push<Message<T, C>> for LogPusher<P>
209209
where
210-
C: WithProgress,
210+
C: Accountable,
211211
P: Push<Message<T, C>>,
212212
{
213213
fn push(&mut self, element: &mut Option<Message<T, C>>) {

timely/src/dataflow/operators/core/input.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::scheduling::{Schedule, Activator};
1010
use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
1111
use crate::progress::Source;
1212
use crate::progress::operate::Connectivity;
13-
use crate::{WithProgress, Container};
13+
use crate::{Accountable, Container};
1414
use crate::communication::Push;
1515
use crate::dataflow::{Scope, ScopeParent, StreamCore};
1616
use crate::dataflow::channels::pushers::{Tee, Counter};

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::dataflow::channels::pushers::Counter as PushCounter;
1515
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
1616
use crate::dataflow::channels::Message;
1717
use crate::communication::{Push, Pull};
18-
use crate::{Container, WithProgress};
18+
use crate::{Container, Accountable};
1919
use crate::container::{ContainerBuilder, CapacityContainerBuilder};
2020

2121
use crate::dataflow::operators::InputCapability;
@@ -46,7 +46,7 @@ pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: 'a, P: Pull<Message<T,
4646
/// Handle to an operator's input stream and frontier, specialized to vectors.
4747
pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec<D>, P>;
4848

49-
impl<T: Timestamp, C: WithProgress, P: Pull<Message<T, C>>> InputHandleCore<T, C, P> {
49+
impl<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>> InputHandleCore<T, C, P> {
5050

5151
/// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
5252
/// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
@@ -87,7 +87,7 @@ impl<T: Timestamp, C: WithProgress, P: Pull<Message<T, C>>> InputHandleCore<T, C
8787

8888
}
8989

90-
impl<'a, T: Timestamp, C: WithProgress, P: Pull<Message<T, C>>+'a> FrontieredInputHandleCore<'a, T, C, P> {
90+
impl<'a, T: Timestamp, C: Accountable, P: Pull<Message<T, C>>+'a> FrontieredInputHandleCore<'a, T, C, P> {
9191
/// Allocate a new frontiered input handle.
9292
pub fn new(handle: &'a mut InputHandleCore<T, C, P>, frontier: &'a MutableAntichain<T>) -> Self {
9393
FrontieredInputHandleCore {
@@ -134,13 +134,13 @@ impl<'a, T: Timestamp, C: WithProgress, P: Pull<Message<T, C>>+'a> FrontieredInp
134134
}
135135
}
136136

137-
pub fn _access_pull_counter<T: Timestamp, C: WithProgress, P: Pull<Message<T, C>>>(input: &mut InputHandleCore<T, C, P>) -> &mut PullCounter<T, C, P> {
137+
pub fn _access_pull_counter<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>>(input: &mut InputHandleCore<T, C, P>) -> &mut PullCounter<T, C, P> {
138138
&mut input.pull_counter
139139
}
140140

141141
/// Constructs an input handle.
142142
/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
143-
pub fn new_input_handle<T: Timestamp, C: WithProgress, P: Pull<Message<T, C>>>(
143+
pub fn new_input_handle<T: Timestamp, C: Accountable, P: Pull<Message<T, C>>>(
144144
pull_counter: PullCounter<T, C, P>,
145145
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
146146
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,

timely/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub use timely_communication::Config as CommunicationConfig;
6565
pub use worker::Config as WorkerConfig;
6666
pub use execute::Config as Config;
6767

68-
pub use timely_container::WithProgress;
68+
pub use timely_container::Accountable;
6969
/// Re-export of the `timely_container` crate.
7070
pub mod container {
7171
pub use timely_container::*;
@@ -107,8 +107,8 @@ impl<T: Clone+'static> Data for T { }
107107
/// A composite trait for types usable as containers in timely dataflow.
108108
///
109109
/// The `Container` trait is necessary for all containers in timely dataflow channels.
110-
pub trait Container: WithProgress + Default + Clone + 'static { }
111-
impl<C: WithProgress + Default + Clone + 'static> Container for C { }
110+
pub trait Container: Accountable + Default + Clone + 'static { }
111+
impl<C: Accountable + Default + Clone + 'static> Container for C { }
112112

113113
/// A composite trait for types usable on exchange channels in timely dataflow.
114114
///

0 commit comments

Comments
 (0)