Skip to content

Commit b411957

Browse files
authored
Remove Container: Clone + 'static (#540)
Remove the requirement that all Container implementations are Clone and 'static. This makes implementing types simpler that depend on Container, and requires us to explicitly mark various places as `Data` such that they comply with Timely's type requirements. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 1bc6ae8 commit b411957

File tree

25 files changed

+105
-88
lines changed

25 files changed

+105
-88
lines changed

container/src/columnation.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ mod container {
339339

340340
use crate::columnation::{Columnation, TimelyStack};
341341

342-
impl<T: Columnation + 'static> Container for TimelyStack<T> {
342+
impl<T: Columnation> Container for TimelyStack<T> {
343343
type ItemRef<'a> = &'a T where Self: 'a;
344344
type Item<'a> = &'a T where Self: 'a;
345345

@@ -355,20 +355,20 @@ mod container {
355355
TimelyStack::clear(self)
356356
}
357357

358-
type Iter<'a> = std::slice::Iter<'a, T>;
358+
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
359359

360360
fn iter(&self) -> Self::Iter<'_> {
361361
self.deref().iter()
362362
}
363363

364-
type DrainIter<'a> = std::slice::Iter<'a, T>;
364+
type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a;
365365

366366
fn drain(&mut self) -> Self::DrainIter<'_> {
367367
(*self).iter()
368368
}
369369
}
370370

371-
impl<T: Columnation + 'static> SizableContainer for TimelyStack<T> {
371+
impl<T: Columnation> SizableContainer for TimelyStack<T> {
372372
fn capacity(&self) -> usize {
373373
self.capacity()
374374
}

container/src/flatcontainer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
pub use flatcontainer::*;
44
use crate::{buffer, Container, SizableContainer, PushInto};
55

6-
impl<R: Region + Clone + 'static> Container for FlatStack<R> {
6+
impl<R: Region> Container for FlatStack<R> {
77
type ItemRef<'a> = R::ReadItem<'a> where Self: 'a;
88
type Item<'a> = R::ReadItem<'a> where Self: 'a;
99

@@ -15,20 +15,20 @@ impl<R: Region + Clone + 'static> Container for FlatStack<R> {
1515
self.clear()
1616
}
1717

18-
type Iter<'a> = <&'a Self as IntoIterator>::IntoIter;
18+
type Iter<'a> = <&'a Self as IntoIterator>::IntoIter where Self: 'a;
1919

2020
fn iter(&self) -> Self::Iter<'_> {
2121
IntoIterator::into_iter(self)
2222
}
2323

24-
type DrainIter<'a> = Self::Iter<'a>;
24+
type DrainIter<'a> = Self::Iter<'a> where Self: 'a;
2525

2626
fn drain(&mut self) -> Self::DrainIter<'_> {
2727
IntoIterator::into_iter(&*self)
2828
}
2929
}
3030

31-
impl<R: Region + Clone + 'static> SizableContainer for FlatStack<R> {
31+
impl<R: Region> SizableContainer for FlatStack<R> {
3232
fn capacity(&self) -> usize {
3333
self.capacity()
3434
}

container/src/lib.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ pub mod flatcontainer;
1818
/// We require the container to be cloneable to enable efficient copies when providing references
1919
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
2020
/// is efficient (which is not necessarily the case when deriving `Clone`.)
21-
/// TODO: Don't require `Container: Clone`
22-
pub trait Container: Default + Clone + 'static {
21+
pub trait Container: Default {
2322
/// The type of elements when reading non-destructively from the container.
2423
type ItemRef<'a> where Self: 'a;
2524

@@ -50,13 +49,13 @@ pub trait Container: Default + Clone + 'static {
5049
fn clear(&mut self);
5150

5251
/// Iterator type when reading from the container.
53-
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>>;
52+
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;
5453

5554
/// Returns an iterator that reads the contents of this container.
5655
fn iter(&self) -> Self::Iter<'_>;
5756

5857
/// Iterator type when draining the container.
59-
type DrainIter<'a>: Iterator<Item=Self::Item<'a>>;
58+
type DrainIter<'a>: Iterator<Item=Self::Item<'a>> where Self: 'a;
6059

6160
/// Returns an iterator that drains the contents of this container.
6261
/// Drain leaves the container in an undefined state.
@@ -104,7 +103,7 @@ pub trait PushInto<T> {
104103
/// decide to represent a push order for `extract` and `finish`, or not.
105104
pub trait ContainerBuilder: Default + 'static {
106105
/// The container type we're building.
107-
type Container: Container;
106+
type Container: Container + Clone + 'static;
108107
/// Extract assembled containers, potentially leaving unfinished data behind. Can
109108
/// be called repeatedly, for example while the caller can send data.
110109
///
@@ -160,7 +159,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
160159
}
161160
}
162161

163-
impl<C: Container> ContainerBuilder for CapacityContainerBuilder<C> {
162+
impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
164163
type Container = C;
165164

166165
#[inline]
@@ -204,7 +203,7 @@ impl<C: Container> CapacityContainerBuilder<C> {
204203
}
205204
}
206205

207-
impl<T: Clone + 'static> Container for Vec<T> {
206+
impl<T> Container for Vec<T> {
208207
type ItemRef<'a> = &'a T where T: 'a;
209208
type Item<'a> = T where T: 'a;
210209

@@ -218,20 +217,20 @@ impl<T: Clone + 'static> Container for Vec<T> {
218217

219218
fn clear(&mut self) { Vec::clear(self) }
220219

221-
type Iter<'a> = std::slice::Iter<'a, T>;
220+
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
222221

223222
fn iter(&self) -> Self::Iter<'_> {
224223
self.as_slice().iter()
225224
}
226225

227-
type DrainIter<'a> = std::vec::Drain<'a, T>;
226+
type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
228227

229228
fn drain(&mut self) -> Self::DrainIter<'_> {
230229
self.drain(..)
231230
}
232231
}
233232

234-
impl<T: Clone + 'static> SizableContainer for Vec<T> {
233+
impl<T> SizableContainer for Vec<T> {
235234
fn capacity(&self) -> usize {
236235
self.capacity()
237236
}
@@ -294,13 +293,13 @@ mod rc {
294293
}
295294
}
296295

297-
type Iter<'a> = T::Iter<'a>;
296+
type Iter<'a> = T::Iter<'a> where Self: 'a;
298297

299298
fn iter(&self) -> Self::Iter<'_> {
300299
self.deref().iter()
301300
}
302301

303-
type DrainIter<'a> = T::Iter<'a>;
302+
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
304303

305304
fn drain(&mut self) -> Self::DrainIter<'_> {
306305
self.iter()
@@ -335,13 +334,13 @@ mod arc {
335334
}
336335
}
337336

338-
type Iter<'a> = T::Iter<'a>;
337+
type Iter<'a> = T::Iter<'a> where Self: 'a;
339338

340339
fn iter(&self) -> Self::Iter<'_> {
341340
self.deref().iter()
342341
}
343342

344-
type DrainIter<'a> = T::Iter<'a>;
343+
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
345344

346345
fn drain(&mut self) -> Self::DrainIter<'_> {
347346
self.iter()

timely/src/dataflow/channels/pact.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub trait ParallelizationContract<T, C> {
3535
#[derive(Debug)]
3636
pub struct Pipeline;
3737

38-
impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
38+
impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipeline {
3939
type Pusher = LogPusher<T, C, ThreadPusher<Message<T, C>>>;
4040
type Puller = LogPuller<T, C, ThreadPuller<Message<T, C>>>;
4141
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
66
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;
9-
use crate::Container;
9+
use crate::{Container, Data};
1010

1111
/// Buffers data sent at the same time, for efficient communication.
1212
///
@@ -44,7 +44,7 @@ impl<T, CB: Default, P> Buffer<T, CB, P> {
4444
}
4545
}
4646

47-
impl<T, C: Container, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
47+
impl<T, C: Container + Data, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
4848
/// Returns a `Session`, which accepts data to send at the associated time
4949
#[inline]
5050
pub fn session(&mut self, time: &T) -> Session<T, CapacityContainerBuilder<C>, P> {
@@ -133,7 +133,7 @@ pub struct Session<'a, T, CB, P> {
133133
buffer: &'a mut Buffer<T, CB, P>,
134134
}
135135

136-
impl<'a, T, C: Container, P> Session<'a, T, CapacityContainerBuilder<C>, P>
136+
impl<'a, T, C: Container + Data, P> Session<'a, T, CapacityContainerBuilder<C>, P>
137137
where
138138
T: Eq + Clone + 'a,
139139
P: Push<Message<T, C>> + 'a,

timely/src/dataflow/channels/pushers/tee.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub struct Tee<T, C> {
1717
shared: PushList<T, C>,
1818
}
1919

20-
impl<T: Data, C: Container> Push<Message<T, C>> for Tee<T, C> {
20+
impl<T: Data, C: Container + Data> Push<Message<T, C>> for Tee<T, C> {
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2323
let mut pushers = self.shared.borrow_mut();

timely/src/dataflow/operators/branch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub trait BranchWhen<T>: Sized {
9292
fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
9393
}
9494

95-
impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
95+
impl<S: Scope, C: Container + Data> BranchWhen<S::Timestamp> for StreamCore<S, C> {
9696
fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
9797
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
9898

timely/src/dataflow/operators/core/capture/capture.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ use crate::dataflow::channels::pact::Pipeline;
1010
use crate::dataflow::channels::pullers::Counter as PullCounter;
1111
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
1212

13-
use crate::Container;
13+
use crate::{Container, Data};
1414
use crate::progress::ChangeBatch;
1515
use crate::progress::Timestamp;
1616

1717
use super::{Event, EventPusher};
1818

1919
/// Capture a stream of timestamped data for later replay.
20-
pub trait Capture<T: Timestamp, C: Container> {
20+
pub trait Capture<T: Timestamp, C: Container + Data> {
2121
/// Captures a stream of timestamped data for later replay.
2222
///
2323
/// # Examples
@@ -113,7 +113,7 @@ pub trait Capture<T: Timestamp, C: Container> {
113113
}
114114
}
115115

116-
impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
116+
impl<S: Scope, C: Container + Data> Capture<S::Timestamp, C> for StreamCore<S, C> {
117117
fn capture_into<P: EventPusher<S::Timestamp, C>+'static>(&self, mut event_pusher: P) {
118118

119119
let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());

timely/src/dataflow/operators/core/capture/replay.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub trait Replay<T: Timestamp, C> : Sized {
6262
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>;
6363
}
6464

65-
impl<T: Timestamp, C: Container, I> Replay<T, C> for I
65+
impl<T: Timestamp, C: Container + Clone + 'static, I> Replay<T, C> for I
6666
where
6767
I : IntoIterator,
6868
<I as IntoIterator>::Item: EventIterator<T, C>+'static,

timely/src/dataflow/operators/core/concat.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Merges the contents of multiple streams.
22
33

4-
use crate::Container;
4+
use crate::{Container, Data};
55
use crate::dataflow::channels::pact::Pipeline;
66
use crate::dataflow::{StreamCore, Scope};
77

@@ -23,7 +23,7 @@ pub trait Concat<G: Scope, C: Container> {
2323
fn concat(&self, _: &StreamCore<G, C>) -> StreamCore<G, C>;
2424
}
2525

26-
impl<G: Scope, C: Container> Concat<G, C> for StreamCore<G, C> {
26+
impl<G: Scope, C: Container + Data> Concat<G, C> for StreamCore<G, C> {
2727
fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C> {
2828
self.scope().concatenate([self.clone(), other.clone()])
2929
}
@@ -52,7 +52,7 @@ pub trait Concatenate<G: Scope, C: Container> {
5252
I: IntoIterator<Item=StreamCore<G, C>>;
5353
}
5454

55-
impl<G: Scope, C: Container> Concatenate<G, C> for StreamCore<G, C> {
55+
impl<G: Scope, C: Container + Data> Concatenate<G, C> for StreamCore<G, C> {
5656
fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
5757
where
5858
I: IntoIterator<Item=StreamCore<G, C>>
@@ -62,7 +62,7 @@ impl<G: Scope, C: Container> Concatenate<G, C> for StreamCore<G, C> {
6262
}
6363
}
6464

65-
impl<G: Scope, C: Container> Concatenate<G, C> for G {
65+
impl<G: Scope, C: Container + Data> Concatenate<G, C> for G {
6666
fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
6767
where
6868
I: IntoIterator<Item=StreamCore<G, C>>

0 commit comments

Comments
 (0)