Skip to content

Commit 711258b

Browse files
committed
Bring back Container trait
It's a composite trait requiring WithProgress, Default, Clone, 'static Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent d941d0f commit 711258b

32 files changed

+125
-121
lines changed

container/src/lib.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::collections::VecDeque;
1717
// for containers that implement `Default`, and we use the associated `::Container` all over Timely.
1818
// We can only access the type if all requirements for the `ContainerBuilder` implementation are
1919
// satisfied.
20-
pub trait WithProgress: Default {
20+
pub trait WithProgress {
2121
/// The number of updates
2222
///
2323
/// This number is used in progress tracking to confirm the receipt of some number
@@ -52,7 +52,7 @@ pub trait DrainContainer {
5252
}
5353

5454
/// A container that can be sized and reveals its capacity.
55-
pub trait SizableContainer: WithProgress {
55+
pub trait SizableContainer: Sized {
5656
/// Indicates that the container is "full" and should be shipped.
5757
fn at_capacity(&self) -> bool;
5858
/// Restores `self` to its desired capacity, if it has one.
@@ -96,7 +96,7 @@ pub trait ContainerBuilder: Default + 'static {
9696
/// The container type we're building.
9797
// The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
9898
// all over Timely. `'static` because we don't want lifetimes everywhere.
99-
type Container: WithProgress + Clone + 'static;
99+
type Container: WithProgress + Default + Clone + 'static;
100100
/// Extract assembled containers, potentially leaving unfinished data behind. Can
101101
/// be called repeatedly, for example while the caller can send data.
102102
///
@@ -152,7 +152,7 @@ pub struct CapacityContainerBuilder<C>{
152152
pending: VecDeque<C>,
153153
}
154154

155-
impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
155+
impl<T, C: SizableContainer + Default + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
156156
#[inline]
157157
fn push_into(&mut self, item: T) {
158158
// Ensure capacity
@@ -168,7 +168,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
168168
}
169169
}
170170

171-
impl<C: WithProgress + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
171+
impl<C: WithProgress + Default + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
172172
type Container = C;
173173

174174
#[inline]
@@ -191,7 +191,7 @@ impl<C: WithProgress + Clone + 'static> ContainerBuilder for CapacityContainerBu
191191
}
192192
}
193193

194-
impl<C: WithProgress + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
194+
impl<C: WithProgress + SizableContainer + Default + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
195195

196196
impl<T> WithProgress for Vec<T> {
197197
#[inline] fn update_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }

timely/src/dataflow/channels/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use serde::{Deserialize, Serialize};
44
use crate::communication::Push;
5-
use crate::WithProgress;
5+
use crate::Container;
66

77
/// A collection of types that may be pushed at.
88
pub mod pushers;
@@ -32,7 +32,7 @@ impl<T, C> Message<T, C> {
3232
}
3333
}
3434

35-
impl<T, C: WithProgress> Message<T, C> {
35+
impl<T, C: Container> Message<T, C> {
3636
/// Creates a new message instance from arguments.
3737
pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
3838
Message { time, data, from, seq }

timely/src/dataflow/channels/pact.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use crate::dataflow::channels::Message;
1919
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
2020
use crate::progress::Timestamp;
2121
use crate::worker::AsWorker;
22-
use crate::Data;
2322

2423
/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
2524
pub trait ParallelizationContract<T, C> {
@@ -86,7 +85,7 @@ where
8685
CB: ContainerBuilder,
8786
CB::Container: DrainContainer,
8887
CB: for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
89-
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
88+
CB::Container: Send + crate::dataflow::channels::ContainerBytes,
9089
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64
9190
{
9291
type Pusher = ExchangePusher<T, CB, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>;

timely/src/dataflow/channels/pullers/counter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::communication::Pull;
99
use crate::WithProgress;
1010

1111
/// A wrapper which accounts records pulled past in a shared count map.
12-
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Message<T, C>>> {
12+
pub struct Counter<T, C, P> {
1313
pullable: P,
1414
consumed: Rc<RefCell<ChangeBatch<T>>>,
1515
phantom: ::std::marker::PhantomData<C>,

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
66
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;
9-
use crate::{WithProgress, Data};
9+
use crate::{Container, WithProgress};
1010

1111
/// Buffers data sent at the same time, for efficient communication.
1212
///
@@ -44,7 +44,7 @@ impl<T, CB: Default, P> Buffer<T, CB, P> {
4444
}
4545
}
4646

47-
impl<T, C: WithProgress + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
47+
impl<T, C: Container, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
4848
/// Returns a `Session`, which accepts data to send at the associated time
4949
#[inline]
5050
pub fn session(&mut self, time: &T) -> Session<'_, T, CapacityContainerBuilder<C>, P> {

timely/src/dataflow/channels/pushers/tee.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::rc::Rc;
77
use crate::dataflow::channels::Message;
88

99
use crate::communication::Push;
10-
use crate::{WithProgress, Data};
10+
use crate::{Container, Data};
1111

1212
type PushList<T, C> = Rc<RefCell<Vec<Box<dyn Push<Message<T, C>>>>>>;
1313

@@ -17,7 +17,7 @@ pub struct Tee<T, C> {
1717
shared: PushList<T, C>,
1818
}
1919

20-
impl<T: Data, C: WithProgress + Data> Push<Message<T, C>> for Tee<T, C> {
20+
impl<T: Data, C: Container> Push<Message<T, C>> for Tee<T, C> {
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2323
let mut pushers = self.shared.borrow_mut();
@@ -32,14 +32,14 @@ impl<T: Data, C: WithProgress + Data> Push<Message<T, C>> for Tee<T, C> {
3232
pushers[index-1].push(&mut None);
3333
}
3434
}
35-
if pushers.len() > 0 {
35+
if !pushers.is_empty() {
3636
let last = pushers.len() - 1;
3737
pushers[last].push(message);
3838
}
3939
}
4040
}
4141

42-
impl<T, C: WithProgress> Tee<T, C> {
42+
impl<T, C: Default> Tee<T, C> {
4343
/// Allocates a new pair of `Tee` and `TeeHelper`.
4444
pub fn new() -> (Tee<T, C>, TeeHelper<T, C>) {
4545
let shared = Rc::new(RefCell::new(Vec::new()));
@@ -52,7 +52,7 @@ impl<T, C: WithProgress> Tee<T, C> {
5252
}
5353
}
5454

55-
impl<T, C: WithProgress> Clone for Tee<T, C> {
55+
impl<T, C: Default> Clone for Tee<T, C> {
5656
fn clone(&self) -> Self {
5757
Self {
5858
buffer: Default::default(),

timely/src/dataflow/operators/branch.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use crate::dataflow::channels::pact::Pipeline;
44
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
55
use crate::dataflow::{Scope, Stream, StreamCore};
6-
use crate::{WithProgress, Data};
6+
use crate::{Container, Data};
77

88
/// Extension trait for `Stream`.
99
pub trait Branch<S: Scope, D: Data> {
@@ -93,7 +93,7 @@ pub trait BranchWhen<T>: Sized {
9393
fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
9494
}
9595

96-
impl<S: Scope, C: WithProgress + Data> BranchWhen<S::Timestamp> for StreamCore<S, C> {
96+
impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
9797
fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
9898
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
9999
builder.set_notify(false);

timely/src/dataflow/operators/core/capture/capture.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ use crate::dataflow::channels::pact::Pipeline;
1010
use crate::dataflow::channels::pullers::Counter as PullCounter;
1111
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
1212

13-
use crate::{WithProgress, Data};
13+
use crate::Container;
1414
use crate::progress::ChangeBatch;
1515
use crate::progress::Timestamp;
1616

1717
use super::{Event, EventPusher};
1818

1919
/// Capture a stream of timestamped data for later replay.
20-
pub trait Capture<T: Timestamp, C: WithProgress + Data> {
20+
pub trait Capture<T: Timestamp, C: Container> {
2121
/// Captures a stream of timestamped data for later replay.
2222
///
2323
/// # Examples
@@ -113,7 +113,7 @@ pub trait Capture<T: Timestamp, C: WithProgress + Data> {
113113
}
114114
}
115115

116-
impl<S: Scope, C: WithProgress + Data> Capture<S::Timestamp, C> for StreamCore<S, C> {
116+
impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
117117
fn capture_into<P: EventPusher<S::Timestamp, C>+'static>(&self, mut event_pusher: P) {
118118

119119
let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());

timely/src/dataflow/operators/core/capture/extract.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Traits and types for extracting captured timely dataflow streams.
22
33
use super::Event;
4+
use crate::Container;
45
use crate::container::{SizableContainer, DrainContainer, PushInto};
56

67
/// Supports extracting a sequence of timestamp and data.
@@ -50,7 +51,7 @@ pub trait Extract<T, C> {
5051

5152
impl<T, C> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
5253
where
53-
for<'a> C: SizableContainer + DrainContainer<Item<'a>: Ord> + PushInto<C::Item<'a>>,
54+
for<'a> C: Container + SizableContainer + DrainContainer<Item<'a>: Ord> + PushInto<C::Item<'a>>,
5455
T: Ord,
5556
{
5657
fn extract(self) -> Vec<(T, C)> {

timely/src/dataflow/operators/core/capture/replay.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use crate::progress::Timestamp;
4646

4747
use super::Event;
4848
use super::event::EventIterator;
49-
use crate::WithProgress;
49+
use crate::Container;
5050

5151
/// Replay a capture stream into a scope with the same timestamp.
5252
pub trait Replay<T: Timestamp, C> : Sized {
@@ -62,7 +62,7 @@ pub trait Replay<T: Timestamp, C> : Sized {
6262
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>;
6363
}
6464

65-
impl<T: Timestamp, C: WithProgress + Clone + 'static, I> Replay<T, C> for I
65+
impl<T: Timestamp, C: Container, I> Replay<T, C> for I
6666
where
6767
I : IntoIterator,
6868
<I as IntoIterator>::Item: EventIterator<T, C>+'static,

0 commit comments

Comments
 (0)