Skip to content

Commit 6300718

Browse files
committed
Remove Data trait
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 96079f8 commit 6300718

35 files changed

+101
-125
lines changed

timely/src/dataflow/channels/pact.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use crate::dataflow::channels::Message;
1818
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
1919
use crate::progress::Timestamp;
2020
use crate::worker::AsWorker;
21-
use crate::Data;
2221

2322
/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
2423
pub trait ParallelizationContract<T, C> {
@@ -84,7 +83,7 @@ impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for
8483
where
8584
CB: ContainerBuilder,
8685
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
87-
CB::Container: Data + Send + SizableContainer + crate::dataflow::channels::ContainerBytes,
86+
CB::Container: Send + SizableContainer + crate::dataflow::channels::ContainerBytes,
8887
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
8988
{
9089
type Pusher = ExchangePusher<T, CB, LogPusher<T, CB::Container, Box<dyn Push<Message<T, CB::Container>>>>, H>;

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use crate::communication::Push;
44
use crate::container::{ContainerBuilder, SizableContainer, PushInto};
55
use crate::dataflow::channels::Message;
6-
use crate::{Container, Data};
6+
use crate::Container;
77

88
// TODO : Software write combining
99
/// Distributes records among target pushees according to a distribution function.
@@ -50,7 +50,7 @@ where
5050
}
5151
}
5252

53-
impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
53+
impl<T: Eq+Clone, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
5454
where
5555
CB: ContainerBuilder,
5656
CB::Container: SizableContainer,

timely/src/dataflow/channels/pushers/owned.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use std::fmt;
55
use std::rc::Rc;
66

77
use timely_communication::Push;
8-
use crate::{Data, Container};
98
use crate::dataflow::channels::Message;
109

1110
/// A pusher that can bind to a single downstream pusher.
@@ -37,7 +36,7 @@ impl<T, D> Clone for PushOwned<T, D> {
3736
}
3837
}
3938

40-
impl<T: Data, D: Container> Push<Message<T, D>> for PushOwned<T, D> {
39+
impl<T, D> Push<Message<T, D>> for PushOwned<T, D> {
4140
#[inline]
4241
fn push(&mut self, message: &mut Option<Message<T, D>>) {
4342
let mut pusher = self.0.borrow_mut();

timely/src/dataflow/channels/pushers/tee.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::rc::Rc;
77
use crate::dataflow::channels::Message;
88

99
use crate::communication::Push;
10-
use crate::{Container, Data};
10+
use crate::Container;
1111

1212
type PushList<T, C> = Rc<RefCell<Vec<Box<dyn Push<Message<T, C>>>>>>;
1313

@@ -17,7 +17,7 @@ pub struct Tee<T, C> {
1717
shared: PushList<T, C>,
1818
}
1919

20-
impl<T: Data, C: Container + Data> Push<Message<T, C>> for Tee<T, C> {
20+
impl<T: Clone + 'static, C: Container + Clone + 'static> Push<Message<T, C>> for Tee<T, C> {
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2323
let mut pushers = self.shared.borrow_mut();

timely/src/dataflow/operators/aggregation/aggregate.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use std::hash::Hash;
33
use std::collections::HashMap;
44

5-
use crate::{Data, ExchangeData};
5+
use crate::ExchangeData;
66
use crate::dataflow::{Scope, StreamLike, OwnedStream};
77
use crate::dataflow::operators::generic::operator::Operator;
88
use crate::dataflow::channels::pact::Exchange;
@@ -60,7 +60,7 @@ pub trait Aggregate<S: Scope, K: ExchangeData+Hash, V: ExchangeData> {
6060
/// .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5)));
6161
/// });
6262
/// ```
63-
fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
63+
fn aggregate<R: 'static, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
6464
self,
6565
fold: F,
6666
emit: E,
@@ -75,7 +75,7 @@ where
7575
S: StreamLike<G, Vec<(K, V)>>,
7676
{
7777

78-
fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
78+
fn aggregate<R: 'static, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
7979
self,
8080
fold: F,
8181
emit: E,

timely/src/dataflow/operators/aggregation/state_machine.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use std::hash::Hash;
33
use std::collections::HashMap;
44

5-
use crate::{Data, ExchangeData};
5+
use crate::ExchangeData;
66
use crate::dataflow::{OwnedStream, Scope, StreamLike};
77
use crate::dataflow::operators::generic::operator::Operator;
88
use crate::dataflow::channels::pact::Exchange;
@@ -46,7 +46,7 @@ pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
4646
/// });
4747
/// ```
4848
fn state_machine<
49-
R: Data, // output type
49+
R: 'static, // output type
5050
D: Default+'static, // per-key state (data)
5151
I: IntoIterator<Item=R>, // type of output iterator
5252
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
@@ -62,7 +62,7 @@ where
6262
S: StreamLike<G, Vec<(K, V)>>,
6363
{
6464
fn state_machine<
65-
R: Data, // output type
65+
R: 'static, // output type
6666
D: Default+'static, // per-key state (data)
6767
I: IntoIterator<Item=R>, // type of output iterator
6868
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic

timely/src/dataflow/operators/branch.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
use crate::dataflow::channels::pact::Pipeline;
44
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
55
use crate::dataflow::{Scope, OwnedStream, StreamLike};
6-
use crate::{Container, Data};
6+
use crate::Container;
77

88
/// Extension trait for `Stream`.
9-
pub trait Branch<S: Scope, D: Data> {
9+
pub trait Branch<S: Scope, D> {
1010
/// Takes one input stream and splits it into two output streams.
1111
/// For each record, the supplied closure is called with a reference to
1212
/// the data and its time. If it returns `true`, the record will be sent
@@ -34,7 +34,7 @@ pub trait Branch<S: Scope, D: Data> {
3434
) -> (OwnedStream<S, Vec<D>>, OwnedStream<S, Vec<D>>);
3535
}
3636

37-
impl<G: Scope, D: Data, S: StreamLike<G, Vec<D>>> Branch<G, D> for S {
37+
impl<G: Scope, D: 'static, S: StreamLike<G, Vec<D>>> Branch<G, D> for S {
3838
fn branch(
3939
self,
4040
condition: impl Fn(&G::Timestamp, &D) -> bool + 'static,
@@ -92,7 +92,7 @@ pub trait BranchWhen<G: Scope, C: Container>: Sized {
9292
fn branch_when(self, condition: impl Fn(&G::Timestamp) -> bool + 'static) -> (OwnedStream<G, C>, OwnedStream<G, C>);
9393
}
9494

95-
impl<G: Scope, C: Container + Data, S: StreamLike<G, C>> BranchWhen<G, C> for S {
95+
impl<G: Scope, C: Container + 'static, S: StreamLike<G, C>> BranchWhen<G, C> for S {
9696
fn branch_when(self, condition: impl Fn(&G::Timestamp) -> bool + 'static) -> (OwnedStream<G, C>, OwnedStream<G, C>) {
9797
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
9898

timely/src/dataflow/operators/broadcast.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub trait Broadcast<G: Scope, D: ExchangeData> {
2121
fn broadcast(self) -> OwnedStream<G, Vec<D>>;
2222
}
2323

24-
impl<G: Scope, D: ExchangeData, S: StreamLike<G, Vec<D>>> Broadcast<G, D> for S {
24+
impl<G: Scope, D: ExchangeData + Clone, S: StreamLike<G, Vec<D>>> Broadcast<G, D> for S {
2525
fn broadcast(self) -> OwnedStream<G, Vec<D>> {
2626

2727
// NOTE: Simplified implementation due to underlying motion

timely/src/dataflow/operators/core/enterleave.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::logging::{TimelyLogger, MessagesEvent};
2525
use crate::progress::Timestamp;
2626
use crate::progress::timestamp::Refines;
2727
use crate::progress::{Source, Target};
28-
use crate::{Container, Data};
28+
use crate::Container;
2929
use crate::communication::Push;
3030
use crate::dataflow::channels::pushers::{Counter, PushOwned};
3131
use crate::dataflow::channels::Message;
@@ -53,7 +53,7 @@ pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> {
5353
fn enter<'a>(self, _: &Child<'a, G, T>) -> OwnedStream<Child<'a, G, T>, C>;
5454
}
5555

56-
impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container, S: StreamLike<G, C>> Enter<G, T, C> for S {
56+
impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container + 'static, S: StreamLike<G, C>> Enter<G, T, C> for S {
5757
fn enter<'a>(self, scope: &Child<'a, G, T>) -> OwnedStream<Child<'a, G, T>, C> {
5858

5959
use crate::scheduling::Scheduler;
@@ -130,14 +130,14 @@ impl<'a, G: Scope, C: Container + 'static, T: Timestamp+Refines<G::Timestamp>, S
130130
}
131131

132132

133-
struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container + Data> {
133+
struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container> {
134134
targets: Counter<TInner, TContainer, PushOwned<TInner, TContainer>>,
135135
phantom: PhantomData<TOuter>,
136136
activator: crate::scheduling::Activator,
137137
active: bool,
138138
}
139139

140-
impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container + Data> Push<Message<TOuter, TContainer>> for IngressNub<TOuter, TInner, TContainer> {
140+
impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container> Push<Message<TOuter, TContainer>> for IngressNub<TOuter, TInner, TContainer> {
141141
fn push(&mut self, element: &mut Option<Message<TOuter, TContainer>>) {
142142
if let Some(outer_message) = element {
143143
let data = ::std::mem::take(&mut outer_message.data);

timely/src/dataflow/operators/core/feedback.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Create cycles in a timely dataflow graph.
22
3-
use crate::{Container, Data};
3+
use crate::Container;
44
use crate::container::CapacityContainerBuilder;
55
use crate::dataflow::channels::pact::Pipeline;
66
use crate::dataflow::channels::pushers::PushOwned;
@@ -85,7 +85,7 @@ impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T>
8585
}
8686

8787
/// Connect a `Stream` to the input of a loop variable.
88-
pub trait ConnectLoop<G: Scope, C: Container + Data> {
88+
pub trait ConnectLoop<G: Scope, C: Container> {
8989
/// Connect a `Stream` to be the input of a loop variable.
9090
///
9191
/// # Examples
@@ -106,7 +106,7 @@ pub trait ConnectLoop<G: Scope, C: Container + Data> {
106106
fn connect_loop(self, handle: Handle<G, C>);
107107
}
108108

109-
impl<G: Scope, C: Container + Data, S: StreamLike<G, C>> ConnectLoop<G, C> for S {
109+
impl<G: Scope, C: Container + 'static, S: StreamLike<G, C>> ConnectLoop<G, C> for S {
110110
fn connect_loop(self, handle: Handle<G, C>) {
111111

112112
let mut builder = handle.builder;

0 commit comments

Comments
 (0)