diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index dbbf7fbc1..6c5bf6eb9 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -20,12 +20,12 @@ fn main() { .unary(Pipeline, "increment", |capability, info| { move |input, output| { - while let Some((time, data)) = input.next() { + input.for_each_time(|time, data| { let mut session = output.session(&time); - for datum in data.drain(..) { + for datum in data.flat_map(|d| d.drain(..)) { session.give(datum + 1); } - } + }); } }) .container::>(); @@ -136,15 +136,15 @@ fn main() { let mut maximum = 0; // define this here; use in the closure move |input, output| { - while let Some((time, data)) = input.next() { + input.for_each_time(|time, data| { let mut session = output.session(&time); - for datum in data.drain(..) { + for datum in data.flat_map(|d| d.drain(..)) { if datum > maximum { session.give(datum + 1); maximum = datum; } } - } + }); } }) .container::>(); @@ -187,21 +187,21 @@ fn main() { let mut notificator = FrontierNotificator::default(); let mut stash = HashMap::new(); - move |input1, input2, output| { - while let Some((time, data)) = input1.next() { + move |(input1, frontier1), (input2, frontier2), output| { + input1.for_each_time(|time, data| { stash.entry(time.time().clone()) .or_insert(Vec::new()) - .push(std::mem::take(data)); + .extend(data.map(std::mem::take)); notificator.notify_at(time.retain()); - } - while let Some((time, data)) = input2.next() { + }); + input2.for_each_time(|time, data| { stash.entry(time.time().clone()) .or_insert(Vec::new()) - .push(std::mem::take(data)); + .extend(data.map(std::mem::take)); notificator.notify_at(time.retain()); - } + }); - notificator.for_each(&[input1.frontier(), input2.frontier()], |time, notificator| { + notificator.for_each(&[frontier1, frontier2], |time, notificator| { let mut session = output.session(&time); if let Some(list) = stash.remove(time.time()) { for mut vector in 
list.into_iter() { @@ -237,21 +237,21 @@ fn main() { let mut stash = HashMap::new(); - move |input1, input2, output| { + move |(input1, frontier1), (input2, frontier2), output| { - while let Some((time, data)) = input1.next() { + input1.for_each_time(|time, data| { stash.entry(time.retain()) .or_insert(Vec::new()) - .push(std::mem::take(data)); - } - while let Some((time, data)) = input2.next() { + .extend(data.map(std::mem::take)); + }); + input2.for_each_time(|time, data| { stash.entry(time.retain()) .or_insert(Vec::new()) - .push(std::mem::take(data)); - } + .extend(data.map(std::mem::take)); + }); // consider sending everything in `stash`. - let frontiers = &[input1.frontier(), input2.frontier()]; + let frontiers = &[frontier1, frontier2]; for (time, list) in stash.iter_mut() { // if neither input can produce data at `time`, ship `list`. if frontiers.iter().all(|f| !f.less_equal(time.time())) { diff --git a/mdbook/src/chapter_2/chapter_2_5.md b/mdbook/src/chapter_2/chapter_2_5.md index 4aa8857fb..529788951 100644 --- a/mdbook/src/chapter_2/chapter_2_5.md +++ b/mdbook/src/chapter_2/chapter_2_5.md @@ -206,18 +206,18 @@ As before, I'm just going to show you the new code, which now lives just after ` let mut counts = HashMap::new(); let mut buffer = Vec::new(); - move |input, output| { + move |(input, frontier), output| { // for each input batch, stash it at `time`. - while let Some((time, data)) = input.next() { + input.for_each_time(|time, data| { queues.entry(time.retain()) .or_insert(Vec::new()) - .extend(std::mem::take(data)); - } + .extend(data.flat_map(|d| d.drain(..))); + }); // enable each stashed time if ready. 
for (time, vals) in queues.iter_mut() { - if !input.frontier().less_equal(time.time()) { + if !frontier.less_equal(time.time()) { let vals = std::mem::replace(vals, Vec::new()); buffer.push((time.clone(), vals)); } diff --git a/mdbook/src/chapter_4/chapter_4_3.md b/mdbook/src/chapter_4/chapter_4_3.md index beeabd834..f50f5e527 100644 --- a/mdbook/src/chapter_4/chapter_4_3.md +++ b/mdbook/src/chapter_4/chapter_4_3.md @@ -78,20 +78,20 @@ fn main() { // Buffer records until all prior timestamps have completed. .binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| { - move |input1, input2, output| { + move |(input1, frontier1), (input2, frontier2), output| { // Stash received data. - input1.for_each(|time, data| { + input1.for_each_time(|time, data| { stash.entry(time.retain()) .or_insert(Vec::new()) - .extend(data.drain(..)); + .extend(data.flat_map(|d| d.drain(..))); }); // Consider sending stashed data. for (time, data) in stash.iter_mut() { // Only send data once the probe is not less than the time. // That is, once we have finished all strictly prior work. - if !input2.frontier().less_than(time.time()) { + if !frontier2.less_than(time.time()) { output.session(&time).give_iterator(data.drain(..)); } } diff --git a/timely/examples/bfs.rs b/timely/examples/bfs.rs index 45d97fb33..33376b285 100644 --- a/timely/examples/bfs.rs +++ b/timely/examples/bfs.rs @@ -53,19 +53,19 @@ fn main() { move |input1, input2, output, notify| { // receive edges, start to sort them - input1.for_each(|time, data| { + input1.for_each_time(|time, data| { notify.notify_at(time.retain()); - edge_list.push(std::mem::take(data)); + edge_list.extend(data.map(std::mem::take)); }); // receive (node, worker) pairs, note any new ones. 
- input2.for_each(|time, data| { + input2.for_each_time(|time, data| { node_lists.entry(*time.time()) .or_insert_with(|| { notify.notify_at(time.retain()); Vec::new() }) - .push(std::mem::take(data)); + .extend(data.map(std::mem::take)); }); notify.for_each(|time, _num, _notify| { diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 1348e4984..834f82cde 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -44,14 +44,16 @@ fn main() { "Split", |_cap, _info| { move |input, output| { - while let Some((time, data)) = input.next() { + input.for_each_time(|time, data| { let mut session = output.session(&time); - for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| { - wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff }) - }) { - session.give(wordcount); + for data in data { + for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| { + wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff }) + }) { + session.give(wordcount); + } } - } + }); } }, ) @@ -63,16 +65,17 @@ fn main() { let mut queues = HashMap::new(); let mut counts = HashMap::new(); - move |input, output| { - while let Some((time, data)) = input.next() { + move |(input, frontier), output| { + input.for_each_time(|time, data| { queues .entry(time.retain()) .or_insert(Vec::new()) - .push(std::mem::take(data)); - } + .extend(data.map(std::mem::take)); + + }); for (key, val) in queues.iter_mut() { - if !input.frontier().less_equal(key.time()) { + if !frontier.less_equal(key.time()) { let mut session = output.session(key); for batch in val.drain(..) 
{ for wordcount in batch.borrow().into_index_iter() { diff --git a/timely/examples/distinct.rs b/timely/examples/distinct.rs index b3e58f5c0..70e5cfd89 100644 --- a/timely/examples/distinct.rs +++ b/timely/examples/distinct.rs @@ -18,18 +18,20 @@ fn main() { scope.input_from(&mut input) .unary(Exchange::new(|x| *x), "Distinct", move |_, _| move |input, output| { - input.for_each(|time, data| { + input.for_each_time(|time, data| { let counts = counts_by_time .entry(*time.time()) .or_insert(HashMap::new()); let mut session = output.session(&time); - for &datum in data.iter() { - let count = counts.entry(datum).or_insert(0); - if *count == 0 { - session.give(datum); + for data in data { + for &datum in data.iter() { + let count = counts.entry(datum).or_insert(0); + if *count == 0 { + session.give(datum); + } + *count += 1; } - *count += 1; } }) }) diff --git a/timely/examples/hashjoin.rs b/timely/examples/hashjoin.rs index 7ff612d2d..4cb9588c2 100644 --- a/timely/examples/hashjoin.rs +++ b/timely/examples/hashjoin.rs @@ -40,9 +40,9 @@ fn main() { move |input1, input2, output| { // Drain first input, check second map, update first map. - input1.for_each(|time, data| { + input1.for_each_time(|time, data| { let mut session = output.session(&time); - for (key, val1) in data.drain(..) { + for (key, val1) in data.flat_map(|d| d.drain(..)) { if let Some(values) = map2.get(&key) { for val2 in values.iter() { session.give((val1, *val2)); @@ -54,9 +54,9 @@ fn main() { }); // Drain second input, check first map, update second map. - input2.for_each(|time, data| { + input2.for_each_time(|time, data| { let mut session = output.session(&time); - for (key, val2) in data.drain(..) 
{ + for (key, val2) in data.flat_map(|d| d.drain(..)) { if let Some(values) = map1.get(&key) { for val1 in values.iter() { session.give((*val1, val2)); diff --git a/timely/examples/pagerank.rs b/timely/examples/pagerank.rs index 4716182c8..17ff79fec 100644 --- a/timely/examples/pagerank.rs +++ b/timely/examples/pagerank.rs @@ -41,19 +41,21 @@ fn main() { let timer = ::std::time::Instant::now(); - move |input1, input2, output| { + move |(input1, frontier1), (input2, frontier2), output| { // hold on to edge changes until it is time. - input1.for_each(|time, data| { - edge_stash.entry(time.retain()).or_default().append(data); + input1.for_each_time(|time, data| { + let entry = edge_stash.entry(time.retain()).or_default(); + data.for_each(|data| entry.append(data)); }); // hold on to rank changes until it is time. - input2.for_each(|time, data| { - rank_stash.entry(time.retain()).or_default().append(data); + input2.for_each_time(|time, data| { + let entry = rank_stash.entry(time.retain()).or_default(); + data.for_each(|data| entry.append(data)); }); - let frontiers = &[input1.frontier(), input2.frontier()]; + let frontiers = &[frontier1, frontier2]; for (time, edge_changes) in edge_stash.iter_mut() { if frontiers.iter().all(|f| !f.less_equal(time)) { diff --git a/timely/examples/unionfind.rs b/timely/examples/unionfind.rs index 23068466c..beaf3e962 100644 --- a/timely/examples/unionfind.rs +++ b/timely/examples/unionfind.rs @@ -61,10 +61,9 @@ impl UnionFind for Stream { move |input, output| { - while let Some((time, data)) = input.next() { - + input.for_each_time(|time, data| { let mut session = output.session(&time); - for &(mut x, mut y) in data.iter() { + for &mut (mut x, mut y) in data.flatten() { // grow arrays if required. 
let m = ::std::cmp::max(x, y); @@ -86,7 +85,7 @@ impl UnionFind for Stream { } } } - } + }); } }) } diff --git a/timely/examples/wordcount.rs b/timely/examples/wordcount.rs index de8d3c89b..af70744a4 100644 --- a/timely/examples/wordcount.rs +++ b/timely/examples/wordcount.rs @@ -29,15 +29,15 @@ fn main() { let mut queues = HashMap::new(); let mut counts = HashMap::new(); - move |input, output| { - while let Some((time, data)) = input.next() { + move |(input, frontier), output| { + input.for_each_time(|time, data| { queues.entry(time.retain()) .or_insert(Vec::new()) - .push(std::mem::take(data)); - } + .extend(data.map(std::mem::take)); + }); for (key, val) in queues.iter_mut() { - if !input.frontier().less_equal(key.time()) { + if !frontier.less_equal(key.time()) { let mut session = output.session(key); for mut batch in val.drain(..) { for (word, diff) in batch.drain(..) { diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index c99d76060..d9e69755f 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -115,7 +115,7 @@ impl>> Buffer(&mut self, item: D) where CB: PushInto { @@ -138,11 +138,14 @@ where T: Eq + Clone + 'a, P: Push> + 'a, { - /// Provide a container at the time specified by the [Session]. Maintains FIFO order with - /// previously pushed data. + /// Provide a container at the time specified by the [Session]. pub fn give_container(&mut self, container: &mut CB::Container) { self.buffer.give_container(container) } + /// Provide multiple containers at the time specified by the [Session]. 
+ pub fn give_containers<'b>(&mut self, containers: impl Iterator) { + for container in containers { self.buffer.give_container(container); } + } } impl<'a, T, CB, P> Session<'a, T, CB, P> diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/aggregation/aggregate.rs index d8262010a..f22a73110 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/aggregation/aggregate.rs @@ -79,9 +79,9 @@ impl, K: ExchangeData+Hash+Eq, V: Exchang self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| { // read each input, fold into aggregates - input.for_each(|time, data| { + input.for_each_time(|time, data| { let agg_time = aggregates.entry(time.time().clone()).or_insert_with(HashMap::new); - for (key, val) in data.drain(..) { + for (key, val) in data.flat_map(|d| d.drain(..)) { let agg = agg_time.entry(key.clone()).or_insert_with(Default::default); fold(&key, val, agg); } diff --git a/timely/src/dataflow/operators/aggregation/state_machine.rs b/timely/src/dataflow/operators/aggregation/state_machine.rs index 7347f8c12..bad423827 100644 --- a/timely/src/dataflow/operators/aggregation/state_machine.rs +++ b/timely/src/dataflow/operators/aggregation/state_machine.rs @@ -84,17 +84,17 @@ impl StateMachine f }); // stash each input and request a notification when ready - input.for_each(|time, data| { + input.for_each_time(|time, data| { // stash if not time yet if notificator.frontier(0).less_than(time.time()) { - pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data); + for data in data { pending.entry(time.time().clone()).or_insert_with(Vec::new).append(data); } notificator.notify_at(time.retain()); } else { // else we can process immediately let mut session = output.session(&time); - for (key, val) in data.drain(..) 
{ + for (key, val) in data.flat_map(|d| d.drain(..)) { let (remove, output) = { let state = states.entry(key.clone()).or_insert_with(Default::default); fold(&key, val, state) diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index dad88be2b..42752ad93 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -51,10 +51,10 @@ impl Branch for Stream { let mut output1_handle = output1.activate(); let mut output2_handle = output2.activate(); - input.for_each(|time, data| { + input.activate().for_each_time(|time, data| { let mut out1 = output1_handle.session(&time); let mut out2 = output2_handle.session(&time); - for datum in data.drain(..) { + for datum in data.flat_map(|d| d.drain(..)) { if condition(time.time(), &datum) { out2.give(datum); } else { @@ -108,13 +108,13 @@ impl BranchWhen for StreamCore { let mut output1_handle = output1.activate(); let mut output2_handle = output2.activate(); - input.for_each(|time, data| { + input.activate().for_each_time(|time, data| { let mut out = if condition(time.time()) { output2_handle.session(&time) } else { output1_handle.session(&time) }; - out.give_container(data); + out.give_containers(data); }); } }); diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 17cd1f86c..8a1deaa57 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -426,10 +426,9 @@ impl CapabilitySet { /// vec![()].into_iter().to_stream(scope) /// .unary_frontier(Pipeline, "example", |default_cap, _info| { /// let mut cap = CapabilitySet::from_elem(default_cap); - /// move |input, output| { - /// cap.downgrade(&input.frontier().frontier()); - /// while let Some((time, data)) = input.next() { - /// } + /// move |(input, frontier), output| { + /// cap.downgrade(&frontier.frontier()); + /// input.for_each_time(|time, data| {}); /// let a_cap = cap.first(); /// if 
let Some(a_cap) = a_cap.as_ref() { /// output.session(a_cap).give(()); diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index f377d3626..431353941 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -84,8 +84,8 @@ impl Concatenate for G { move |_frontier| { let mut output = output.activate(); for handle in handles.iter_mut() { - handle.for_each(|time, data| { - output.session(&time).give_container(data); + handle.for_each_time(|time, data| { + output.session(&time).give_containers(data); }) } } diff --git a/timely/src/dataflow/operators/core/exchange.rs b/timely/src/dataflow/operators/core/exchange.rs index 92c9d78f5..593e3144a 100644 --- a/timely/src/dataflow/operators/core/exchange.rs +++ b/timely/src/dataflow/operators/core/exchange.rs @@ -43,8 +43,8 @@ where { self.unary(ExchangeCore::new(route), "Exchange", |_, _| { move |input, output| { - input.for_each(|time, data| { - output.session(&time).give_container(data); + input.for_each_time(|time, data| { + output.session(&time).give_containers(data); }); } }) diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index 067ea3db8..529b65766 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -118,12 +118,12 @@ impl ConnectLoop for StreamCore { builder.build(move |_capability| move |_frontier| { let mut output = output.activate(); - input.for_each(|cap, data| { + input.activate().for_each_time(|cap, data| { if let Some(new_time) = summary.results_in(cap.time()) { let new_cap = cap.delayed(&new_time); output .session(&new_cap) - .give_container(data); + .give_containers(data); } }); }); diff --git a/timely/src/dataflow/operators/core/filter.rs b/timely/src/dataflow/operators/core/filter.rs index f82a76339..aecadd050 100644 --- a/timely/src/dataflow/operators/core/filter.rs +++ 
b/timely/src/dataflow/operators/core/filter.rs @@ -29,8 +29,9 @@ where { fn filter)->bool+'static>(&self, mut predicate: P) -> StreamCore { self.unary(Pipeline, "Filter", move |_,_| move |input, output| { - input.for_each(|time, data| { - output.session(&time).give_iterator(data.drain().filter(&mut predicate)); + input.for_each_time(|time, data| { + output.session(&time) + .give_iterator(data.flat_map(|d| d.drain()).filter(&mut predicate)); }); }) } diff --git a/timely/src/dataflow/operators/core/inspect.rs b/timely/src/dataflow/operators/core/inspect.rs index 0058d2fcf..bb4d882cc 100644 --- a/timely/src/dataflow/operators/core/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ -133,15 +133,18 @@ impl InspectCore for StreamCore { { use crate::progress::timestamp::Timestamp; let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum()); - self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| { - if input.frontier.frontier() != frontier.borrow() { + self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |(input, chain), output| { + if chain.frontier() != frontier.borrow() { frontier.clear(); - frontier.extend(input.frontier.frontier().iter().cloned()); + frontier.extend(chain.frontier().iter().cloned()); func(Err(frontier.elements())); } - input.for_each(|time, data| { - func(Ok((&time, &*data))); - output.session(&time).give_container(data); + input.for_each_time(|time, data| { + let mut session = output.session(&time); + for data in data { + func(Ok((&time, &*data))); + session.give_container(data); + } }); }) } diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 221b86201..61d9fd359 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -97,8 +97,9 @@ impl Map for StreamCore { L: FnMut(C::Item<'_>)->I + 'static, { self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| { - 
input.for_each(|time, data| { - output.session(&time).give_iterator(data.drain().flat_map(&mut logic)); + input.for_each_time(|time, data| { + output.session(&time) + .give_iterator(data.flat_map(|d| d.drain()).flat_map(&mut logic)); }); }) } diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index 339b51576..6b50a42c5 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -61,10 +61,10 @@ impl OkErr for StreamCore { let mut output1_handle = output1.activate(); let mut output2_handle = output2.activate(); - input.for_each(|time, data| { + input.activate().for_each_time(|time, data| { let mut out1 = output1_handle.session(&time); let mut out2 = output2_handle.session(&time); - for datum in data.drain() { + for datum in data.flat_map(|d| d.drain()) { match logic(datum) { Ok(datum) => out1.give(datum), Err(datum) => out2.give(datum), diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index 4b996a3f9..f1bd9abb3 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -28,10 +28,11 @@ impl SharedStream for StreamCore { fn shared(&self) -> StreamCore> { self.unary(Pipeline, "Shared", move |_, _| { move |input, output| { - input.for_each(|time, data| { - output - .session(&time) - .give_container(&mut Rc::new(std::mem::take(data))); + input.for_each_time(|time, data| { + let mut session = output.session(&time); + for data in data { + session.give_container(&mut Rc::new(std::mem::take(data))); + } }); } }) @@ -54,15 +55,15 @@ mod test { .concatenate([ shared.unary(Pipeline, "read shared 1", |_, _| { move |input, output| { - input.for_each(|time, data| { - output.session(&time).give(data.as_ptr() as usize); + input.for_each_time(|time, data| { + output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize)); }); } }), shared.unary(Pipeline, "read shared 2", |_, _| { 
move |input, output| { - input.for_each(|time, data| { - output.session(&time).give(data.as_ptr() as usize); + input.for_each_time(|time, data| { + output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize)); }); } }), diff --git a/timely/src/dataflow/operators/core/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs index 86ec5ae69..3e17a09e0 100644 --- a/timely/src/dataflow/operators/core/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -56,12 +56,14 @@ impl Reclock for StreamCore { self.binary_notify(clock, Pipeline, Pipeline, "Reclock", vec![], move |input1, input2, output, notificator| { // stash each data input with its timestamp. - input1.for_each(|cap, data| { - stash.push((cap.time().clone(), std::mem::take(data))); + input1.for_each_time(|cap, data| { + for data in data { + stash.push((cap.time().clone(), std::mem::take(data))); + } }); // request notification at time, to flush stash. - input2.for_each(|time, _data| { + input2.for_each_time(|time, _data| { notificator.notify_at(time.retain()); }); diff --git a/timely/src/dataflow/operators/count.rs b/timely/src/dataflow/operators/count.rs index 264c72cd6..e99f1000c 100644 --- a/timely/src/dataflow/operators/count.rs +++ b/timely/src/dataflow/operators/count.rs @@ -53,8 +53,10 @@ impl, D: Data> Accumulate for Strea let mut accums = HashMap::new(); self.unary_notify(Pipeline, "Accumulate", vec![], move |input, output, notificator| { - input.for_each(|time, data| { - logic(accums.entry(time.time().clone()).or_insert_with(|| default.clone()), data); + input.for_each_time(|time, data| { + for data in data { + logic(accums.entry(time.time().clone()).or_insert_with(|| default.clone()), data); + } notificator.notify_at(time.retain()); }); diff --git a/timely/src/dataflow/operators/delay.rs b/timely/src/dataflow/operators/delay.rs index 02febfa27..df41d9680 100644 --- a/timely/src/dataflow/operators/delay.rs +++ b/timely/src/dataflow/operators/delay.rs @@ -29,8 +29,8 @@ pub trait 
Delay { /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .delay(|data, time| *data) - /// .sink(Pipeline, "example", |input| { - /// input.for_each(|time, data| { + /// .sink(Pipeline, "example", |(input, frontier)| { + /// input.for_each_time(|time, data| { /// println!("data at time: {:?}", time); /// }); /// }); @@ -56,8 +56,8 @@ pub trait Delay { /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .delay(|data, time| *data) - /// .sink(Pipeline, "example", |input| { - /// input.for_each(|time, data| { + /// .sink(Pipeline, "example", |(input, frontier)| { + /// input.for_each_time(|time, data| { /// println!("data at time: {:?}", time); /// }); /// }); @@ -84,8 +84,8 @@ pub trait Delay { /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .delay_batch(|time| time + 1) - /// .sink(Pipeline, "example", |input| { - /// input.for_each(|time, data| { + /// .sink(Pipeline, "example", |(input, frontier)| { + /// input.for_each_time(|time, data| { /// println!("data at time: {:?}", time); /// }); /// }); @@ -98,8 +98,8 @@ impl, D: Data> Delay for StreamG::Timestamp+'static>(&self, mut func: L) -> Self { let mut elements = HashMap::new(); self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { - input.for_each(|time, data| { - for datum in data.drain(..) 
{ + input.for_each_time(|time, data| { + for datum in data.flat_map(|d| d.drain(..)) { let new_time = func(&datum, &time); assert!(time.time().less_equal(&new_time)); elements.entry(new_time.clone()) @@ -126,12 +126,12 @@ impl, D: Data> Delay for StreamG::Timestamp+'static>(&self, mut func: L) -> Self { let mut elements = HashMap::new(); self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { - input.for_each(|time, data| { + input.for_each_time(|time, data| { let new_time = func(&time); assert!(time.time().less_equal(&new_time)); elements.entry(new_time.clone()) .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() }) - .push(std::mem::take(data)); + .extend(data.map(std::mem::take)); }); // for each available notification, send corresponding set diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/filter.rs index c2486e9eb..03e05522d 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/filter.rs @@ -25,10 +25,11 @@ pub trait Filter { impl Filter for Stream { fn filterbool+'static>(&self, mut predicate: P) -> Stream { self.unary(Pipeline, "Filter", move |_,_| move |input, output| { - input.for_each(|time, data| { - data.retain(|x| predicate(x)); - if !data.is_empty() { - output.session(&time).give_container(data); + input.for_each_time(|time, data| { + let mut session = output.session(&time); + for data in data { + data.retain(&mut predicate); + session.give_container(data); } }); }) diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 88c43965a..ba7330dcb 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -5,10 +5,10 @@ use std::rc::Rc; use std::cell::RefCell; +use std::collections::VecDeque; use crate::progress::Timestamp; use crate::progress::ChangeBatch; -use crate::progress::frontier::MutableAntichain; use 
crate::progress::operate::PortConnectivity; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::channels::pushers::Counter as PushCounter; @@ -21,6 +21,25 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder}; use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; +#[must_use] +pub struct InputSession<'a, T: Timestamp, C, P: Pull>> { + input: &'a mut InputHandleCore, +} + +impl<'a, T: Timestamp, C: Accountable, P: Pull>> InputSession<'a, T, C, P> { + /// Iterates through distinct capabilities and the lists of containers associated with each. + pub fn for_each_time(self, logic: F) where F: FnMut(InputCapability, std::slice::IterMut::), C: Default { + self.input.for_each_time(logic) + } + /// Iterates through pairs of capability and container. + /// + /// The `for_each_time` method is equivalent, but groups containers by capability and is preferred, + /// in that it often leads to grouping work by capability, including the creation of output sessions. + pub fn for_each(self, logic: F) where F: FnMut(InputCapability, &mut C) { + self.input.for_each(logic) + } +} + /// Handle to an operator's input stream. pub struct InputHandleCore>> { pull_counter: PullCounter, @@ -30,24 +49,16 @@ pub struct InputHandleCore>> { /// Each timestamp received through this input may only produce output timestamps /// greater or equal to the input timestamp subjected to at least one of these summaries. summaries: Rc>>, + /// Staged capabilities and containers. + staging: VecDeque<(InputCapability, C)>, + staged: Vec, } -/// Handle to an operator's input stream, specialized to vectors. -pub type InputHandle = InputHandleCore, P>; - -/// Handle to an operator's input stream and frontier. -pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: 'a, P: Pull>+'a> { - /// The underlying input handle. 
- pub handle: &'a mut InputHandleCore, - /// The frontier as reported by timely progress tracking. - pub frontier: &'a MutableAntichain, -} - -/// Handle to an operator's input stream and frontier, specialized to vectors. -pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec, P>; - impl>> InputHandleCore { + /// Activates an input handle with a session that reorders inputs and must be drained. + pub fn activate(&mut self) -> InputSession<'_, T, C, P> { InputSession { input: self } } + /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`. /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. /// Returns `None` when there's no more data available. @@ -59,79 +70,30 @@ impl>> InputHandleCore, &mut C)>(&mut self, mut logic: F) { + /// The `for_each_time` method is equivalent, but groups containers by capability and is preferred, + /// in that it often leads to grouping work by capability, including the creation of output sessions. + pub fn for_each(&mut self, mut logic: F) where F: FnMut(InputCapability, &mut C) { + while let Some((cap, data)) = self.next() { logic(cap, data); } + } + /// Iterates through distinct capabilities and the lists of containers associated with each. + pub fn for_each_time(&mut self, mut logic: F) where F: FnMut(InputCapability, std::slice::IterMut::), C: Default { while let Some((cap, data)) = self.next() { - logic(cap, data); + let data = std::mem::take(data); + self.staging.push_back((cap, data)); } - } - -} - -impl<'a, T: Timestamp, C: Accountable, P: Pull>+'a> FrontieredInputHandleCore<'a, T, C, P> { - /// Allocate a new frontiered input handle. 
- pub fn new(handle: &'a mut InputHandleCore, frontier: &'a MutableAntichain) -> Self { - FrontieredInputHandleCore { - handle, - frontier, + self.staging.make_contiguous().sort_by(|x,y| x.0.time().cmp(&y.0.time())); + + while let Some((cap, data)) = self.staging.pop_front() { + self.staged.push(data); + let more = self.staging.iter().take_while(|(c,_)| c.time() == cap.time()).count(); + self.staged.extend(self.staging.drain(..more).map(|(_,d)| d)); + logic(cap, self.staged.iter_mut()); + // Could return these back to the input .. + self.staged.clear(); } } - - /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`. - /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. - /// Returns `None` when there's no more data available. - #[inline] - pub fn next(&mut self) -> Option<(InputCapability, &mut C)> { - self.handle.next() - } - - /// Repeatedly calls `logic` till exhaustion of the available input data. - /// `logic` receives a capability and an input buffer. - /// - /// # Examples - /// ``` - /// use timely::dataflow::operators::ToStream; - /// use timely::dataflow::operators::generic::Operator; - /// use timely::dataflow::channels::pact::Pipeline; - /// - /// timely::example(|scope| { - /// (0..10).to_stream(scope) - /// .unary(Pipeline, "example", |_cap,_info| |input, output| { - /// input.for_each(|cap, data| { - /// output.session(&cap).give_container(data); - /// }); - /// }); - /// }); - /// ``` - #[inline] - pub fn for_each, &mut C)>(&mut self, logic: F) { - self.handle.for_each(logic) - } - - /// Inspect the frontier associated with this input. 
- #[inline] - pub fn frontier(&self) -> &'a MutableAntichain { - self.frontier - } } pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { @@ -149,6 +111,8 @@ pub fn new_input_handle>>( pull_counter, internal, summaries, + staging: Default::default(), + staged: Default::default(), } } @@ -212,10 +176,10 @@ impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .unary::, _, _, _>(Pipeline, "example", |_cap, _info| |input, output| { - /// input.for_each(|cap, data| { + /// input.for_each_time(|cap, data| { /// let time = cap.time().clone() + 1; /// output.session_with_builder(&cap.delayed(&time)) - /// .give_container(data); + /// .give_containers(data); /// }); /// }); /// }); @@ -246,10 +210,10 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .unary(Pipeline, "example", |_cap, _info| |input, output| { - /// input.for_each(|cap, data| { + /// input.for_each_time(|cap, data| { /// let time = cap.time().clone() + 1; /// output.session(&cap.delayed(&time)) - /// .give_container(data); + /// .give_containers(data); /// }); /// }); /// }); diff --git a/timely/src/dataflow/operators/generic/mod.rs b/timely/src/dataflow/operators/generic/mod.rs index e54f5e423..0a9bf72c1 100644 --- a/timely/src/dataflow/operators/generic/mod.rs +++ b/timely/src/dataflow/operators/generic/mod.rs @@ -8,7 +8,7 @@ mod handles; mod notificator; mod operator_info; -pub use self::handles::{InputHandle, InputHandleCore, FrontieredInputHandle, FrontieredInputHandleCore, OutputHandle, OutputHandleCore, OutputWrapper}; +pub use self::handles::{InputHandleCore, OutputHandle, OutputHandleCore, OutputWrapper}; pub use self::notificator::{Notificator, FrontierNotificator}; pub use self::operator::{Operator, source}; diff --git a/timely/src/dataflow/operators/generic/notificator.rs 
b/timely/src/dataflow/operators/generic/notificator.rs index 5eb6bdfe4..de9b0e3ef 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -55,8 +55,8 @@ impl<'a, T: Timestamp> Notificator<'a, T> { /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .unary_notify(Pipeline, "example", Some(0), |input, output, notificator| { - /// input.for_each(|cap, data| { - /// output.session(&cap).give_container(data); + /// input.for_each_time(|cap, data| { + /// output.session(&cap).give_containers(data); /// let time = cap.time().clone() + 1; /// notificator.notify_at(cap.delayed(&time)); /// }); @@ -191,16 +191,16 @@ fn notificator_delivers_notifications_in_topo_order() { /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { /// let mut notificator = FrontierNotificator::default(); /// let mut stash = HashMap::new(); -/// move |input1, input2, output| { -/// while let Some((time, data)) = input1.next() { -/// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..)); +/// move |(input1, frontier1), (input2, frontier2), output| { +/// input1.for_each_time(|time, data| { +/// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); /// notificator.notify_at(time.retain()); -/// } -/// while let Some((time, data)) = input2.next() { -/// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..)); +/// }); +/// input2.for_each_time(|time, data| { +/// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); /// notificator.notify_at(time.retain()); -/// } -/// notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| { +/// }); +/// notificator.for_each(&[frontier1, frontier2], |time, _| { /// if let Some(mut vec) = stash.remove(time.time()) { /// output.session(&time).give_iterator(vec.drain(..)); /// } @@ -263,13 +263,13 @@ impl 
FrontierNotificator { /// (0..10).to_stream(scope) /// .unary_frontier(Pipeline, "example", |_, _| { /// let mut notificator = FrontierNotificator::default(); - /// move |input, output| { - /// input.for_each(|cap, data| { - /// output.session(&cap).give_container(data); + /// move |(input, frontier), output| { + /// input.for_each_time(|cap, data| { + /// output.session(&cap).give_containers(data); /// let time = cap.time().clone() + 1; /// notificator.notify_at(cap.delayed(&time)); /// }); - /// notificator.for_each(&[input.frontier()], |cap, _| { + /// notificator.for_each(&[frontier], |cap, _| { /// println!("done with time: {:?}", cap.time()); /// }); /// } @@ -393,14 +393,14 @@ impl FrontierNotificator { /// (0..10).to_stream(scope) /// .unary_frontier(Pipeline, "example", |_, _| { /// let mut notificator = FrontierNotificator::default(); - /// move |input, output| { - /// input.for_each(|cap, data| { - /// output.session(&cap).give_container(data); + /// move |(input, frontier), output| { + /// input.for_each_time(|cap, data| { + /// output.session(&cap).give_containers(data); /// let time = cap.time().clone() + 1; /// notificator.notify_at(cap.delayed(&time)); /// assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1); /// }); - /// notificator.for_each(&[input.frontier()], |cap, _| { + /// notificator.for_each(&[frontier], |cap, _| { /// println!("done with time: {:?}", cap.time()); /// }); /// } diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 79f6076e4..5599e81eb 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -1,10 +1,11 @@ //! Methods to construct generic streaming and blocking unary operators. 
+use crate::progress::frontier::MutableAntichain; use crate::dataflow::channels::pushers::Tee; use crate::dataflow::channels::pact::ParallelizationContract; -use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore}; +use crate::dataflow::operators::generic::handles::{InputSession, OutputHandleCore}; use crate::dataflow::operators::capability::Capability; use crate::dataflow::{Scope, StreamCore}; @@ -34,16 +35,16 @@ pub trait Operator { /// let mut cap = Some(default_cap.delayed(&12)); /// let mut notificator = FrontierNotificator::default(); /// let mut stash = HashMap::new(); - /// move |input, output| { + /// move |(input, frontier), output| { /// if let Some(ref c) = cap.take() { /// output.session(&c).give(12); /// } - /// while let Some((time, data)) = input.next() { + /// input.for_each_time(|time, data| { /// stash.entry(time.time().clone()) /// .or_insert(Vec::new()) - /// .extend(data.drain(..)); - /// } - /// notificator.for_each(&[input.frontier()], |time, _not| { + /// .extend(data.flat_map(|d| d.drain(..))); + /// }); + /// notificator.for_each(&[frontier], |time, _not| { /// if let Some(mut vec) = stash.remove(time.time()) { /// output.session(&time).give_iterator(vec.drain(..)); /// } @@ -57,7 +58,7 @@ pub trait Operator { where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, + L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain), &mut OutputHandleCore>)+'static, P: ParallelizationContract; @@ -76,8 +77,8 @@ pub trait Operator { /// (0u64..10) /// .to_stream(scope) /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| { - /// input.for_each(|time, data| { - /// output.session(&time).give_container(data); + /// input.for_each_time(|time, data| { + /// output.session(&time).give_containers(data); /// notificator.notify_at(time.retain()); /// }); /// notificator.for_each(|time, _cnt, _not| { 
@@ -87,7 +88,7 @@ pub trait Operator { /// }); /// ``` fn unary_notify, + L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, &mut OutputHandleCore>, &mut Notificator)+'static, P: ParallelizationContract> @@ -112,9 +113,9 @@ pub trait Operator { /// if let Some(ref c) = cap.take() { /// output.session(&c).give(100); /// } - /// while let Some((time, data)) = input.next() { - /// output.session(&time).give_container(data); - /// } + /// input.for_each_time(|time, data| { + /// output.session(&time).give_containers(data); + /// }); /// } /// }); /// }); @@ -123,7 +124,7 @@ pub trait Operator { where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, + L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, &mut OutputHandleCore>)+'static, P: ParallelizationContract; @@ -145,16 +146,16 @@ pub trait Operator { /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { /// let mut notificator = FrontierNotificator::default(); /// let mut stash = HashMap::new(); - /// move |input1, input2, output| { - /// while let Some((time, data)) = input1.next() { - /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..)); + /// move |(input1, frontier1), (input2, frontier2), output| { + /// input1.for_each_time(|time, data| { + /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); /// notificator.notify_at(time.retain()); - /// } - /// while let Some((time, data)) = input2.next() { - /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..)); + /// }); + /// input2.for_each_time(|time, data| { + /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..))); /// notificator.notify_at(time.retain()); - /// } - /// notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _not| { + /// }); + /// notificator.for_each(&[frontier1, frontier2], |time, _not| { /// if let 
Some(mut vec) = stash.remove(time.time()) { /// output.session(&time).give_iterator(vec.drain(..)); /// } @@ -180,8 +181,8 @@ pub trait Operator { C2: Container, CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, - &mut FrontieredInputHandleCore, + L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain), + (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain), &mut OutputHandleCore>)+'static, P1: ParallelizationContract, P2: ParallelizationContract; @@ -203,12 +204,12 @@ pub trait Operator { /// let (in2_handle, in2) = scope.new_input(); /// /// in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| { - /// input1.for_each(|time, data| { - /// output.session(&time).give_container(data); + /// input1.for_each_time(|time, data| { + /// output.session(&time).give_containers(data); /// notificator.notify_at(time.retain()); /// }); - /// input2.for_each(|time, data| { - /// output.session(&time).give_container(data); + /// input2.for_each_time(|time, data| { + /// output.session(&time).give_containers(data); /// notificator.notify_at(time.retain()); /// }); /// notificator.for_each(|time, _cnt, _not| { @@ -229,8 +230,8 @@ pub trait Operator { /// ``` fn binary_notify, - &mut InputHandleCore, + L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, + InputSession<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore>, &mut Notificator)+'static, P1: ParallelizationContract, @@ -257,12 +258,8 @@ pub trait Operator { /// if let Some(ref c) = cap.take() { /// output.session(&c).give(100); /// } - /// while let Some((time, data)) = input1.next() { - /// output.session(&time).give_container(data); - /// } - /// while let Some((time, data)) = input2.next() { - /// output.session(&time).give_container(data); - /// } + /// input1.for_each_time(|time, data| output.session(&time).give_containers(data)); + /// input2.for_each_time(|time, 
data| output.session(&time).give_containers(data)); /// } /// }).inspect(|x| println!("{:?}", x)); /// }); @@ -272,8 +269,8 @@ pub trait Operator { C2: Container, CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut InputHandleCore, + L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, + InputSession<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore>)+'static, P1: ParallelizationContract, P2: ParallelizationContract; @@ -292,18 +289,18 @@ pub trait Operator { /// timely::example(|scope| { /// (0u64..10) /// .to_stream(scope) - /// .sink(Pipeline, "example", |input| { - /// while let Some((time, data)) = input.next() { - /// for datum in data.iter() { + /// .sink(Pipeline, "example", |(input, frontier)| { + /// input.for_each_time(|time, data| { + /// for datum in data.flatten() { /// println!("{:?}:\t{:?}", time, datum); /// } - /// } + /// }); /// }); /// }); /// ``` fn sink(&self, pact: P, name: &str, logic: L) where - L: FnMut(&mut FrontieredInputHandleCore)+'static, + L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain))+'static, P: ParallelizationContract; } @@ -313,7 +310,7 @@ impl Operator for StreamCore { where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, + L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain), &mut OutputHandleCore>)+'static, P: ParallelizationContract { @@ -328,9 +325,8 @@ impl Operator for StreamCore { let capability = capabilities.pop().unwrap(); let mut logic = constructor(capability, operator_info); move |frontiers| { - let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]); let mut output_handle = output.activate(); - logic(&mut input_handle, &mut output_handle); + logic((input.activate(), &frontiers[0]), &mut output_handle); } }); @@ -338,7 +334,7 @@ impl Operator for StreamCore { } fn unary_notify, + L: FnMut(InputSession<'_, G::Timestamp, 
C1, P::Puller>, &mut OutputHandleCore>, &mut Notificator)+'static, P: ParallelizationContract> @@ -350,10 +346,10 @@ impl Operator for StreamCore { notificator.notify_at(capability.delayed(&time)); } - move |input, output| { - let frontier = &[input.frontier()]; - let notificator = &mut Notificator::new(frontier, &mut notificator); - logic(input.handle, output, notificator); + move |(input, frontier), output| { + let frontiers = &[frontier]; + let notificator = &mut Notificator::new(frontiers, &mut notificator); + logic(input, output, notificator); } }) } @@ -362,7 +358,7 @@ impl Operator for StreamCore { where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, + L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, &mut OutputHandleCore>)+'static, P: ParallelizationContract { @@ -377,10 +373,7 @@ impl Operator for StreamCore { // `capabilities` should be a single-element vector. let capability = capabilities.pop().unwrap(); let mut logic = constructor(capability, operator_info); - move |_frontiers| { - let mut output_handle = output.activate(); - logic(&mut input, &mut output_handle); - } + move |_frontiers| logic(input.activate(), &mut output.activate()) }); stream @@ -391,8 +384,8 @@ impl Operator for StreamCore { C2: Container, CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut FrontieredInputHandleCore, - &mut FrontieredInputHandleCore, + L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain), + (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain), &mut OutputHandleCore>)+'static, P1: ParallelizationContract, P2: ParallelizationContract { @@ -409,10 +402,8 @@ impl Operator for StreamCore { let capability = capabilities.pop().unwrap(); let mut logic = constructor(capability, operator_info); move |frontiers| { - let mut input1_handle = FrontieredInputHandleCore::new(&mut input1, &frontiers[0]); - let mut input2_handle = 
FrontieredInputHandleCore::new(&mut input2, &frontiers[1]); let mut output_handle = output.activate(); - logic(&mut input1_handle, &mut input2_handle, &mut output_handle); + logic((input1.activate(), &frontiers[0]), (input2.activate(), &frontiers[1]), &mut output_handle); } }); @@ -421,8 +412,8 @@ impl Operator for StreamCore { fn binary_notify, - &mut InputHandleCore, + L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, + InputSession<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore>, &mut Notificator)+'static, P1: ParallelizationContract, @@ -435,10 +426,10 @@ impl Operator for StreamCore { notificator.notify_at(capability.delayed(&time)); } - move |input1, input2, output| { - let frontiers = &[input1.frontier(), input2.frontier()]; + move |(input1, frontier1), (input2, frontier2), output| { + let frontiers = &[frontier1, frontier2]; let notificator = &mut Notificator::new(frontiers, &mut notificator); - logic(input1.handle, input2.handle, output, notificator); + logic(input1, input2, output, notificator); } }) @@ -450,8 +441,8 @@ impl Operator for StreamCore { C2: Container, CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut InputHandleCore, - &mut InputHandleCore, + L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, + InputSession<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore>)+'static, P1: ParallelizationContract, P2: ParallelizationContract { @@ -470,7 +461,7 @@ impl Operator for StreamCore { let mut logic = constructor(capability, operator_info); move |_frontiers| { let mut output_handle = output.activate(); - logic(&mut input1, &mut input2, &mut output_handle); + logic(input1.activate(), input2.activate(), &mut output_handle); } }); @@ -479,7 +470,7 @@ impl Operator for StreamCore { fn sink(&self, pact: P, name: &str, mut logic: L) where - L: FnMut(&mut FrontieredInputHandleCore)+'static, + L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain))+'static, P: 
ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); @@ -487,8 +478,7 @@ impl Operator for StreamCore { builder.build(|_capabilities| { move |frontiers| { - let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]); - logic(&mut input_handle); + logic((input.activate(), &frontiers[0])); } }); } diff --git a/timely/src/dataflow/operators/map.rs b/timely/src/dataflow/operators/map.rs index 5adab6e10..97d0712ac 100644 --- a/timely/src/dataflow/operators/map.rs +++ b/timely/src/dataflow/operators/map.rs @@ -54,9 +54,12 @@ pub trait Map { impl Map for Stream { fn map_in_place(&self, mut logic: L) -> Stream { self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| { - input.for_each(|time, data| { - for datum in data.iter_mut() { logic(datum); } - output.session(&time).give_container(data); + input.for_each_time(|time, data| { + let mut session = output.session(&time); + for data in data { + for datum in data.iter_mut() { logic(datum); } + session.give_container(data); + } }) }) } diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index 6cfc140c1..2634b729c 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -173,12 +173,11 @@ impl Sequencer { .sink( Exchange::new(|x: &(usize, usize, T)| x.0 as u64), "SequenceOutput", - move |input| { + move |(input, frontier)| { // grab each command and queue it up - input.for_each(|time, data| { - recvd.reserve(data.len()); - for (worker, counter, element) in data.drain(..) { + input.for_each_time(|time, data| { + for (worker, counter, element) in data.flat_map(|d| d.drain(..)) { recvd.push(((*time.time(), worker, counter), element)); } }); @@ -194,7 +193,7 @@ impl Sequencer { } // determine how many (which) elements to read from `recvd`. 
- let count = recvd.iter().filter(|&((ref time, _, _), _)| !input.frontier().less_equal(time)).count(); + let count = recvd.iter().filter(|&((ref time, _, _), _)| !frontier.less_equal(time)).count(); let iter = recvd.drain(..count); if let Some(recv_queue) = recv_weak.upgrade() { diff --git a/timely/tests/gh_523.rs b/timely/tests/gh_523.rs index 19108330f..589420081 100644 --- a/timely/tests/gh_523.rs +++ b/timely/tests/gh_523.rs @@ -12,10 +12,12 @@ fn gh_523() { .input_from(&mut input) .unary(Pipeline, "Test", move |_, _| { move |input, output| { - input.for_each(|cap, data| { + input.for_each_time(|cap, data| { let mut session = output.session(&cap); - session.give_container(&mut Vec::new()); - session.give_container(data); + for data in data { + session.give_container(&mut Vec::new()); + session.give_container(data); + } }); } }) diff --git a/timely/tests/shape_scaling.rs b/timely/tests/shape_scaling.rs index ba2268fb9..75cbf141d 100644 --- a/timely/tests/shape_scaling.rs +++ b/timely/tests/shape_scaling.rs @@ -37,9 +37,9 @@ fn operator_scaling(scale: u64) { move |_frontiers| { for (input, output) in handles.iter_mut() { let mut output = output.activate(); - input.for_each(|time, data| { + input.activate().for_each_time(|time, data| { let mut output = output.session_with_builder(&time); - for datum in data.drain(..) { + for datum in data.flat_map(|d| d.drain(..)) { output.give(datum); } });