Skip to content

Commit cb41e11

Browse files
committed
Owned streams take 2
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent f2ea960 commit cb41e11

29 files changed

+108
-107
lines changed

timely/examples/bfs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ fn main() {
4848

4949
// use the stream of edges
5050
graph.binary_notify(
51-
&stream,
51+
stream,
5252
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
5353
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
5454
"BFS",

timely/examples/hashjoin.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ fn main() {
3535
let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);
3636

3737
stream1
38-
.binary(&stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
38+
.binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
3939

4040
let mut map1 = HashMap::<u64, Vec<u64>>::new();
4141
let mut map2 = HashMap::<u64, Vec<u64>>::new();

timely/examples/loopdemo.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ fn main() {
3232
.concat(&loop_stream)
3333
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
3434
.filter(|x| x > &1);
35-
36-
step.connect_loop(loop_handle);
37-
step.probe_with(&mut probe);
35+
step
36+
.probe_with(&mut probe)
37+
.connect_loop(loop_handle);
3838
});
3939

4040
let ns_per_request = 1_000_000_000 / rate;
@@ -122,4 +122,4 @@ fn main() {
122122
}
123123

124124
}).unwrap();
125-
}
125+
}

timely/examples/pagerank.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ fn main() {
2626

2727
// bring edges and ranks together!
2828
let changes = edge_stream.binary_frontier(
29-
&rank_stream,
29+
rank_stream,
3030
Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
3131
Exchange::new(|x: &(usize, i64)| x.0 as u64),
3232
"PageRank",

timely/examples/unionfind.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ fn main() {
5151
}
5252

5353
trait UnionFind {
54-
fn union_find(&self) -> Self;
54+
fn union_find(self) -> Self;
5555
}
5656

5757
impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
58-
fn union_find(&self) -> Stream<G, (usize, usize)> {
58+
fn union_find(self) -> Stream<G, (usize, usize)> {
5959

6060
self.unary(Pipeline, "UnionFind", |_,_| {
6161

timely/src/dataflow/operators/aggregation/aggregate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub trait Aggregate<S: Scope, K: ExchangeData+Hash, V: ExchangeData> {
6161
/// });
6262
/// ```
6363
fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
64-
&self,
64+
self,
6565
fold: F,
6666
emit: E,
6767
hash: H) -> Stream<S, R> where S::Timestamp: Eq;
@@ -70,7 +70,7 @@ pub trait Aggregate<S: Scope, K: ExchangeData+Hash, V: ExchangeData> {
7070
impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)> {
7171

7272
fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
73-
&self,
73+
self,
7474
fold: F,
7575
emit: E,
7676
hash: H) -> Stream<S, R> where S::Timestamp: Eq {

timely/src/dataflow/operators/aggregation/state_machine.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
5151
I: IntoIterator<Item=R>, // type of output iterator
5252
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
5353
H: Fn(&K)->u64+'static, // "hash" function for keys
54-
>(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq ;
54+
>(self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq ;
5555
}
5656

5757
impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)> {
@@ -61,7 +61,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
6161
I: IntoIterator<Item=R>, // type of output iterator
6262
F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic
6363
H: Fn(&K)->u64+'static, // "hash" function for keys
64-
>(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq {
64+
>(self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq {
6565

6666
let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
6767
let mut states = HashMap::new(); // keys -> state

timely/src/dataflow/operators/branch.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ pub trait Branch<S: Scope, D: Data> {
2929
/// });
3030
/// ```
3131
fn branch(
32-
&self,
32+
self,
3333
condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
3434
) -> (Stream<S, D>, Stream<S, D>);
3535
}
3636

3737
impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
3838
fn branch(
39-
&self,
39+
self,
4040
condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
4141
) -> (Stream<S, D>, Stream<S, D>) {
4242
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
@@ -91,11 +91,11 @@ pub trait BranchWhen<T>: Sized {
9191
/// after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
9292
/// });
9393
/// ```
94-
fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
94+
fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
9595
}
9696

9797
impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
98-
fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
98+
fn branch_when(self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
9999
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
100100

101101
let mut input = builder.new_input(self, Pipeline);

timely/src/dataflow/operators/broadcast.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ pub trait Broadcast<D: ExchangeData> {
1818
/// .inspect(|x| println!("seen: {:?}", x));
1919
/// });
2020
/// ```
21-
fn broadcast(&self) -> Self;
21+
fn broadcast(self) -> Self;
2222
}
2323

2424
impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D> {
25-
fn broadcast(&self) -> Stream<G, D> {
25+
fn broadcast(self) -> Stream<G, D> {
2626

2727
// NOTE: Simplified implementation due to underlying motion
2828
// in timely dataflow internals. Optimize once they have

timely/src/dataflow/operators/capture/capture.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::progress::Timestamp;
1717
use super::{EventCore, EventPusherCore};
1818

1919
/// Capture a stream of timestamped data for later replay.
20-
pub trait Capture<T: Timestamp, D: Container> {
20+
pub trait Capture<T: Timestamp, D: Container>: Sized {
2121
/// Captures a stream of timestamped data for later replay.
2222
///
2323
/// # Examples
@@ -103,18 +103,18 @@ pub trait Capture<T: Timestamp, D: Container> {
103103
///
104104
/// assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());
105105
/// ```
106-
fn capture_into<P: EventPusherCore<T, D>+'static>(&self, pusher: P);
106+
fn capture_into<P: EventPusherCore<T, D>+'static>(self, pusher: P);
107107

108108
/// Captures a stream using Rust's MPSC channels.
109-
fn capture(&self) -> ::std::sync::mpsc::Receiver<EventCore<T, D>> {
109+
fn capture(self) -> ::std::sync::mpsc::Receiver<EventCore<T, D>> {
110110
let (send, recv) = ::std::sync::mpsc::channel();
111111
self.capture_into(send);
112112
recv
113113
}
114114
}
115115

116116
impl<S: Scope, D: Container> Capture<S::Timestamp, D> for StreamCore<S, D> {
117-
fn capture_into<P: EventPusherCore<S::Timestamp, D>+'static>(&self, mut event_pusher: P) {
117+
fn capture_into<P: EventPusherCore<S::Timestamp, D>+'static>(self, mut event_pusher: P) {
118118

119119
let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
120120
let mut input = PullCounter::new(builder.new_input(self, Pipeline));

0 commit comments

Comments
 (0)