Skip to content

Commit 95792c5

Browse files
committed
Remove Container: Clone + 'static
Remove the requirement that all Container implementations are Clone and 'static. This makes implementing types simpler that depend on Container, and requires us to explicitly mark various places as `Data` such that they comply with Timely's type requirements. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 64be92b commit 95792c5

File tree

22 files changed

+86
-87
lines changed

22 files changed

+86
-87
lines changed

container/src/columnation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ mod container {
295295

296296
use crate::columnation::{Columnation, TimelyStack};
297297

298-
impl<T: Columnation + 'static> Container for TimelyStack<T> {
298+
impl<T: Columnation> Container for TimelyStack<T> {
299299
type Item = T;
300300

301301
fn len(&self) -> usize {
@@ -315,7 +315,7 @@ mod container {
315315
}
316316
}
317317

318-
impl<T: Columnation + 'static> PushPartitioned for TimelyStack<T> {
318+
impl<T: Columnation> PushPartitioned for TimelyStack<T> {
319319
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
320320
where
321321
I: FnMut(&Self::Item) -> usize,

container/src/lib.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ pub mod columnation;
1515
/// We require the container to be cloneable to enable efficient copies when providing references
1616
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
1717
/// is efficient (which is not necessarily the case when deriving `Clone`.)
18-
/// TODO: Don't require `Container: Clone`
19-
pub trait Container: Default + Clone + 'static {
18+
pub trait Container: Default {
2019
/// The type of elements this container holds.
2120
type Item;
2221

@@ -40,7 +39,7 @@ pub trait Container: Default + Clone + 'static {
4039
fn clear(&mut self);
4140
}
4241

43-
impl<T: Clone + 'static> Container for Vec<T> {
42+
impl<T> Container for Vec<T> {
4443
type Item = T;
4544

4645
fn len(&self) -> usize {
@@ -132,7 +131,7 @@ pub trait PushPartitioned: Container {
132131
F: FnMut(usize, &mut Self);
133132
}
134133

135-
impl<T: Clone + 'static> PushPartitioned for Vec<T> {
134+
impl<T> PushPartitioned for Vec<T> {
136135
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
137136
where
138137
I: FnMut(&Self::Item) -> usize,

timely/examples/rc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use abomonation::Abomonation;
88

99
#[derive(Debug, Clone)]
1010
pub struct Test {
11-
field: Rc<usize>,
11+
_field: Rc<usize>,
1212
}
1313

1414
impl Abomonation for Test {
@@ -32,7 +32,7 @@ fn main() {
3232

3333
// introduce data and watch!
3434
for round in 0..10 {
35-
input.send(Test { field: Rc::new(round) } );
35+
input.send(Test { _field: Rc::new(round) } );
3636
input.advance_to(round + 1);
3737
worker.step_while(|| probe.less_than(input.time()));
3838
}

timely/src/dataflow/channels/pact.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
use std::{fmt::{self, Debug}, marker::PhantomData};
1111
use timely_container::PushPartitioned;
1212

13-
use crate::communication::{Push, Pull, Data};
13+
use crate::communication::{Push, Pull};
1414
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
15-
use crate::Container;
15+
use crate::{Container, ExchangeData};
1616

1717
use crate::worker::AsWorker;
1818
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
@@ -33,14 +33,14 @@ pub trait ParallelizationContractCore<T, D> {
3333

3434
/// A `ParallelizationContractCore` specialized for `Vec` containers
3535
/// TODO: Use trait aliases once stable.
36-
pub trait ParallelizationContract<T, D: Clone>: ParallelizationContractCore<T, Vec<D>> { }
37-
impl<T, D: Clone, P: ParallelizationContractCore<T, Vec<D>>> ParallelizationContract<T, D> for P { }
36+
pub trait ParallelizationContract<T, D>: ParallelizationContractCore<T, Vec<D>> { }
37+
impl<T, D, P: ParallelizationContractCore<T, Vec<D>>> ParallelizationContract<T, D> for P { }
3838

3939
/// A direct connection
4040
#[derive(Debug)]
4141
pub struct Pipeline;
4242

43-
impl<T: 'static, D: Container> ParallelizationContractCore<T, D> for Pipeline {
43+
impl<T: 'static, D: Container + 'static> ParallelizationContractCore<T, D> for Pipeline {
4444
type Pusher = LogPusher<T, D, ThreadPusher<BundleCore<T, D>>>;
4545
type Puller = LogPuller<T, D, ThreadPuller<BundleCore<T, D>>>;
4646
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
@@ -69,9 +69,9 @@ impl<C, D, F: FnMut(&D)->u64+'static> ExchangeCore<C, D, F> {
6969
}
7070

7171
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
72-
impl<T: Timestamp, C, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContractCore<T, C> for ExchangeCore<C, D, F>
72+
impl<T: Timestamp, C, D: ExchangeData, F: FnMut(&D)->u64+'static> ParallelizationContractCore<T, C> for ExchangeCore<C, D, F>
7373
where
74-
C: Data + Container + PushPartitioned<Item=D>,
74+
C: ExchangeData + Container + PushPartitioned<Item=D>,
7575
{
7676
type Pusher = ExchangePusher<T, C, D, LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>, F>;
7777
type Puller = LogPuller<T, C, Box<dyn Pull<BundleCore<T, C>>>>;

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl<T: Clone, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) ->
4040
}
4141
}
4242

43-
impl<T: Eq+Data, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> Push<BundleCore<T, C>> for Exchange<T, C, D, P, H>
43+
impl<T: Eq+Data, C: Container + Data, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> Push<BundleCore<T, C>> for Exchange<T, C, D, P, H>
4444
where
4545
C: PushPartitioned<Item=D>
4646
{

timely/src/dataflow/channels/pushers/tee.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub struct TeeCore<T, D> {
2020
/// [TeeCore] specialized to `Vec`-based container.
2121
pub type Tee<T, D> = TeeCore<T, Vec<D>>;
2222

23-
impl<T: Data, D: Container> Push<BundleCore<T, D>> for TeeCore<T, D> {
23+
impl<T: Data, D: Container + Data> Push<BundleCore<T, D>> for TeeCore<T, D> {
2424
#[inline]
2525
fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
2626
let mut pushers = self.shared.borrow_mut();

timely/src/dataflow/operators/branch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ pub trait BranchWhen<T>: Sized {
9494
fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
9595
}
9696

97-
impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
97+
impl<S: Scope, C: Container + Data> BranchWhen<S::Timestamp> for StreamCore<S, C> {
9898
fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
9999
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
100100

timely/src/dataflow/operators/capture/capture.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ use crate::dataflow::channels::pact::Pipeline;
1010
use crate::dataflow::channels::pullers::Counter as PullCounter;
1111
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
1212

13-
use crate::Container;
13+
use crate::{Container, Data};
1414
use crate::progress::ChangeBatch;
1515
use crate::progress::Timestamp;
1616

1717
use super::{EventCore, EventPusherCore};
1818

1919
/// Capture a stream of timestamped data for later replay.
20-
pub trait Capture<T: Timestamp, D: Container> {
20+
pub trait Capture<T: Timestamp, D: Container + Data> {
2121
/// Captures a stream of timestamped data for later replay.
2222
///
2323
/// # Examples
@@ -113,7 +113,7 @@ pub trait Capture<T: Timestamp, D: Container> {
113113
}
114114
}
115115

116-
impl<S: Scope, D: Container> Capture<S::Timestamp, D> for StreamCore<S, D> {
116+
impl<S: Scope, D: Container + Data> Capture<S::Timestamp, D> for StreamCore<S, D> {
117117
fn capture_into<P: EventPusherCore<S::Timestamp, D>+'static>(&self, mut event_pusher: P) {
118118

119119
let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
@@ -142,7 +142,7 @@ impl<S: Scope, D: Container> Capture<S::Timestamp, D> for StreamCore<S, D> {
142142
RefOrMut::Ref(reference) => (&reference.time, RefOrMut::Ref(&reference.data)),
143143
RefOrMut::Mut(reference) => (&reference.time, RefOrMut::Mut(&mut reference.data)),
144144
};
145-
let vector = data.replace(Default::default());
145+
let vector = data.take();
146146
event_pusher.push(EventCore::Messages(time.clone(), vector));
147147
}
148148
input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]);

timely/src/dataflow/operators/capture/replay.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use crate::progress::Timestamp;
4646

4747
use super::EventCore;
4848
use super::event::EventIteratorCore;
49-
use crate::Container;
49+
use crate::{Container, Data};
5050

5151
/// Replay a capture stream into a scope with the same timestamp.
5252
pub trait Replay<T: Timestamp, C> : Sized {
@@ -62,7 +62,7 @@ pub trait Replay<T: Timestamp, C> : Sized {
6262
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>;
6363
}
6464

65-
impl<T: Timestamp, C: Container, I> Replay<T, C> for I
65+
impl<T: Timestamp, C: Container + Data, I> Replay<T, C> for I
6666
where I : IntoIterator,
6767
<I as IntoIterator>::Item: EventIteratorCore<T, C>+'static {
6868
fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>{

timely/src/dataflow/operators/concat.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Merges the contents of multiple streams.
22
33

4-
use crate::Container;
4+
use crate::{Container, Data};
55
use crate::dataflow::channels::pact::Pipeline;
66
use crate::dataflow::{StreamCore, Scope};
77

@@ -23,7 +23,7 @@ pub trait Concat<G: Scope, D: Container> {
2323
fn concat(&self, _: &StreamCore<G, D>) -> StreamCore<G, D>;
2424
}
2525

26-
impl<G: Scope, D: Container> Concat<G, D> for StreamCore<G, D> {
26+
impl<G: Scope, D: Container + Data> Concat<G, D> for StreamCore<G, D> {
2727
fn concat(&self, other: &StreamCore<G, D>) -> StreamCore<G, D> {
2828
self.scope().concatenate([self.clone(), other.clone()])
2929
}
@@ -52,7 +52,7 @@ pub trait Concatenate<G: Scope, D: Container> {
5252
I: IntoIterator<Item=StreamCore<G, D>>;
5353
}
5454

55-
impl<G: Scope, D: Container> Concatenate<G, D> for StreamCore<G, D> {
55+
impl<G: Scope, D: Container + Data> Concatenate<G, D> for StreamCore<G, D> {
5656
fn concatenate<I>(&self, sources: I) -> StreamCore<G, D>
5757
where
5858
I: IntoIterator<Item=StreamCore<G, D>>
@@ -62,7 +62,7 @@ impl<G: Scope, D: Container> Concatenate<G, D> for StreamCore<G, D> {
6262
}
6363
}
6464

65-
impl<G: Scope, D: Container> Concatenate<G, D> for G {
65+
impl<G: Scope, D: Container + Data> Concatenate<G, D> for G {
6666
fn concatenate<I>(&self, sources: I) -> StreamCore<G, D>
6767
where
6868
I: IntoIterator<Item=StreamCore<G, D>>

0 commit comments

Comments
 (0)