Commit 3f2d504

Deploying to gh-pages from @ 229c114 🚀
1 parent 753d9b4 commit 3f2d504

9 files changed

+163
-123
lines changed

chapter_0/chapter_0_0.html

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -175,16 +175,17 @@ <h1 class="menu-title"></h1>
175175
<main>
176176
<h2 id="a-simplest-example"><a class="header" href="#a-simplest-example">A simplest example</a></h2>
177177
<p>Let's start with what may be the simplest non-trivial timely dataflow program.</p>
178-
<pre><pre class="playground"><code class="language-rust">extern crate timely;
179-
178+
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
179+
</span><span class="boring">fn main() {
180+
</span><span class="boring">extern crate timely;
181+
</span>
180182
use timely::dataflow::operators::{ToStream, Inspect};
181183

182-
fn main() {
183-
timely::example(|scope| {
184-
(0..10).to_stream(scope)
185-
.inspect(|x| println!("seen: {:?}", x));
186-
});
187-
}</code></pre></pre>
184+
timely::example(|scope| {
185+
(0..10).to_stream(scope)
186+
.inspect(|x| println!("seen: {:?}", x));
187+
});
188+
<span class="boring">}</span></code></pre></pre>
188189
<p>This program gives us a bit of a flavor for what a timely dataflow program might look like, including a bit of what Rust looks like, without getting too bogged down in weird stream processing details. Not to worry; we will do that in just a moment!</p>
189190
<p>If we run the program up above, we see it print out the numbers zero through nine.</p>
190191
<pre><code class="language-ignore"> Echidnatron% cargo run --example simple
@@ -203,9 +204,10 @@ <h2 id="a-simplest-example"><a class="header" href="#a-simplest-example">A simpl
203204
Echidnatron%
204205
</code></pre>
205206
<p>This isn't very different from a Rust program that would do this much more simply, namely the program</p>
206-
<pre><pre class="playground"><code class="language-rust">fn main() {
207-
(0..10).for_each(|x| println!("seen: {:?}", x));
208-
}</code></pre></pre>
207+
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
208+
</span><span class="boring">fn main() {
209+
</span>(0..10).for_each(|x| println!("seen: {:?}", x));
210+
<span class="boring">}</span></code></pre></pre>
209211
<p>Why would we want to make our life so complicated? The main reason is that we can make our program <em>reactive</em>, so that we can run it without knowing ahead of time the data we will use, and it will respond as we produce new data.</p>
210212

211213
</main>

chapter_0/chapter_0_1.html

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -176,38 +176,39 @@ <h1 class="menu-title"></h1>
176176
<h2 id="an-example"><a class="header" href="#an-example">An example</a></h2>
177177
<p>Timely dataflow means to capture a large number of idioms, so it is a bit tricky to wrap together one example that shows off all of its features, but let's look at something that shows off some core functionality to give a taste.</p>
178178
<p>The following complete program initializes a timely dataflow computation, in which participants can supply a stream of numbers which are exchanged between the workers based on their value. Workers print to the screen when they see numbers. You can also find this as <a href="https://github.com/TimelyDataflow/timely-dataflow/blob/master/examples/hello.rs"><code>examples/hello.rs</code></a> in the <a href="https://github.com/TimelyDataflow/timely-dataflow/tree/master/examples">timely dataflow repository</a>.</p>
179-
<pre><pre class="playground"><code class="language-rust">extern crate timely;
179+
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
180+
</span><span class="boring">fn main() {
181+
</span>extern crate timely;
180182

181183
use timely::dataflow::InputHandle;
182184
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};
183185

184-
fn main() {
185-
// initializes and runs a timely dataflow.
186-
timely::execute_from_args(std::env::args(), |worker| {
187-
188-
let index = worker.index();
189-
let mut input = InputHandle::new();
190-
191-
// create a new input, exchange data, and inspect its output
192-
let probe = worker.dataflow(|scope|
193-
scope.input_from(&amp;mut input)
194-
.exchange(|x| *x)
195-
.inspect(move |x| println!("worker {}:\thello {}", index, x))
196-
.probe()
197-
);
198-
199-
// introduce data and watch!
200-
for round in 0..10 {
201-
if index == 0 {
202-
input.send(round);
203-
}
204-
input.advance_to(round + 1);
205-
while probe.less_than(input.time()) {
206-
worker.step();
207-
}
186+
// initializes and runs a timely dataflow.
187+
timely::execute_from_args(std::env::args(), |worker| {
188+
189+
let index = worker.index();
190+
let mut input = InputHandle::new();
191+
192+
// create a new input, exchange data, and inspect its output
193+
let probe = worker.dataflow(|scope|
194+
scope.input_from(&amp;mut input)
195+
.exchange(|x| *x)
196+
.inspect(move |x| println!("worker {}:\thello {}", index, x))
197+
.probe()
198+
);
199+
200+
// introduce data and watch!
201+
for round in 0..10 {
202+
if index == 0 {
203+
input.send(round);
204+
}
205+
input.advance_to(round + 1);
206+
while probe.less_than(input.time()) {
207+
worker.step();
208208
}
209-
}).unwrap();
210-
}</code></pre></pre>
209+
}
210+
}).unwrap();
211+
<span class="boring">}</span></code></pre></pre>
211212
<p>We can run this program in a variety of configurations: with just a single worker thread, with one process and multiple worker threads, and with multiple processes each with multiple worker threads.</p>
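The routing step in the example above can be sketched without the timely crate. This is a minimal illustration, assuming that `exchange(|x| *x)` delivers a record to the worker whose index equals the returned key modulo the number of workers; that rule is an assumption about the operator, not something stated in the text, and `route` is a hypothetical helper name.

```rust
/// Hypothetical helper: picks the destination worker for a record whose
/// exchange key is `key`, ASSUMING routing is "key modulo number of workers".
fn route(key: u64, peers: usize) -> usize {
    (key % peers as u64) as usize
}

/// Collects the rounds (0..10, as in the example) that worker `index`
/// would see under the assumed routing rule.
fn seen_by(index: usize, peers: usize) -> Vec<u64> {
    (0..10u64).filter(|round| route(*round, peers) == index).collect()
}

fn main() {
    let peers = 2;
    for index in 0..peers {
        for round in seen_by(index, peers) {
            // Mirrors the message format used by the `inspect` call above.
            println!("worker {}:\thello {}", index, round);
        }
    }
}
```

Under this assumption, a single worker sees every number, while two workers split the even and odd numbers between them.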
212213
<p>To try this out yourself, first clone the timely dataflow repository using <code>git</code></p>
213214
<pre><code class="language-ignore"> Echidnatron% git clone https://github.com/TimelyDataflow/timely-dataflow

chapter_4/chapter_4_4.html

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ <h2 id="replaying-streams"><a class="header" href="#replaying-streams">Replaying
264264
<h2 id="an-example"><a class="header" href="#an-example">An Example</a></h2>
265265
<p>We can check out the examples <code>examples/capture_send.rs</code> and <code>examples/capture_recv.rs</code> to see a paired use of capture and receive demonstrating the generality.</p>
266266
<p>The <code>capture_send</code> example creates a new TCP connection for each worker, which it wraps and uses as an <code>EventPusher</code>. Timely dataflow takes care of all the serialization and stuff like that (warning: it uses abomonation, so this is not great for long-term storage).</p>
267-
<pre><code class="language-rust ignore">extern crate timely;
267+
<pre><pre class="playground"><code class="language-rust no_run">extern crate timely;
268268

269269
use std::net::TcpStream;
270270
use timely::dataflow::operators::ToStream;
@@ -282,9 +282,9 @@ <h2 id="an-example"><a class="header" href="#an-example">An Example</a></h2>
282282
.capture_into(EventWriter::new(send))
283283
);
284284
}).unwrap();
285-
}</code></pre>
285+
}</code></pre></pre>
286286
<p>The <code>capture_recv</code> example is more complicated, because we may have a different number of workers replaying the stream than initially captured it.</p>
287-
<pre><code class="language-rust ignore">extern crate timely;
287+
<pre><pre class="playground"><code class="language-rust no_run">extern crate timely;
288288

289289
use std::net::TcpListener;
290290
use timely::dataflow::operators::Inspect;
@@ -303,16 +303,16 @@ <h2 id="an-example"><a class="header" href="#an-example">An Example</a></h2>
303303
.collect::&lt;Vec&lt;_&gt;&gt;()
304304
.into_iter()
305305
.map(|l| l.incoming().next().unwrap().unwrap())
306-
.map(|r| EventReader::&lt;_,u64,_&gt;::new(r))
306+
.map(|r| EventReader::&lt;_,Vec&lt;u64&gt;,_&gt;::new(r))
307307
.collect::&lt;Vec&lt;_&gt;&gt;();
308308

309-
worker.dataflow::&lt;u64,_,_&gt;(|scope| {
309+
worker.dataflow::&lt;u64,_,_&gt;(move |scope| {
310310
replayers
311311
.replay_into(scope)
312312
.inspect(|x| println!("replayed: {:?}", x));
313313
})
314314
}).unwrap(); // asserts error-free execution
315-
}</code></pre>
315+
}</code></pre></pre>
316316
<p>Almost all of the code up above is assigning responsibility for the replaying between the workers we have (from <code>worker.peers()</code>). We partition responsibility for <code>0 .. source_peers</code> among the workers, create <code>TcpListener</code>s to handle the connection requests, wrap them in <code>EventReader</code>s, and then collect them up as a vector. The workers have collectively partitioned the incoming captured streams between themselves.</p>
317317
<p>Finally, each worker just uses the list of <code>EventReader</code>s as the argument to <code>replay_into</code>, and we get the stream magically transported into a new dataflow, in a different process, with a potentially different number of workers.</p>
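The partitioning of `0 .. source_peers` among the receiving workers can be sketched in isolation. This is a std-only illustration that assumes a round-robin rule (source `i` goes to the worker whose index is `i` modulo the number of workers); the hunk above does not show the exact rule, and `assigned_sources` is a hypothetical helper name.

```rust
/// Hypothetical helper: the captured source indices that worker `index`
/// (out of `peers` workers) is responsible for replaying, ASSUMING a
/// round-robin assignment by `i % peers`.
fn assigned_sources(source_peers: usize, peers: usize, index: usize) -> Vec<usize> {
    (0..source_peers).filter(|i| i % peers == index).collect()
}

fn main() {
    // Five source workers distributed among three receive workers,
    // matching the scenario described in the text.
    let (source_peers, peers) = (5, 3);
    for index in 0..peers {
        println!(
            "worker {} replays sources {:?}",
            index,
            assigned_sources(source_peers, peers, index)
        );
    }
}
```

Every source index lands on exactly one worker, so the workers collectively cover all captured streams without overlap, even when the two worker counts differ.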
318318
<p>If you want to try it out, make sure to start up the <code>capture_recv</code> example first (otherwise the connections will be refused for <code>capture_send</code>) and specify the expected number of source workers, modifying the number of received workers if you like. Here we are expecting five source workers, and distributing them among three receive workers (to make life complicated):</p>

chapter_4/chapter_4_5.html

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,12 +185,18 @@ <h2 id="the-exchangedata-trait"><a class="header" href="#the-exchangedata-trait"
185185
<p>The <code>ExchangeData</code> trait is more complicated, and is established in the <code>communication/</code> module. The trait is a synonym for</p>
186186
<pre><code class="language-rust ignore">Send+Sync+Any+serde::Serialize+for&lt;'a&gt;serde::Deserialize&lt;'a&gt;+'static</code></pre>
187187
<p>where <code>serde</code> is Rust's most popular serialization and deserialization crate. A great many types implement these traits. If your types do not, you should add these decorators to their definitions:</p>
188-
<pre><code class="language-rust ignore">#[derive(Serialize, Deserialize)]</code></pre>
188+
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
189+
</span><span class="boring">fn main() {
190+
</span><span class="boring">extern crate serde;
191+
</span><span class="boring">use serde::{Serialize, Deserialize};
192+
</span>#[derive(Serialize, Deserialize)]
193+
<span class="boring">struct Dummy {}
194+
</span><span class="boring">}</span></code></pre></pre>
189195
<p>You must include the <code>serde</code> crate, and if not on Rust 2018 the <code>serde_derive</code> crate.</p>
190196
<p>The downside is that deserialization will always involve a clone of the data, which has the potential to adversely impact performance. For example, if you have structures that contain lots of strings, timely dataflow will create allocations for each string even if you do not plan to use all of them.</p>
191197
<h2 id="an-example"><a class="header" href="#an-example">An example</a></h2>
192198
<p>Let's imagine you would like to play around with a tree data structure as something you might send around in timely dataflow. I've written the following candidate example:</p>
193-
<pre><code class="language-rust ignore">extern crate timely;
199+
<pre><pre class="playground"><code class="language-rust compile_fail">extern crate timely;
194200

195201
use timely::dataflow::operators::*;
196202

@@ -212,7 +218,7 @@ <h2 id="an-example"><a class="header" href="#an-example">An example</a></h2>
212218
fn new(data: D) -&gt; Self {
213219
Self { data, children: Vec::new() }
214220
}
215-
}</code></pre>
221+
}</code></pre></pre>
216222
<p>This doesn't work. You'll probably get two errors, that <code>TreeNode</code> doesn't implement <code>Clone</code>, nor does it implement <code>Debug</code>. Timely data types need to implement <code>Clone</code>, and our attempt to print out the trees requires an implementation of <code>Debug</code>. We can create these implementations by decorating the <code>struct</code> declaration like so:</p>
217223
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
218224
</span><span class="boring">fn main() {
@@ -242,7 +248,7 @@ <h2 id="an-example"><a class="header" href="#an-example">An example</a></h2>
242248
<h3 id="exchanging-data"><a class="header" href="#exchanging-data">Exchanging data</a></h3>
243249
<p>Let's up the level a bit and try and shuffle our tree data between workers.</p>
244250
<p>If we replace our <code>main</code> method with this new one:</p>
245-
<pre><code class="language-rust ignore">extern crate timely;
251+
<pre><pre class="playground"><code class="language-rust compile_fail">extern crate timely;
246252

247253
use timely::dataflow::operators::*;
248254

@@ -265,19 +271,22 @@ <h3 id="exchanging-data"><a class="header" href="#exchanging-data">Exchanging da
265271
fn new(data: D) -&gt; Self {
266272
Self { data, children: Vec::new() }
267273
}
268-
}</code></pre>
274+
}</code></pre></pre>
269275
<p>We get a new error. A not especially helpful error. It says that it cannot find an <code>exchange</code> method, or more specifically that one exists but it doesn't apply to our type at hand. This is because the data need to satisfy the <code>ExchangeData</code> trait but do not. It would be better if this were clearer in the error messages, I agree.</p>
270276
<p>The fix is to update the source like so:</p>
271-
<pre><code class="language-rust ignore">#[macro_use]
272-
extern crate serde_derive;
273-
extern crate serde;
277+
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
278+
</span><span class="boring">fn main() {
279+
</span>extern crate serde;
280+
281+
use serde::{Serialize, Deserialize};
274282

275283
#[derive(Clone, Debug, Serialize, Deserialize)]
276284
struct TreeNode&lt;D&gt; {
277285
data: D,
278286
children: Vec&lt;TreeNode&lt;D&gt;&gt;,
279-
}</code></pre>
280-
<p>and make sure to include the <code>serde_derive</code> and <code>serde</code> crates.</p>
287+
}
288+
<span class="boring">}</span></code></pre></pre>
289+
<p>and make sure to include the <code>serde</code> crate with the <code>derive</code> feature enabled.</p>
281290
<pre><code class="language-ignore"> Echidnatron% cargo run --example types
282291
Finished dev [unoptimized + debuginfo] target(s) in 0.07s
283292
Running `target/debug/examples/types`

chapter_5/chapter_5_1.html

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,22 +177,27 @@ <h1 id="communication"><a class="header" href="#communication">Communication</a>
177177
<p>Communication in timely dataflow starts from the <code>timely_communication</code> crate. This crate includes not only communication, but is actually where we start up the various worker threads and establish their identities. As in timely dataflow, everything starts by providing a per-worker closure, but this time we are given only a channel allocator as an argument.</p>
178178
<p>Before continuing, I want to remind you that this is the <em>internals</em> section; you could write your code against this crate if you really want, but one of the nice features of timely dataflow is that you don't have to. You can use a nice higher level layer, as discussed previously in the document.</p>
179179
<p>That being said, let's take a look at the example from the <code>timely_communication</code> documentation, which is not brief but shouldn't be wildly surprising either.</p>
180-
<pre><code class="language-rust ignore">extern crate timely_communication;
180+
<pre><pre class="playground"><code class="language-rust no_run">extern crate timely_communication;
181+
182+
use std::ops::Deref;
183+
184+
use timely_communication::{Allocate, Message};
181185

182186
fn main() {
183187

184188
// extract the configuration from user-supplied arguments, initialize the computation.
185-
let config = timely_communication::Configuration::from_args(std::env::args()).unwrap();
189+
// configure for two threads, just one process.
190+
let config = timely_communication::Config::Process(2);
186191
let guards = timely_communication::initialize(config, |mut allocator| {
187192

188193
println!("worker {} of {} started", allocator.index(), allocator.peers());
189194

190195
// allocates a list of senders and one receiver.
191-
let (mut senders, mut receiver) = allocator.allocate();
196+
let (mut senders, mut receiver) = allocator.allocate(0);
192197

193198
// send typed data along each channel
194199
for i in 0 .. allocator.peers() {
195-
senders[i].send(format!("hello, {}", i));
200+
senders[i].send(Message::from_typed(format!("hello, {}", i)));
196201
senders[i].done();
197202
}
198203

@@ -201,7 +206,7 @@ <h1 id="communication"><a class="header" href="#communication">Communication</a>
201206
let mut received = 0;
202207
while received &lt; allocator.peers() {
203208
if let Some(message) = receiver.recv() {
204-
println!("worker {}: received: &lt;{}&gt;", allocator.index(), message);
209+
println!("worker {}: received: &lt;{}&gt;", allocator.index(), message.deref());
205210
received += 1;
206211
}
207212
}
@@ -216,7 +221,7 @@ <h1 id="communication"><a class="header" href="#communication">Communication</a>
216221
}
217222
}
218223
else { println!("error in computation"); }
219-
}</code></pre>
224+
}</code></pre></pre>
220225
<p>There are a few steps here, and we'll talk through the important parts in each of them.</p>
221226
<h2 id="configuration"><a class="header" href="#configuration">Configuration</a></h2>
222227
<p>There is only a limited amount of configuration you can currently do in a timely dataflow computation, and it all lives in the <code>initialize::Configuration</code> type. This type is a simple enumeration of three ways a timely computation could run:</p>

chapter_5/chapter_5_2.html

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,9 @@ <h1 id="progress-tracking"><a class="header" href="#progress-tracking">Progress
184184
<h2 id="dataflow-structure"><a class="header" href="#dataflow-structure">Dataflow Structure</a></h2>
185185
<p>A dataflow graph hosts some number of operators. For progress tracking, these operators are simply identified by their index. Each operator has some number of <em>input ports</em>, and some number of <em>output ports</em>. The dataflow operators are connected by connecting each input port to a single output port (typically of another operator). Each output port may be connected to multiple distinct input ports (a message produced at an output port is to be delivered to all attached input ports).</p>
186186
<p>In timely dataflow progress tracking, we identify output ports by the type <code>Source</code> and input ports by the type <code>Target</code>, as from the progress coordinator's point of view, an operator's output port is a <em>source</em> of timestamped data, and an operator's input port is a <em>target</em> of timestamped data. Each source and target can be described by their operator index and then an operator-local index of the corresponding port. The use of distinct types helps us avoid mistaking input and output ports.</p>
187-
<pre><code class="language-rust ignore">pub struct Source {
187+
<pre><pre class="playground"><code class="language-rust"><span class="boring">#![allow(unused)]
188+
</span><span class="boring">fn main() {
189+
</span>pub struct Source {
188190
/// Index of the source operator.
189191
pub index: usize,
190192
/// Number of the output port from the operator.
@@ -196,7 +198,8 @@ <h2 id="dataflow-structure"><a class="header" href="#dataflow-structure">Dataflo
196198
pub index: usize,
197199
/// Number of the input port to the operator.
198200
pub port: usize,
199-
}</code></pre>
201+
}
202+
<span class="boring">}</span></code></pre></pre>
200203
<p>The structure of the dataflow graph can be described by a list of all of the connections in the graph, a <code>Vec&lt;(Source, Target)&gt;</code>. From this, we could infer the number of operators and their numbers of input and output ports, as well as enumerate all of the connections themselves.</p>
201204
<p>At this point we have the structure of a dataflow graph. We can draw a circle for each operator, a stub for each input and output port, and edges connecting the output ports to their destination input ports. Importantly, we have names for every location in the dataflow graph, which will either be a <code>Source</code> or a <code>Target</code>.</p>
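The inference described above can be sketched in a few lines. This is a std-only illustration: the `Source` and `Target` structs reuse the field names shown in the text, while `infer_shape` is a hypothetical helper and not part of timely's API. It can only report ports that appear in at least one connection, so the counts are lower bounds.

```rust
use std::collections::BTreeMap;

/// An output port: operator index plus operator-local port number.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Source { pub index: usize, pub port: usize }

/// An input port: operator index plus operator-local port number.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Target { pub index: usize, pub port: usize }

/// Hypothetical helper: for every operator mentioned in `edges`, returns
/// `(inputs, outputs)`, the number of input and output ports implied by
/// the highest port number seen on each side.
pub fn infer_shape(edges: &[(Source, Target)]) -> BTreeMap<usize, (usize, usize)> {
    let mut shape: BTreeMap<usize, (usize, usize)> = BTreeMap::new();
    for (source, target) in edges {
        // An edge leaving `source` implies at least `port + 1` outputs.
        let entry = shape.entry(source.index).or_insert((0, 0));
        entry.1 = entry.1.max(source.port + 1);
        // An edge entering `target` implies at least `port + 1` inputs.
        let entry = shape.entry(target.index).or_insert((0, 0));
        entry.0 = entry.0.max(target.port + 1);
    }
    shape
}

fn main() {
    // Operator 0 feeds operators 1 and 2; operator 1 feeds operator 2.
    let edges = vec![
        (Source { index: 0, port: 0 }, Target { index: 1, port: 0 }),
        (Source { index: 0, port: 0 }, Target { index: 2, port: 0 }),
        (Source { index: 1, port: 0 }, Target { index: 2, port: 1 }),
    ];
    for (operator, (inputs, outputs)) in infer_shape(&edges) {
        println!("operator {}: {} inputs, {} outputs", operator, inputs, outputs);
    }
}
```

Keeping `Source` and `Target` as separate types means the compiler rejects an edge written the wrong way round, which is the benefit the text attributes to using distinct types.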
202205
<h2 id="maintaining-capabilities"><a class="header" href="#maintaining-capabilities">Maintaining Capabilities</a></h2>
