Skip to content

Commit 753d9b4

Browse files
committed
Deploying to gh-pages from @ 5323fd6 🚀
1 parent d46b884 commit 753d9b4

File tree

6 files changed

+20
-34
lines changed

6 files changed

+20
-34
lines changed

chapter_2/chapter_2_4.html

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,10 @@ <h1 id="creating-operators"><a class="header" href="#creating-operators">Creatin
189189
.to_stream(scope)
190190
.unary(Pipeline, "increment", |capability, info| {
191191

192-
let mut vector = Vec::new();
193192
move |input, output| {
194193
while let Some((time, data)) = input.next() {
195-
data.swap(&amp;mut vector);
196194
let mut session = output.session(&amp;time);
197-
for datum in vector.drain(..) {
195+
for datum in data.drain(..) {
198196
session.give(datum + 1);
199197
}
200198
}
@@ -277,13 +275,11 @@ <h3 id="stateful-operators"><a class="header" href="#stateful-operators">Statefu
277275
.unary(Pipeline, "increment", |capability, info| {
278276

279277
let mut maximum = 0; // define this here; use in the closure
280-
let mut vector = Vec::new();
281278

282279
move |input, output| {
283280
while let Some((time, data)) = input.next() {
284-
data.swap(&amp;mut vector);
285281
let mut session = output.session(&amp;time);
286-
for datum in vector.drain(..) {
282+
for datum in data.drain(..) {
287283
if datum &gt; maximum {
288284
session.give(datum + 1);
289285
maximum = datum;
@@ -325,13 +321,13 @@ <h3 id="frontiered-operators"><a class="header" href="#frontiered-operators">Fro
325321
while let Some((time, data)) = input1.next() {
326322
stash.entry(time.time().clone())
327323
.or_insert(Vec::new())
328-
.push(data.replace(Vec::new()));
324+
.push(std::mem::take(data));
329325
notificator.notify_at(time.retain());
330326
}
331327
while let Some((time, data)) = input2.next() {
332328
stash.entry(time.time().clone())
333329
.or_insert(Vec::new())
334-
.push(data.replace(Vec::new()));
330+
.push(std::mem::take(data));
335331
notificator.notify_at(time.retain());
336332
}
337333

@@ -371,12 +367,12 @@ <h3 id="frontiered-operators"><a class="header" href="#frontiered-operators">Fro
371367
while let Some((time, data)) = input1.next() {
372368
stash.entry(time.retain())
373369
.or_insert(Vec::new())
374-
.push(data.replace(Vec::new()));
370+
.push(std::mem::take(data));
375371
}
376372
while let Some((time, data)) = input2.next() {
377373
stash.entry(time.retain())
378374
.or_insert(Vec::new())
379-
.push(data.replace(Vec::new()));
375+
.push(std::mem::take(data));
380376
}
381377

382378
// consider sending everything in `stash`.

chapter_2/chapter_2_5.html

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ <h2 id="maintaining-word-counts"><a class="header" href="#maintaining-word-count
343343
while let Some((time, data)) = input.next() {
344344
queues.entry(time.retain())
345345
.or_insert(Vec::new())
346-
.extend(data.replace(Vec::new()));
346+
.extend(std::mem::take(data));
347347
}
348348

349349
// enable each stashed time if ready.
@@ -412,7 +412,7 @@ <h2 id="maintaining-word-counts"><a class="header" href="#maintaining-word-count
412412
while let Some((time, data)) = input.next() {
413413
queues.entry(time.retain())
414414
.or_insert(Vec::new())
415-
.extend(data.replace(Vec::new()));
415+
.extend(std::mem::take(data));
416416
}</code></pre>
417417
<p>The <code>input</code> handle has a <code>next</code> method, and it optionally returns a pair of <code>time</code> and <code>data</code>, representing a timely dataflow timestamp and a hunk of data bearing that timestamp, respectively. Our plan is to iterate through all available input (the <code>next()</code> method doesn't block, it just returns <code>None</code> when it runs out of data), accepting it from the timely dataflow system and moving it into our <code>queue</code> hash map.</p>
418418
<p>Why do we do this? Because this is a streaming system, we could be getting data out of order. Our goal is to update the counts in time order, and to do this we'll need to enqueue what we get until we also get word that the associated <code>time</code> is complete. That happens in the next few hunks of code</p>

chapter_4/chapter_4_3.html

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,16 +234,13 @@ <h1 id="flow-control"><a class="header" href="#flow-control">Flow Control</a></h
234234
// Buffer records until all prior timestamps have completed.
235235
.binary_frontier(&amp;cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
236236

237-
let mut vector = Vec::new();
238-
239237
move |input1, input2, output| {
240238

241239
// Stash received data.
242240
input1.for_each(|time, data| {
243-
data.swap(&amp;mut vector);
244241
stash.entry(time.retain())
245242
.or_insert(Vec::new())
246-
.extend(vector.drain(..));
243+
.extend(data.drain(..));
247244
});
248245

249246
// Consider sending stashed data.

print.html

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -827,12 +827,10 @@ <h2 id="other-operators"><a class="header" href="#other-operators">Other operato
827827
.to_stream(scope)
828828
.unary(Pipeline, "increment", |capability, info| {
829829

830-
let mut vector = Vec::new();
831830
move |input, output| {
832831
while let Some((time, data)) = input.next() {
833-
data.swap(&amp;mut vector);
834832
let mut session = output.session(&amp;time);
835-
for datum in vector.drain(..) {
833+
for datum in data.drain(..) {
836834
session.give(datum + 1);
837835
}
838836
}
@@ -915,13 +913,11 @@ <h3 id="stateful-operators"><a class="header" href="#stateful-operators">Statefu
915913
.unary(Pipeline, "increment", |capability, info| {
916914

917915
let mut maximum = 0; // define this here; use in the closure
918-
let mut vector = Vec::new();
919916

920917
move |input, output| {
921918
while let Some((time, data)) = input.next() {
922-
data.swap(&amp;mut vector);
923919
let mut session = output.session(&amp;time);
924-
for datum in vector.drain(..) {
920+
for datum in data.drain(..) {
925921
if datum &gt; maximum {
926922
session.give(datum + 1);
927923
maximum = datum;
@@ -963,13 +959,13 @@ <h3 id="frontiered-operators"><a class="header" href="#frontiered-operators">Fro
963959
while let Some((time, data)) = input1.next() {
964960
stash.entry(time.time().clone())
965961
.or_insert(Vec::new())
966-
.push(data.replace(Vec::new()));
962+
.push(std::mem::take(data));
967963
notificator.notify_at(time.retain());
968964
}
969965
while let Some((time, data)) = input2.next() {
970966
stash.entry(time.time().clone())
971967
.or_insert(Vec::new())
972-
.push(data.replace(Vec::new()));
968+
.push(std::mem::take(data));
973969
notificator.notify_at(time.retain());
974970
}
975971

@@ -1009,12 +1005,12 @@ <h3 id="frontiered-operators"><a class="header" href="#frontiered-operators">Fro
10091005
while let Some((time, data)) = input1.next() {
10101006
stash.entry(time.retain())
10111007
.or_insert(Vec::new())
1012-
.push(data.replace(Vec::new()));
1008+
.push(std::mem::take(data));
10131009
}
10141010
while let Some((time, data)) = input2.next() {
10151011
stash.entry(time.retain())
10161012
.or_insert(Vec::new())
1017-
.push(data.replace(Vec::new()));
1013+
.push(std::mem::take(data));
10181014
}
10191015

10201016
// consider sending everything in `stash`.
@@ -1206,7 +1202,7 @@ <h2 id="maintaining-word-counts"><a class="header" href="#maintaining-word-count
12061202
while let Some((time, data)) = input.next() {
12071203
queues.entry(time.retain())
12081204
.or_insert(Vec::new())
1209-
.extend(data.replace(Vec::new()));
1205+
.extend(std::mem::take(data));
12101206
}
12111207

12121208
// enable each stashed time if ready.
@@ -1275,7 +1271,7 @@ <h2 id="maintaining-word-counts"><a class="header" href="#maintaining-word-count
12751271
while let Some((time, data)) = input.next() {
12761272
queues.entry(time.retain())
12771273
.or_insert(Vec::new())
1278-
.extend(data.replace(Vec::new()));
1274+
.extend(std::mem::take(data));
12791275
}</code></pre>
12801276
<p>The <code>input</code> handle has a <code>next</code> method, and it optionally returns a pair of <code>time</code> and <code>data</code>, representing a timely dataflow timestamp and a hunk of data bearing that timestamp, respectively. Our plan is to iterate through all available input (the <code>next()</code> method doesn't block, it just returns <code>None</code> when it runs out of data), accepting it from the timely dataflow system and moving it into our <code>queue</code> hash map.</p>
12811277
<p>Why do we do this? Because this is a streaming system, we could be getting data out of order. Our goal is to update the counts in time order, and to do this we'll need to enqueue what we get until we also get word that the associated <code>time</code> is complete. That happens in the next few hunks of code</p>
@@ -1642,16 +1638,13 @@ <h2 id="scopes-1"><a class="header" href="#scopes-1">Scopes</a></h2>
16421638
// Buffer records until all prior timestamps have completed.
16431639
.binary_frontier(&amp;cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
16441640

1645-
let mut vector = Vec::new();
1646-
16471641
move |input1, input2, output| {
16481642

16491643
// Stash received data.
16501644
input1.for_each(|time, data| {
1651-
data.swap(&amp;mut vector);
16521645
stash.entry(time.retain())
16531646
.or_insert(Vec::new())
1654-
.extend(vector.drain(..));
1647+
.extend(data.drain(..));
16551648
});
16561649

16571650
// Consider sending stashed data.

searchindex.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

searchindex.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)