Skip to content

Commit 23cff99

Browse files
committed
wip
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent cd87e24 commit 23cff99

36 files changed

+201
-187
lines changed

container/src/lib.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,7 @@ pub mod columnation;
1111
///
1212
/// A container must implement default. The default implementation is not required to allocate
1313
/// memory for variable-length components.
14-
///
15-
/// We require the container to be cloneable to enable efficient copies when providing references
16-
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
17-
/// is efficient (which is not necessarily the case when deriving `Clone`.)
18-
/// TODO: Don't require `Container: Clone`
19-
pub trait Container: Default + Clone + 'static {
14+
pub trait Container: Default + 'static {
2015
/// The type of elements this container holds.
2116
type Item;
2217

@@ -40,7 +35,7 @@ pub trait Container: Default + Clone + 'static {
4035
fn clear(&mut self);
4136
}
4237

43-
impl<T: Clone + 'static> Container for Vec<T> {
38+
impl<T: 'static> Container for Vec<T> {
4439
type Item = T;
4540

4641
fn len(&self) -> usize {
@@ -118,7 +113,7 @@ pub trait PushPartitioned: Container {
118113
F: FnMut(usize, &mut Self);
119114
}
120115

121-
impl<T: Clone + 'static> PushPartitioned for Vec<T> {
116+
impl<T: 'static> PushPartitioned for Vec<T> {
122117
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
123118
where
124119
I: FnMut(&Self::Item) -> usize,

kafkaesque/src/kafka_source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use timely::dataflow::operators::generic::OutputHandle;
55

66
use rdkafka::Message;
77
use rdkafka::consumer::{ConsumerContext, BaseConsumer};
8-
use timely::dataflow::channels::pushers::tee::PushOwned;
8+
use timely::dataflow::channels::pushers::PushOwned;
99
use timely::dataflow::stream::OwnedStream;
1010

1111
/// Constructs a stream of data from a Kafka consumer.

timely/src/dataflow/channels/pact.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ pub trait ParallelizationContractCore<T, D> {
3333

3434
/// A `ParallelizationContractCore` specialized for `Vec` containers
3535
/// TODO: Use trait aliases once stable.
36-
pub trait ParallelizationContract<T, D: Clone>: ParallelizationContractCore<T, Vec<D>> { }
37-
impl<T, D: Clone, P: ParallelizationContractCore<T, Vec<D>>> ParallelizationContract<T, D> for P { }
36+
pub trait ParallelizationContract<T, D>: ParallelizationContractCore<T, Vec<D>> { }
37+
impl<T, D, P: ParallelizationContractCore<T, Vec<D>>> ParallelizationContract<T, D> for P { }
3838

3939
/// A direct connection
4040
#[derive(Debug)]
@@ -71,7 +71,7 @@ impl<C, D, F: FnMut(&D)->u64+'static> ExchangeCore<C, D, F> {
7171
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
7272
impl<T: Timestamp, C, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContractCore<T, C> for ExchangeCore<C, D, F>
7373
where
74-
C: Data + Container + PushPartitioned<Item=D>,
74+
C: Data + Clone + PushPartitioned<Item=D>,
7575
{
7676
type Pusher = ExchangePusher<T, C, D, LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>, F>;
7777
type Puller = LogPuller<T, C, Box<dyn Pull<BundleCore<T, C>>>>;

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl<T: Clone, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) ->
4040
}
4141
}
4242

43-
impl<T: Eq+Data, C: Container, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> Push<BundleCore<T, C>> for Exchange<T, C, D, P, H>
43+
impl<T: Eq+Data, C: Container + Data, D: Data, P: Push<BundleCore<T, C>>, H: FnMut(&D) -> u64> Push<BundleCore<T, C>> for Exchange<T, C, D, P, H>
4444
where
4545
C: PushPartitioned<Item=D>
4646
{

timely/src/dataflow/channels/pushers/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
pub use self::owned::PushOwned;
12
pub use self::tee::{Tee, TeeCore, TeeHelper};
23
pub use self::exchange::Exchange;
34
pub use self::counter::{Counter, CounterCore};
45

6+
pub mod owned;
57
pub mod tee;
68
pub mod exchange;
79
pub mod counter;
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//! A `Push` implementor with a single target.
2+
3+
use std::cell::RefCell;
4+
use std::fmt;
5+
use std::rc::Rc;
6+
7+
use timely_communication::Push;
8+
use crate::{Data, Container};
9+
10+
use crate::dataflow::channels::BundleCore;
11+
12+
/// A pusher that can bind to a single downstream pusher.
13+
pub struct PushOwned<T, D>(Rc<RefCell<Option<Box<dyn Push<BundleCore<T, D>>>>>>);
14+
15+
impl<T, D> PushOwned<T, D> {
16+
/// Create a new `PushOwned`. Similarly to `Tee`, it returns a pair where either element
17+
/// can be used as pusher or registrar.
18+
pub fn new() -> (Self, Self) {
19+
let zelf = Self(Rc::new(RefCell::new(None)));
20+
(zelf.clone(), zelf)
21+
}
22+
23+
/// Set the downstream pusher.
24+
pub fn set<P: Push<BundleCore<T, D>> + 'static>(self, pusher: P) {
25+
*self.0.borrow_mut() = Some(Box::new(pusher));
26+
}
27+
}
28+
29+
impl<T, D> fmt::Debug for PushOwned<T, D> {
30+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31+
f.debug_struct("PushOwned").finish_non_exhaustive()
32+
}
33+
}
34+
35+
impl<T, D> Clone for PushOwned<T, D> {
36+
fn clone(&self) -> Self {
37+
Self(Rc::clone(&self.0))
38+
}
39+
}
40+
41+
impl<T: Data, D: Container> Push<BundleCore<T, D>> for PushOwned<T, D> {
42+
#[inline]
43+
fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
44+
let mut pusher = self.0.borrow_mut();
45+
if let Some(pusher) = pusher.as_mut() {
46+
pusher.push(message);
47+
}
48+
}
49+
}

timely/src/dataflow/channels/pushers/tee.rs

Lines changed: 2 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! A `Push` implementor with a list of `Box<Push>` to forward pushes to.
22
33
use std::cell::RefCell;
4-
use std::fmt::{self, Debug, Formatter};
4+
use std::fmt::{self, Debug};
55
use std::rc::Rc;
66

77
use crate::dataflow::channels::{BundleCore, Message};
@@ -10,39 +10,6 @@ use crate::communication::Push;
1010
use crate::{Container, Data};
1111

1212
type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<BundleCore<T, D>>>>>>;
13-
/// TODO
14-
pub struct PushOwned<T, D>(Rc<RefCell<Option<Box<dyn Push<BundleCore<T, D>>>>>>);
15-
16-
impl<T, D> PushOwned<T, D> {
17-
/// TODO
18-
pub fn new() -> (Self, Self) {
19-
let zelf = Self(Rc::new(RefCell::new(None)));
20-
(zelf.clone(), zelf)
21-
}
22-
23-
/// TODO
24-
pub fn set<P: Push<BundleCore<T, D>> + 'static>(&self, pusher: P) {
25-
*self.0.borrow_mut() = Some(Box::new(pusher));
26-
}
27-
}
28-
29-
impl<T, D> Default for PushOwned<T, D> {
30-
fn default() -> Self {
31-
Self(Rc::new(RefCell::new(None)))
32-
}
33-
}
34-
35-
impl<T, D> Debug for PushOwned<T, D> {
36-
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
37-
f.debug_struct("PushOwned").finish_non_exhaustive()
38-
}
39-
}
40-
41-
impl<T, D> Clone for PushOwned<T, D> {
42-
fn clone(&self) -> Self {
43-
Self(Rc::clone(&self.0))
44-
}
45-
}
4613

4714
/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
4815
pub struct TeeCore<T, D> {
@@ -53,18 +20,7 @@ pub struct TeeCore<T, D> {
5320
/// [TeeCore] specialized to `Vec`-based container.
5421
pub type Tee<T, D> = TeeCore<T, Vec<D>>;
5522

56-
impl<T: Data, D: Container> Push<BundleCore<T, D>> for PushOwned<T, D> {
57-
#[inline]
58-
fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
59-
let mut pusher = self.0.borrow_mut();
60-
if let Some(pusher) = pusher.as_mut() {
61-
pusher.push(message);
62-
}
63-
}
64-
}
65-
66-
67-
impl<T: Data, D: Container> Push<BundleCore<T, D>> for TeeCore<T, D> {
23+
impl<T: Data, D: Container+Clone> Push<BundleCore<T, D>> for TeeCore<T, D> {
6824
#[inline]
6925
fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
7026
let mut pushers = self.shared.borrow_mut();

timely/src/dataflow/operators/aggregation/aggregate.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@ use std::hash::Hash;
33
use std::collections::HashMap;
44

55
use crate::{Data, ExchangeData};
6-
use crate::dataflow::{Stream, Scope};
6+
use crate::dataflow::{Scope, StreamLike, OwnedStream};
77
use crate::dataflow::operators::generic::operator::Operator;
88
use crate::dataflow::channels::pact::Exchange;
9-
use crate::dataflow::stream::OwnedStream;
109

1110
/// Generic intra-timestamp aggregation
1211
///
@@ -68,13 +67,19 @@ pub trait Aggregate<S: Scope, K: ExchangeData+Hash, V: ExchangeData> {
6867
hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp: Eq;
6968
}
7069

71-
impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)> {
70+
impl<G, K, V, S> Aggregate<G, K, V> for S
71+
where
72+
G: Scope,
73+
K: ExchangeData + Hash + Eq + Clone,
74+
V: ExchangeData,
75+
S: StreamLike<G, Vec<(K, V)>>,
76+
{
7277

7378
fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
7479
self,
7580
fold: F,
7681
emit: E,
77-
hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp: Eq {
82+
hash: H) -> OwnedStream<G, Vec<R>> where G::Timestamp: Eq {
7883

7984
let mut aggregates = HashMap::new();
8085
let mut vector = Vec::new();

timely/src/dataflow/operators/aggregation/state_machine.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@ use std::hash::Hash;
33
use std::collections::HashMap;
44

55
use crate::{Data, ExchangeData};
6-
use crate::dataflow::{Stream, Scope};
6+
use crate::dataflow::{OwnedStream, Scope, StreamLike};
77
use crate::dataflow::operators::generic::operator::Operator;
88
use crate::dataflow::channels::pact::Exchange;
9-
use crate::dataflow::stream::OwnedStream;
109

1110
/// Generic state-transition machinery: each key has a state, and receives a sequence of events.
1211
/// Events are applied in time-order, but no other promises are made. Each state transition can
@@ -55,14 +54,20 @@ pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
5554
>(self, fold: F, hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp : Hash+Eq ;
5655
}
5756

58-
impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)> {
57+
impl<G, K, V, S> StateMachine<G, K, V> for S
58+
where
59+
G: Scope,
60+
K: ExchangeData + Hash + Eq + Clone,
61+
V: ExchangeData,
62+
S: StreamLike<G, Vec<(K, V)>>,
63+
{
5964
fn state_machine<
6065
R: Data, // output type
6166
D: Default+'static, // per-key state (data)
6267
I: IntoIterator<Item=R>, // type of output iterator
6368
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
6469
H: Fn(&K)->u64+'static, // "hash" function for keys
65-
>(self, fold: F, hash: H) -> OwnedStream<S, Vec<R>> where S::Timestamp : Hash+Eq {
70+
>(self, fold: F, hash: H) -> OwnedStream<G, Vec<R>> where G::Timestamp : Hash+Eq {
6671

6772
let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
6873
let mut states = HashMap::new(); // keys -> state

timely/src/dataflow/operators/branch.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
33
use crate::dataflow::channels::pact::Pipeline;
44
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
5-
use crate::dataflow::{Scope, Stream, StreamCore};
5+
use crate::dataflow::{Scope, OwnedStream, StreamLike};
66
use crate::{Container, Data};
7-
use crate::dataflow::stream::{OwnedStream, StreamLike};
87

98
/// Extension trait for `Stream`.
109
pub trait Branch<S: Scope, D: Data> {
@@ -35,11 +34,11 @@ pub trait Branch<S: Scope, D: Data> {
3534
) -> (OwnedStream<S, Vec<D>>, OwnedStream<S, Vec<D>>);
3635
}
3736

38-
impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
37+
impl<G: Scope, D: Data, S: StreamLike<G, Vec<D>>> Branch<G, D> for S {
3938
fn branch(
4039
self,
41-
condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
42-
) -> (OwnedStream<S, Vec<D>>, OwnedStream<S, Vec<D>>) {
40+
condition: impl Fn(&G::Timestamp, &D) -> bool + 'static,
41+
) -> (OwnedStream<G, Vec<D>>, OwnedStream<G, Vec<D>>) {
4342
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
4443

4544
let mut input = builder.new_input(self, Pipeline);
@@ -95,7 +94,7 @@ pub trait BranchWhen<G: Scope, C: Container>: Sized {
9594
fn branch_when(self, condition: impl Fn(&G::Timestamp) -> bool + 'static) -> (OwnedStream<G, C>, OwnedStream<G, C>);
9695
}
9796

98-
impl<G: Scope, C: Container, S: StreamLike<G, C>> BranchWhen<G, C> for S {
97+
impl<G: Scope, C: Container + Clone, S: StreamLike<G, C>> BranchWhen<G, C> for S {
9998
fn branch_when(self, condition: impl Fn(&G::Timestamp) -> bool + 'static) -> (OwnedStream<G, C>, OwnedStream<G, C>) {
10099
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
101100

0 commit comments

Comments
 (0)