@@ -831,12 +831,12 @@ <h2 id="other-operators"><a class="header" href="#other-operators">Other operato
831831 .unary(Pipeline, "increment", |capability, info| {
832832
833833 move |input, output| {
834- while let Some(( time, data)) = input.next() {
834+ input.for_each_time(| time, data| {
835835 let mut session = output.session(&time);
836- for datum in data.drain(..) {
836+ for datum in data.flat_map(|d| d. drain(..) ) {
837837 session.give(datum + 1);
838838 }
839- }
839+ });
840840 }
841841 })
842842 .container::<Vec<_>>();
@@ -918,15 +918,15 @@ <h3 id="stateful-operators"><a class="header" href="#stateful-operators">Statefu
918918 let mut maximum = 0; // define this here; use in the closure
919919
920920 move |input, output| {
921- while let Some(( time, data)) = input.next() {
921+ input.for_each_time(| time, data| {
922922 let mut session = output.session(&time);
923- for datum in data.drain(..) {
923+ for datum in data.flat_map(|d| d. drain(..) ) {
924924 if datum > maximum {
925925 session.give(datum + 1);
926926 maximum = datum;
927927 }
928928 }
929- }
929+ });
930930 }
931931 })
932932 .container::<Vec<_>>();
@@ -958,21 +958,21 @@ <h3 id="frontiered-operators"><a class="header" href="#frontiered-operators">Fro
958958 let mut notificator = FrontierNotificator::default();
959959 let mut stash = HashMap::new();
960960
961- move |input1, input2, output| {
962- while let Some(( time, data)) = input1.next() {
961+ move |( input1, frontier1), ( input2, frontier2) , output| {
962+ input1.for_each_time(| time, data| {
963963 stash.entry(time.time().clone())
964964 .or_insert(Vec::new())
965- .push( std::mem::take(data ));
965+ .extend(data.map( std::mem::take));
966966 notificator.notify_at(time.retain());
967- }
968- while let Some(( time, data)) = input2.next() {
967+ });
968+ input2.for_each_time(| time, data| {
969969 stash.entry(time.time().clone())
970970 .or_insert(Vec::new())
971- .push( std::mem::take(data ));
971+ .extend(data.map( std::mem::take));
972972 notificator.notify_at(time.retain());
973- }
973+ });
974974
975- notificator.for_each(&[input1.frontier(), input2.frontier() ], |time, notificator| {
975+ notificator.for_each(&[frontier1, frontier2 ], |time, notificator| {
976976 let mut session = output.session(&time);
977977 if let Some(list) = stash.remove(time.time()) {
978978 for mut vector in list.into_iter() {
@@ -1003,21 +1003,21 @@ <h3 id="frontiered-operators"><a class="header" href="#frontiered-operators">Fro
10031003
10041004 let mut stash = HashMap::new();
10051005
1006- move |input1, input2, output| {
1006+ move |( input1, frontier1), ( input2, frontier2) , output| {
10071007
1008- while let Some(( time, data)) = input1.next() {
1008+ input1.for_each_time(| time, data| {
10091009 stash.entry(time.retain())
10101010 .or_insert(Vec::new())
1011- .push( std::mem::take(data ));
1012- }
1013- while let Some(( time, data)) = input2.next() {
1011+ .extend(data.map( std::mem::take));
1012+ });
1013+ input2.for_each_time(| time, data| {
10141014 stash.entry(time.retain())
10151015 .or_insert(Vec::new())
1016- .push( std::mem::take(data ));
1017- }
1016+ .extend(data.map( std::mem::take));
1017+ });
10181018
10191019 // consider sending everything in `stash`.
1020- let frontiers = &[input1.frontier(), input2.frontier() ];
1020+ let frontiers = &[frontier1, frontier2 ];
10211021 for (time, list) in stash.iter_mut() {
10221022 // if neither input can produce data at `time`, ship `list`.
10231023 if frontiers.iter().all(|f| !f.less_equal(time.time())) {
@@ -1199,18 +1199,18 @@ <h2 id="maintaining-word-counts"><a class="header" href="#maintaining-word-count
11991199 let mut counts = HashMap::new();
12001200 let mut buffer = Vec::new();
12011201
1202- move |input, output| {
1202+ move |( input, frontier) , output| {
12031203
12041204 // for each input batch, stash it at `time`.
1205- while let Some(( time, data)) = input.next() {
1205+ input.for_each_time(| time, data| {
12061206 queues.entry(time.retain())
12071207 .or_insert(Vec::new())
1208- .extend(std::mem::take( data));
1209- }
1208+ .extend(data.flat_map(|d| d.drain(..) ));
1209+ });
12101210
12111211 // enable each stashed time if ready.
12121212 for (time, vals) in queues.iter_mut() {
1213- if !input. frontier() .less_equal(time.time()) {
1213+ if !frontier.less_equal(time.time()) {
12141214 let vals = std::mem::replace(vals, Vec::new());
12151215 buffer.push((time.clone(), vals));
12161216 }
@@ -1641,20 +1641,20 @@ <h2 id="scopes-1"><a class="header" href="#scopes-1">Scopes</a></h2>
16411641 // Buffer records until all prior timestamps have completed.
16421642 .binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
16431643
1644- move |input1, input2, output| {
1644+ move |( input1, frontier1), ( input2, frontier2) , output| {
16451645
16461646 // Stash received data.
1647- input1.for_each (|time, data| {
1647+ input1.for_each_time (|time, data| {
16481648 stash.entry(time.retain())
16491649 .or_insert(Vec::new())
1650- .extend(data.drain(..));
1650+ .extend(data.flat_map(|d| d. drain(..) ));
16511651 });
16521652
16531653 // Consider sending stashed data.
16541654 for (time, data) in stash.iter_mut() {
16551655 // Only send data once the probe is not less than the time.
16561656 // That is, once we have finished all strictly prior work.
1657- if !input2.frontier() .less_than(time.time()) {
1657+ if !frontier2 .less_than(time.time()) {
16581658 output.session(&time).give_iterator(data.drain(..));
16591659 }
16601660 }
0 commit comments