Skip to content

Commit 5b44c22

Browse files
committed
Remove Container: Clone + 'static
Remove the requirement that all Container implementations are Clone and 'static. This makes implementing types simpler that depend on Container, and requires us to explicitly mark various places as `Data` such that they comply with Timely's type requirements. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 819135e commit 5b44c22

File tree

25 files changed

+126
-109
lines changed

25 files changed

+126
-109
lines changed

container/src/columnation.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ mod container {
314314

315315
use crate::columnation::{Columnation, TimelyStack};
316316

317-
impl<T: Columnation + 'static> Container for TimelyStack<T> {
317+
impl<T: Columnation> Container for TimelyStack<T> {
318318
type ItemRef<'a> = &'a T where Self: 'a;
319319
type Item<'a> = &'a T where Self: 'a;
320320

@@ -330,13 +330,13 @@ mod container {
330330
TimelyStack::clear(self)
331331
}
332332

333-
type Iter<'a> = std::slice::Iter<'a, T>;
333+
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
334334

335335
fn iter(&self) -> Self::Iter<'_> {
336336
self.deref().iter()
337337
}
338338

339-
type DrainIter<'a> = std::slice::Iter<'a, T>;
339+
type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a;
340340

341341
fn drain(&mut self) -> Self::DrainIter<'_> {
342342
(*self).iter()

container/src/lib.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ pub mod flatcontainer;
1616
/// We require the container to be cloneable to enable efficient copies when providing references
1717
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
1818
/// is efficient (which is not necessarily the case when deriving `Clone`.)
19-
/// TODO: Don't require `Container: Clone`
20-
pub trait Container: Default + Clone + 'static {
19+
pub trait Container: Default {
2120
/// The type of elements when reading non-destructively from the container.
2221
type ItemRef<'a> where Self: 'a;
2322

@@ -42,13 +41,13 @@ pub trait Container: Default + Clone + 'static {
4241
fn clear(&mut self);
4342

4443
/// Iterator type when reading from the container.
45-
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>>;
44+
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;
4645

4746
/// Returns an iterator that reads the contents of this container.
4847
fn iter(&self) -> Self::Iter<'_>;
4948

5049
/// Iterator type when draining the container.
51-
type DrainIter<'a>: Iterator<Item=Self::Item<'a>>;
50+
type DrainIter<'a>: Iterator<Item=Self::Item<'a>> where Self: 'a;
5251

5352
/// Returns an iterator that drains the contents of this container.
5453
/// Drain leaves the container in an undefined state.
@@ -83,7 +82,7 @@ pub trait PushContainer: Container {
8382
fn reserve(&mut self, additional: usize);
8483
}
8584

86-
impl<T: Clone + 'static> Container for Vec<T> {
85+
impl<T> Container for Vec<T> {
8786
type ItemRef<'a> = &'a T where T: 'a;
8887
type Item<'a> = T where T: 'a;
8988

@@ -97,13 +96,13 @@ impl<T: Clone + 'static> Container for Vec<T> {
9796

9897
fn clear(&mut self) { Vec::clear(self) }
9998

100-
type Iter<'a> = std::slice::Iter<'a, T>;
99+
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
101100

102101
fn iter(&self) -> Self::Iter<'_> {
103102
self.as_slice().iter()
104103
}
105104

106-
type DrainIter<'a> = std::vec::Drain<'a, T>;
105+
type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
107106

108107
fn drain(&mut self) -> Self::DrainIter<'_> {
109108
self.drain(..)
@@ -165,13 +164,13 @@ mod rc {
165164
}
166165
}
167166

168-
type Iter<'a> = T::Iter<'a>;
167+
type Iter<'a> = T::Iter<'a> where Self: 'a;
169168

170169
fn iter(&self) -> Self::Iter<'_> {
171170
self.deref().iter()
172171
}
173172

174-
type DrainIter<'a> = T::Iter<'a>;
173+
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
175174

176175
fn drain(&mut self) -> Self::DrainIter<'_> {
177176
self.iter()
@@ -206,13 +205,13 @@ mod arc {
206205
}
207206
}
208207

209-
type Iter<'a> = T::Iter<'a>;
208+
type Iter<'a> = T::Iter<'a> where Self: 'a;
210209

211210
fn iter(&self) -> Self::Iter<'_> {
212211
self.deref().iter()
213212
}
214213

215-
type DrainIter<'a> = T::Iter<'a>;
214+
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
216215

217216
fn drain(&mut self) -> Self::DrainIter<'_> {
218217
self.iter()
@@ -232,7 +231,7 @@ pub trait PushPartitioned: PushContainer {
232231
F: FnMut(usize, &mut Self);
233232
}
234233

235-
impl<T: PushContainer + 'static> PushPartitioned for T where for<'a> T::Item<'a>: PushInto<T> {
234+
impl<T: PushContainer> PushPartitioned for T where for<'a> T::Item<'a>: PushInto<T> {
236235
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
237236
where
238237
for<'a> I: FnMut(&Self::Item<'a>) -> usize,

timely/src/dataflow/channels/pact.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
1010
use std::{fmt::{self, Debug}, marker::PhantomData};
1111

12-
use crate::Container;
12+
use crate::{Container, ExchangeData};
1313
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
14-
use crate::communication::{Push, Pull, Data};
14+
use crate::communication::{Push, Pull};
1515
use crate::container::PushPartitioned;
1616
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
1717
use crate::dataflow::channels::{Bundle, Message};
@@ -33,7 +33,7 @@ pub trait ParallelizationContract<T, C> {
3333
#[derive(Debug)]
3434
pub struct Pipeline;
3535

36-
impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
36+
impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipeline {
3737
type Pusher = LogPusher<T, C, ThreadPusher<Bundle<T, C>>>;
3838
type Puller = LogPuller<T, C, ThreadPuller<Bundle<T, C>>>;
3939
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
@@ -66,7 +66,7 @@ where
6666
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
6767
impl<T: Timestamp, C, H: 'static> ParallelizationContract<T, C> for ExchangeCore<C, H>
6868
where
69-
C: Data + PushPartitioned,
69+
C: PushPartitioned + ExchangeData,
7070
for<'a> H: FnMut(&C::Item<'a>) -> u64
7171
{
7272
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use crate::communication::Push;
44
use crate::container::PushPartitioned;
55
use crate::dataflow::channels::{Bundle, Message};
6-
use crate::{Container, Data};
6+
use crate::Data;
77

88
// TODO : Software write combining
99
/// Distributes records among target pushees according to a distribution function.
@@ -44,9 +44,9 @@ where
4444
}
4545
}
4646

47-
impl<T: Eq+Data, C: Container, P: Push<Bundle<T, C>>, H, > Push<Bundle<T, C>> for Exchange<T, C, P, H>
47+
impl<T: Eq+Data, C, P: Push<Bundle<T, C>>, H, > Push<Bundle<T, C>> for Exchange<T, C, P, H>
4848
where
49-
C: PushPartitioned,
49+
C: PushPartitioned+Data,
5050
for<'a> H: FnMut(&C::Item<'a>) -> u64
5151
{
5252
#[inline(never)]

timely/src/dataflow/channels/pushers/tee.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub struct Tee<T, C> {
1717
shared: PushList<T, C>,
1818
}
1919

20-
impl<T: Data, C: Container> Push<Bundle<T, C>> for Tee<T, C> {
20+
impl<T: Data, C: Container+Data> Push<Bundle<T, C>> for Tee<T, C> {
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Bundle<T, C>>) {
2323
let mut pushers = self.shared.borrow_mut();

timely/src/dataflow/operators/branch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ pub trait BranchWhen<T>: Sized {
9494
fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
9595
}
9696

97-
impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
97+
impl<S: Scope, C: Container + Data> BranchWhen<S::Timestamp> for StreamCore<S, C> {
9898
fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
9999
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
100100

timely/src/dataflow/operators/core/capture/capture.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ use crate::dataflow::channels::pact::Pipeline;
1010
use crate::dataflow::channels::pullers::Counter as PullCounter;
1111
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
1212

13-
use crate::Container;
13+
use crate::{Container, Data};
1414
use crate::progress::ChangeBatch;
1515
use crate::progress::Timestamp;
1616

1717
use super::{Event, EventPusher};
1818

1919
/// Capture a stream of timestamped data for later replay.
20-
pub trait Capture<T: Timestamp, C: Container> {
20+
pub trait Capture<T: Timestamp, C: Container+Data> {
2121
/// Captures a stream of timestamped data for later replay.
2222
///
2323
/// # Examples
@@ -113,7 +113,7 @@ pub trait Capture<T: Timestamp, C: Container> {
113113
}
114114
}
115115

116-
impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
116+
impl<S: Scope, C: Container + Data> Capture<S::Timestamp, C> for StreamCore<S, C> {
117117
fn capture_into<P: EventPusher<S::Timestamp, C>+'static>(&self, mut event_pusher: P) {
118118

119119
let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());

timely/src/dataflow/operators/core/capture/replay.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use crate::progress::Timestamp;
4646

4747
use super::Event;
4848
use super::event::EventIterator;
49-
use crate::Container;
49+
use crate::{Container, Data};
5050

5151
/// Replay a capture stream into a scope with the same timestamp.
5252
pub trait Replay<T: Timestamp, C> : Sized {
@@ -62,7 +62,7 @@ pub trait Replay<T: Timestamp, C> : Sized {
6262
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>;
6363
}
6464

65-
impl<T: Timestamp, C: Container, I> Replay<T, C> for I
65+
impl<T: Timestamp, C: Container + Data, I> Replay<T, C> for I
6666
where
6767
I : IntoIterator,
6868
<I as IntoIterator>::Item: EventIterator<T, C>+'static,

timely/src/dataflow/operators/core/concat.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Merges the contents of multiple streams.
22
33

4-
use crate::Container;
4+
use crate::{Container, Data};
55
use crate::dataflow::channels::pact::Pipeline;
66
use crate::dataflow::{StreamCore, Scope};
77

@@ -23,7 +23,7 @@ pub trait Concat<G: Scope, C: Container> {
2323
fn concat(&self, _: &StreamCore<G, C>) -> StreamCore<G, C>;
2424
}
2525

26-
impl<G: Scope, C: Container> Concat<G, C> for StreamCore<G, C> {
26+
impl<G: Scope, C: Container + Data> Concat<G, C> for StreamCore<G, C> {
2727
fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C> {
2828
self.scope().concatenate([self.clone(), other.clone()])
2929
}
@@ -52,7 +52,7 @@ pub trait Concatenate<G: Scope, C: Container> {
5252
I: IntoIterator<Item=StreamCore<G, C>>;
5353
}
5454

55-
impl<G: Scope, C: Container> Concatenate<G, C> for StreamCore<G, C> {
55+
impl<G: Scope, C: Container + Data> Concatenate<G, C> for StreamCore<G, C> {
5656
fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
5757
where
5858
I: IntoIterator<Item=StreamCore<G, C>>
@@ -62,7 +62,7 @@ impl<G: Scope, C: Container> Concatenate<G, C> for StreamCore<G, C> {
6262
}
6363
}
6464

65-
impl<G: Scope, C: Container> Concatenate<G, C> for G {
65+
impl<G: Scope, C: Container + Data> Concatenate<G, C> for G {
6666
fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
6767
where
6868
I: IntoIterator<Item=StreamCore<G, C>>

timely/src/dataflow/operators/core/enterleave.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pub trait Leave<G: Scope, C: Container> {
103103
fn leave(&self) -> StreamCore<G, C>;
104104
}
105105

106-
impl<'a, G: Scope, C: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'a, G, T>, C> {
106+
impl<'a, G: Scope, C: Container+Data, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'a, G, T>, C> {
107107
fn leave(&self) -> StreamCore<G, C> {
108108

109109
let scope = self.scope();
@@ -130,14 +130,14 @@ impl<'a, G: Scope, C: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave
130130
}
131131

132132

133-
struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container> {
133+
struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container+Data> {
134134
targets: Counter<TInner, TContainer, Tee<TInner, TContainer>>,
135135
phantom: ::std::marker::PhantomData<TOuter>,
136136
activator: crate::scheduling::Activator,
137137
active: bool,
138138
}
139139

140-
impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container> Push<Bundle<TOuter, TContainer>> for IngressNub<TOuter, TInner, TContainer> {
140+
impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container+Data> Push<Bundle<TOuter, TContainer>> for IngressNub<TOuter, TInner, TContainer> {
141141
fn push(&mut self, element: &mut Option<Bundle<TOuter, TContainer>>) {
142142
if let Some(message) = element {
143143
let outer_message = message.as_mut();

0 commit comments

Comments
 (0)