Skip to content

Commit ee8ef50

Browse files
committed
make tests work
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent e76401f commit ee8ef50

File tree

8 files changed

+40
-54
lines changed

8 files changed

+40
-54
lines changed

mdbook/src/chapter_2/chapter_2_3.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,12 @@ use timely::dataflow::operators::{ToStream, Partition, Inspect};
126126

127127
fn main() {
128128
timely::example(|scope| {
129-
let streams = (0..10).to_stream(scope)
129+
let mut streams = (0..10).to_stream(scope)
130130
.partition(3, |x| (x % 3, x));
131131

132-
streams[0].inspect(|x| println!("seen 0: {:?}", x));
133-
streams[1].inspect(|x| println!("seen 1: {:?}", x));
134-
streams[2].inspect(|x| println!("seen 2: {:?}", x));
132+
streams.pop().unwrap().inspect(|x| println!("seen 2: {:?}", x));
133+
streams.pop().unwrap().inspect(|x| println!("seen 1: {:?}", x));
134+
streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x));
135135
});
136136
}
137137
```
@@ -147,11 +147,11 @@ use timely::dataflow::operators::{ToStream, Partition, Concat, Inspect};
147147

148148
fn main() {
149149
timely::example(|scope| {
150-
let streams = (0..10).to_stream(scope)
150+
let mut streams = (0..10).to_stream(scope)
151151
.partition(3, |x| (x % 3, x));
152-
streams[0]
153-
.concat(&streams[1])
154-
.concat(&streams[2])
152+
streams.pop().unwrap()
153+
.concat(streams.pop().unwrap())
154+
.concat(streams.pop().unwrap())
155155
.inspect(|x| println!("seen: {:?}", x));
156156
});
157157
}

mdbook/src/chapter_2/chapter_2_4.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ fn main() {
183183
let in1 = (0 .. 10).to_stream(scope);
184184
let in2 = (0 .. 10).to_stream(scope);
185185

186-
in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
186+
in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
187187

188188
let mut notificator = FrontierNotificator::new();
189189
let mut stash = HashMap::new();
@@ -234,7 +234,7 @@ fn main() {
234234
let in1 = (0 .. 10).to_stream(scope);
235235
let in2 = (0 .. 10).to_stream(scope);
236236

237-
in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
237+
in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
238238

239239
let mut stash = HashMap::new();
240240

mdbook/src/chapter_4/chapter_4_2.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn main() {
2424
// circulate numbers, Collatz stepping each time.
2525
(1 .. 10)
2626
.to_stream(scope)
27-
.concat(&stream)
27+
.concat(stream)
2828
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } )
2929
.inspect(|x| println!("{:?}", x))
3030
.filter(|x| *x != 1)
@@ -63,17 +63,17 @@ fn main() {
6363
let results1 = stream1.map(|x| 3 * x + 1);
6464

6565
// partition the input and feedback streams by even-ness.
66-
let parts =
66+
let mut parts =
6767
(1 .. 10)
6868
.to_stream(scope)
69-
.concat(&results0)
70-
.concat(&results1)
69+
.concat(results0)
70+
.concat(results1)
7171
.inspect(|x| println!("{:?}", x))
7272
.partition(2, |x| (x % 2, x));
7373

7474
// connect each part appropriately.
75-
parts[0].connect_loop(handle0);
76-
parts[1].connect_loop(handle1);
75+
parts.pop().unwrap().connect_loop(handle1);
76+
parts.pop().unwrap().connect_loop(handle0);
7777
});
7878
}
7979
```
@@ -103,7 +103,7 @@ fn main() {
103103

104104
input
105105
.enter(subscope)
106-
.concat(&stream)
106+
.concat(stream)
107107
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } )
108108
.inspect(|x| println!("{:?}", x))
109109
.filter(|x| *x != 1)

mdbook/src/chapter_4/chapter_4_3.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ fn main() {
7676
// Assign timestamps to records so that not much work is in each time.
7777
.delay(|number, time| number / 100 )
7878
// Buffer records until all prior timestamps have completed.
79-
.binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
79+
.binary_frontier(cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
8080
8181
let mut vector = Vec::new();
8282

timely/src/dataflow/operators/concat.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub trait Concat<G: Scope, D: Container, S: StreamLike<G, D>> {
1616
/// timely::example(|scope| {
1717
///
1818
/// let stream = (0..10).to_stream(scope).tee();
19-
/// stream.clone().concat(stream)
19+
/// stream.concat(&stream)
2020
/// .inspect(|x| println!("seen: {:?}", x));
2121
/// });
2222
/// ```

timely/src/dataflow/operators/enterleave.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ use crate::progress::{Source, Target};
2727
use crate::order::Product;
2828
use crate::{Container, Data};
2929
use crate::communication::Push;
30-
use crate::dataflow::channels::pushers::{CounterCore, TeeCore};
30+
use crate::dataflow::channels::pushers::{CounterCore, PushOwned};
3131
use crate::dataflow::channels::{BundleCore, Message};
3232

3333
use crate::worker::AsWorker;
34-
use crate::dataflow::{StreamCore, Scope};
35-
use crate::dataflow::channels::pushers::PushOwned;
34+
use crate::dataflow::{OwnedStream, StreamLike, Scope};
3635
use crate::dataflow::scopes::{Child, ScopeParent};
36+
use crate::dataflow::scopes::child::Iterative;
3737
use crate::dataflow::operators::delay::Delay;
3838

3939
/// Extension trait to move a `Stream` into a child of its current `Scope`.
@@ -52,12 +52,9 @@ pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> {
5252
/// });
5353
/// });
5454
/// ```
55-
fn enter<'a>(self, _: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C>;
55+
fn enter<'a>(self, _: &Child<'a, G, T>) -> OwnedStream<Child<'a, G, T>, C>;
5656
}
5757

58-
use crate::dataflow::scopes::child::Iterative;
59-
use crate::dataflow::stream::{OwnedStream, StreamLike};
60-
6158
/// Extension trait to move a `Stream` into a child of its current `Scope` setting the timestamp for each element.
6259
pub trait EnterAt<G: Scope, T: Timestamp, D: Data> {
6360
/// Moves the `Stream` argument into a child of its current `Scope` setting the timestamp for each element by `initial`.
@@ -85,11 +82,11 @@ impl<G: Scope, T: Timestamp, D: Data, E: Enter<G, Product<<G as ScopeParent>::Ti
8582
}
8683

8784
impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container+Data, S: StreamLike<G, C>> Enter<G, T, C> for S {
88-
fn enter<'a>(self, scope: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C> {
85+
fn enter<'a>(self, scope: &Child<'a, G, T>) -> OwnedStream<Child<'a, G, T>, C> {
8986

9087
use crate::scheduling::Scheduler;
9188

92-
let (targets, registrar) = TeeCore::<T, C>::new();
89+
let (targets, registrar) = PushOwned::<T, C>::new();
9390
let ingress = IngressNub {
9491
targets: CounterCore::new(targets),
9592
phantom: ::std::marker::PhantomData,
@@ -102,7 +99,7 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container+Data, S: StreamL
10299

103100
let channel_id = scope.clone().new_identifier();
104101
self.connect_to(input, ingress, channel_id);
105-
StreamCore::new(Source::new(0, input.port), registrar, scope.clone())
102+
OwnedStream::new(Source::new(0, input.port), registrar, scope.clone())
106103
}
107104
}
108105

@@ -151,7 +148,7 @@ where
151148

152149

153150
struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Container+Data> {
154-
targets: CounterCore<TInner, TData, TeeCore<TInner, TData>>,
151+
targets: CounterCore<TInner, TData, PushOwned<TInner, TData>>,
155152
phantom: ::std::marker::PhantomData<TOuter>,
156153
activator: crate::scheduling::Activator,
157154
active: bool,

timely/src/dataflow/operators/rc.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ mod test {
5151
fn test_shared() {
5252
let output = crate::example(|scope| {
5353
let shared = vec![Ok(0), Err(())].to_stream(scope).shared().tee();
54-
let shared2 = shared.clone();
5554
scope
5655
.concatenate([
5756
shared.unary(Pipeline, "read shared 1", |_, _| {
@@ -63,7 +62,7 @@ mod test {
6362
});
6463
}
6564
}),
66-
shared2.unary(Pipeline, "read shared 2", |_, _| {
65+
shared.unary(Pipeline, "read shared 2", |_, _| {
6766
let mut container = Default::default();
6867
move |input, output| {
6968
input.for_each(|time, data| {

timely/src/dataflow/stream.rs

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,7 @@ impl<S: Scope, D: Container> OwnedStream<S, D> {
6969
/// A stream batching data in vectors.
7070
pub type Stream<S, D> = StreamCore<S, Vec<D>>;
7171

72-
impl<S: Scope, D: Container> StreamLike<S, D> for StreamCore<S, D> {
73-
fn connect_to<P: Push<BundleCore<S::Timestamp, D>> + 'static>(self, target: Target, pusher: P, identifier: usize) {
74-
self.connect_to(target, pusher, identifier)
75-
}
76-
77-
fn scope(&self) -> S {
78-
self.scope()
79-
}
80-
}
81-
82-
impl<S: Scope, D: Container> StreamLike<S, D> for OwnedStream<S, D> {
72+
impl<S: Scope, D: Container> StreamLike<S, D> for &StreamCore<S, D> {
8373
fn connect_to<P: Push<BundleCore<S::Timestamp, D>> + 'static>(self, target: Target, pusher: P, identifier: usize) {
8474
let mut logging = self.scope().logging();
8575
logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
@@ -90,21 +80,16 @@ impl<S: Scope, D: Container> StreamLike<S, D> for OwnedStream<S, D> {
9080
}));
9181

9282
self.scope.add_edge(self.name, target);
93-
self.port.set(pusher);
83+
self.ports.add_pusher(pusher);
9484
}
9585

9686
fn scope(&self) -> S {
9787
self.scope.clone()
9888
}
9989
}
10090

101-
impl<S: Scope, D: Container> StreamCore<S, D> {
102-
/// Connects the stream to a destination.
103-
///
104-
/// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the
105-
/// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
106-
pub fn connect_to<P: Push<BundleCore<S::Timestamp, D>>+'static>(self, target: Target, pusher: P, identifier: usize) {
107-
91+
impl<S: Scope, D: Container> StreamLike<S, D> for OwnedStream<S, D> {
92+
fn connect_to<P: Push<BundleCore<S::Timestamp, D>> + 'static>(self, target: Target, pusher: P, identifier: usize) {
10893
let mut logging = self.scope().logging();
10994
logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
11095
id: identifier,
@@ -114,16 +99,21 @@ impl<S: Scope, D: Container> StreamCore<S, D> {
11499
}));
115100

116101
self.scope.add_edge(self.name, target);
117-
self.ports.add_pusher(pusher);
102+
self.port.set(pusher);
118103
}
104+
105+
fn scope(&self) -> S {
106+
self.scope.clone()
107+
}
108+
}
109+
110+
impl<S: Scope, D: Container> StreamCore<S, D> {
119111
/// Allocates a `Stream` from a supplied `Source` name and rendezvous point.
120112
pub fn new(source: Source, output: TeeHelper<S::Timestamp, D>, scope: S) -> Self {
121113
Self { name: source, ports: output, scope }
122114
}
123115
/// The name of the stream's source operator.
124116
pub fn name(&self) -> &Source { &self.name }
125-
/// The scope immediately containing the stream.
126-
pub fn scope(&self) -> S { self.scope.clone() }
127117
}
128118

129119
impl<S, D> Debug for StreamCore<S, D>

0 commit comments

Comments
 (0)