Skip to content

Commit d941d0f

Browse files
committed
Rename Container to WithProgress
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 59680ec commit d941d0f

File tree

31 files changed

+123
-122
lines changed

31 files changed

+123
-122
lines changed

container/src/lib.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,29 @@
44

55
use std::collections::VecDeque;
66

7-
/// A container transferring data through dataflow edges
7+
/// An object with effects on progress
88
///
9-
/// A container stores a number of updates and thus is able to describe it count
9+
/// The object stores a number of updates and thus is able to describe it count
1010
/// (`update_count()`) and whether it is empty (`is_empty()`). It is empty if the
1111
/// update count is zero.
1212
///
13-
/// A container must implement default. The default implementation is not required to allocate
14-
/// memory for variable-length components.
13+
/// It must implement default for historic reason. The default implementation is not required
14+
/// to allocate memory for variable-length components.
15+
// TODO: Remove `Default` requirement in the future.
1516
// The container is `Default` because `CapacityContainerBuilder` only implements `ContainerBuilder`
1617
// for containers that implement `Default`, and we use the associated `::Container` all over Timely.
1718
// We can only access the type if all requirements for the `ContainerBuilder` implementation are
1819
// satisfied.
19-
pub trait Container: Default {
20-
/// The number of updates in this container
20+
pub trait WithProgress: Default {
21+
/// The number of updates
2122
///
2223
/// This number is used in progress tracking to confirm the receipt of some number
2324
/// of outstanding updates, and it is highly load bearing. The main restriction is
2425
/// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
2526
/// must preserve the number of items.
2627
fn update_count(&self) -> i64;
2728

28-
/// Determine if the container contains any updates, corresponding to `update_count() == 0`.
29+
/// Determine if this contains any updates, corresponding to `update_count() == 0`.
2930
#[inline] fn is_empty(&self) -> bool { self.update_count() == 0 }
3031
}
3132

@@ -51,7 +52,7 @@ pub trait DrainContainer {
5152
}
5253

5354
/// A container that can be sized and reveals its capacity.
54-
pub trait SizableContainer: Container {
55+
pub trait SizableContainer: WithProgress {
5556
/// Indicates that the container is "full" and should be shipped.
5657
fn at_capacity(&self) -> bool;
5758
/// Restores `self` to its desired capacity, if it has one.
@@ -95,7 +96,7 @@ pub trait ContainerBuilder: Default + 'static {
9596
/// The container type we're building.
9697
// The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
9798
// all over Timely. `'static` because we don't want lifetimes everywhere.
98-
type Container: Container + Clone + 'static;
99+
type Container: WithProgress + Clone + 'static;
99100
/// Extract assembled containers, potentially leaving unfinished data behind. Can
100101
/// be called repeatedly, for example while the caller can send data.
101102
///
@@ -167,7 +168,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
167168
}
168169
}
169170

170-
impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
171+
impl<C: WithProgress + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
171172
type Container = C;
172173

173174
#[inline]
@@ -190,9 +191,9 @@ impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuild
190191
}
191192
}
192193

193-
impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
194+
impl<C: WithProgress + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
194195

195-
impl<T> Container for Vec<T> {
196+
impl<T> WithProgress for Vec<T> {
196197
#[inline] fn update_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }
197198
#[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
198199
}
@@ -255,9 +256,9 @@ mod rc {
255256
use std::ops::Deref;
256257
use std::rc::Rc;
257258

258-
use crate::{Container, IterContainer, DrainContainer};
259+
use crate::{WithProgress, IterContainer, DrainContainer};
259260

260-
impl<T: Container> Container for Rc<T> {
261+
impl<T: WithProgress> WithProgress for Rc<T> {
261262
#[inline] fn update_count(&self) -> i64 { std::ops::Deref::deref(self).update_count() }
262263
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
263264
}
@@ -277,9 +278,9 @@ mod arc {
277278
use std::ops::Deref;
278279
use std::sync::Arc;
279280

280-
use crate::{Container, IterContainer, DrainContainer};
281+
use crate::{WithProgress, IterContainer, DrainContainer};
281282

282-
impl<T: Container> Container for Arc<T> {
283+
impl<T: WithProgress> WithProgress for Arc<T> {
283284
#[inline] fn update_count(&self) -> i64 { std::ops::Deref::deref(self).update_count() }
284285
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
285286
}

timely/examples/columnar.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ mod container {
176176
}
177177
}
178178

179-
impl<C: columnar::ContainerBytes> timely::Container for Column<C> {
179+
impl<C: columnar::ContainerBytes> timely::WithProgress for Column<C> {
180180
#[inline] fn update_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
181181
#[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
182182
}

timely/src/dataflow/channels/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use serde::{Deserialize, Serialize};
44
use crate::communication::Push;
5-
use crate::Container;
5+
use crate::WithProgress;
66

77
/// A collection of types that may be pushed at.
88
pub mod pushers;
@@ -32,7 +32,7 @@ impl<T, C> Message<T, C> {
3232
}
3333
}
3434

35-
impl<T, C: Container> Message<T, C> {
35+
impl<T, C: WithProgress> Message<T, C> {
3636
/// Creates a new message instance from arguments.
3737
pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
3838
Message { time, data, from, seq }

timely/src/dataflow/channels/pact.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use std::{fmt::{self, Debug}, marker::PhantomData};
1111
use std::rc::Rc;
1212

13-
use crate::Container;
13+
use crate::WithProgress;
1414
use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto};
1515
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
1616
use crate::communication::{Push, Pull};
@@ -35,7 +35,7 @@ pub trait ParallelizationContract<T, C> {
3535
#[derive(Debug)]
3636
pub struct Pipeline;
3737

38-
impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipeline {
38+
impl<T: 'static, C: WithProgress + 'static> ParallelizationContract<T, C> for Pipeline {
3939
type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
4040
type Puller = LogPuller<ThreadPuller<Message<T, C>>>;
4141
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
@@ -130,7 +130,7 @@ impl<P> LogPusher<P> {
130130
}
131131
}
132132

133-
impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
133+
impl<T, C: WithProgress, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
134134
#[inline]
135135
fn push(&mut self, pair: &mut Option<Message<T, C>>) {
136136
if let Some(bundle) = pair {
@@ -178,7 +178,7 @@ impl<P> LogPuller<P> {
178178
}
179179
}
180180

181-
impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
181+
impl<T, C: WithProgress, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
182182
#[inline]
183183
fn pull(&mut self) -> &mut Option<Message<T, C>> {
184184
let result = self.puller.pull();

timely/src/dataflow/channels/pullers/counter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::cell::RefCell;
66
use crate::dataflow::channels::Message;
77
use crate::progress::ChangeBatch;
88
use crate::communication::Pull;
9-
use crate::Container;
9+
use crate::WithProgress;
1010

1111
/// A wrapper which accounts records pulled past in a shared count map.
1212
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Message<T, C>>> {
@@ -36,7 +36,7 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
3636
}
3737
}
3838

39-
impl<T:Ord+Clone+'static, C: Container, P: Pull<Message<T, C>>> Counter<T, C, P> {
39+
impl<T:Ord+Clone+'static, C: WithProgress, P: Pull<Message<T, C>>> Counter<T, C, P> {
4040
/// Retrieves the next timestamp and batch of data.
4141
#[inline]
4242
pub fn next(&mut self) -> Option<&mut Message<T, C>> {

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
66
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;
9-
use crate::{Container, Data};
9+
use crate::{WithProgress, Data};
1010

1111
/// Buffers data sent at the same time, for efficient communication.
1212
///
@@ -44,7 +44,7 @@ impl<T, CB: Default, P> Buffer<T, CB, P> {
4444
}
4545
}
4646

47-
impl<T, C: Container + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
47+
impl<T, C: WithProgress + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
4848
/// Returns a `Session`, which accepts data to send at the associated time
4949
#[inline]
5050
pub fn session(&mut self, time: &T) -> Session<'_, T, CapacityContainerBuilder<C>, P> {

timely/src/dataflow/channels/pushers/counter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::cell::RefCell;
77
use crate::progress::{ChangeBatch, Timestamp};
88
use crate::dataflow::channels::Message;
99
use crate::communication::Push;
10-
use crate::Container;
10+
use crate::WithProgress;
1111

1212
/// A wrapper which updates shared `produced` based on the number of records pushed.
1313
#[derive(Debug)]
@@ -17,7 +17,7 @@ pub struct Counter<T, C, P: Push<Message<T, C>>> {
1717
phantom: PhantomData<C>,
1818
}
1919

20-
impl<T: Timestamp, C: Container, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
20+
impl<T: Timestamp, C: WithProgress, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2323
if let Some(message) = message {

timely/src/dataflow/channels/pushers/tee.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::rc::Rc;
77
use crate::dataflow::channels::Message;
88

99
use crate::communication::Push;
10-
use crate::{Container, Data};
10+
use crate::{WithProgress, Data};
1111

1212
type PushList<T, C> = Rc<RefCell<Vec<Box<dyn Push<Message<T, C>>>>>>;
1313

@@ -17,7 +17,7 @@ pub struct Tee<T, C> {
1717
shared: PushList<T, C>,
1818
}
1919

20-
impl<T: Data, C: Container + Data> Push<Message<T, C>> for Tee<T, C> {
20+
impl<T: Data, C: WithProgress + Data> Push<Message<T, C>> for Tee<T, C> {
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2323
let mut pushers = self.shared.borrow_mut();
@@ -39,7 +39,7 @@ impl<T: Data, C: Container + Data> Push<Message<T, C>> for Tee<T, C> {
3939
}
4040
}
4141

42-
impl<T, C: Container> Tee<T, C> {
42+
impl<T, C: WithProgress> Tee<T, C> {
4343
/// Allocates a new pair of `Tee` and `TeeHelper`.
4444
pub fn new() -> (Tee<T, C>, TeeHelper<T, C>) {
4545
let shared = Rc::new(RefCell::new(Vec::new()));
@@ -52,7 +52,7 @@ impl<T, C: Container> Tee<T, C> {
5252
}
5353
}
5454

55-
impl<T, C: Container> Clone for Tee<T, C> {
55+
impl<T, C: WithProgress> Clone for Tee<T, C> {
5656
fn clone(&self) -> Self {
5757
Self {
5858
buffer: Default::default(),

timely/src/dataflow/operators/branch.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use crate::dataflow::channels::pact::Pipeline;
44
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
55
use crate::dataflow::{Scope, Stream, StreamCore};
6-
use crate::{Container, Data};
6+
use crate::{WithProgress, Data};
77

88
/// Extension trait for `Stream`.
99
pub trait Branch<S: Scope, D: Data> {
@@ -93,7 +93,7 @@ pub trait BranchWhen<T>: Sized {
9393
fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
9494
}
9595

96-
impl<S: Scope, C: Container + Data> BranchWhen<S::Timestamp> for StreamCore<S, C> {
96+
impl<S: Scope, C: WithProgress + Data> BranchWhen<S::Timestamp> for StreamCore<S, C> {
9797
fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
9898
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
9999
builder.set_notify(false);

timely/src/dataflow/operators/core/capture/capture.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ use crate::dataflow::channels::pact::Pipeline;
1010
use crate::dataflow::channels::pullers::Counter as PullCounter;
1111
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
1212

13-
use crate::{Container, Data};
13+
use crate::{WithProgress, Data};
1414
use crate::progress::ChangeBatch;
1515
use crate::progress::Timestamp;
1616

1717
use super::{Event, EventPusher};
1818

1919
/// Capture a stream of timestamped data for later replay.
20-
pub trait Capture<T: Timestamp, C: Container + Data> {
20+
pub trait Capture<T: Timestamp, C: WithProgress + Data> {
2121
/// Captures a stream of timestamped data for later replay.
2222
///
2323
/// # Examples
@@ -113,7 +113,7 @@ pub trait Capture<T: Timestamp, C: Container + Data> {
113113
}
114114
}
115115

116-
impl<S: Scope, C: Container + Data> Capture<S::Timestamp, C> for StreamCore<S, C> {
116+
impl<S: Scope, C: WithProgress + Data> Capture<S::Timestamp, C> for StreamCore<S, C> {
117117
fn capture_into<P: EventPusher<S::Timestamp, C>+'static>(&self, mut event_pusher: P) {
118118

119119
let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());

0 commit comments

Comments
 (0)