diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 43d81b00c..17648a194 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -33,11 +33,16 @@ use std::collections::HashMap; use std::ops::Mul; +use std::time::Instant; -use timely::dataflow::Scope; +use timely::container::{CapacityContainerBuilder, ContainerBuilder}; +use timely::dataflow::{Scope, ScopeParent, StreamCore}; use timely::dataflow::channels::pact::{Pipeline, Exchange}; +use timely::dataflow::channels::pushers::buffer::Session; +use timely::dataflow::channels::pushers::{Counter as PushCounter, Tee}; use timely::dataflow::operators::Operator; use timely::progress::Antichain; +use timely::progress::frontier::AntichainRef; use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{Monoid, Semigroup}; @@ -88,18 +93,36 @@ where DOut: Clone+'static, S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static, { - let output_func = move |k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| { - let diff = diff1.clone() * diff2.clone(); - let dout = (output_func(k, v1, v2), time.clone()); - Some((dout, initial.clone(), diff)) + let output_func = move |session: &mut SessionFor, k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, diff1: &R, output: &mut Vec<(G::Timestamp, Tr::Diff)>| { + for (time, diff2) in output.drain(..) 
{ + let diff = diff1.clone() * diff2.clone(); + let dout = (output_func(k, v1, v2), time.clone()); + session.give((dout, initial.clone(), diff)); + } }; - half_join_internal_unsafe(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func) + half_join_internal_unsafe::<_, _, _, _, _, _,_,_,_, CapacityContainerBuilder>>(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func) + .as_collection() } +/// A session with lifetime `'a` in a scope `G` with a container builder `CB`. +/// +/// This is a shorthand, primarily for the sake of readability. +type SessionFor<'a, G, CB> = + Session<'a, + ::Timestamp, + CB, + PushCounter< + ::Timestamp, + ::Container, + Tee<::Timestamp, ::Container> + > + >; + /// An unsafe variant of `half_join` where the `output_func` closure takes -/// additional arguments for `time` and `diff` as input and returns an iterator -/// over `(data, time, diff)` triplets. This allows for more flexibility, but -/// is more error-prone. +/// a vector of `(time, diff)` tuples as an additional argument and +/// writes its outputs to a container builder. The container builder +/// can, but isn't required to, accept `(data, time, diff)` triplets. +/// This allows for more flexibility, but is more error-prone. /// /// This operator responds to inputs of the form /// @@ -110,7 +133,7 @@ where /// where `initial_time` is less or equal to `time1`, and produces as output /// /// ```ignore -/// output_func(key, val1, val2, initial_time, lub(time1, time2), diff1, diff2) +/// output_func(session, key, val1, val2, initial_time, diff1, &mut vec![(lub(time1, time2), diff2)]) /// ``` /// /// for each `((key, val2), time2, diff2)` present in `arrangement`, where @@ -120,14 +143,14 @@ where /// yield control, as a function of the elapsed time and the number of matched /// records. Note this is not the number of *output* records, owing mainly to /// the number of matched records being easiest to record with low overhead. 
-pub fn half_join_internal_unsafe( +pub fn half_join_internal_unsafe( stream: &Collection, mut arrangement: Arranged, frontier_func: FF, comparison: CF, yield_function: Y, mut output_func: S, -) -> Collection +) -> StreamCore where G: Scope, K: Hashable + ExchangeData, @@ -136,11 +159,9 @@ where Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>>+Clone+'static, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, - DOut: Clone+'static, - ROut: Semigroup + 'static, Y: Fn(std::time::Instant, usize) -> bool + 'static, - I: IntoIterator, - S: FnMut(&K, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static, + S: FnMut(&mut SessionFor, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static, + CB: ContainerBuilder, { // No need to block physical merging for this operator. arrangement.trace.set_physical_compaction(Antichain::new().borrow()); @@ -165,7 +186,7 @@ where input1.for_each(|capability, data| { stash.entry(capability.retain()) .or_insert(Vec::new()) - .extend(data.drain(..)) + .append(data) }); // Drain input batches; although we do not observe them, we want access to the input @@ -192,43 +213,22 @@ where yielded = yielded || yield_function(timer, work); if !yielded && !input2.frontier.less_equal(capability.time()) { - let mut session = output.session(capability); - - // Sort requests by key for in-order cursor traversal. - consolidate_updates(proposals); - - let (mut cursor, storage) = trace.cursor(); + let frontier = input2.frontier.frontier(); - // Process proposals one at a time, stopping if we should yield. - for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() { - // Use TOTAL ORDER to allow the release of `time`. 
- yielded = yielded || yield_function(timer, work); - if !yielded && !input2.frontier.frontier().iter().any(|t| comparison( as IntoOwned>::borrow_as(t), initial)) { - use differential_dataflow::trace::cursor::IntoOwned; - cursor.seek_key(&storage, IntoOwned::borrow_as(key)); - if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) { - while let Some(val2) = cursor.get_val(&storage) { - cursor.map_times(&storage, |t, d| { - if comparison(t, initial) { - let mut t = t.into_owned(); - t.join_assign(time); - output_buffer.push((t, d.into_owned())) - } - }); - consolidate(&mut output_buffer); - work += output_buffer.len(); - for (time, diff2) in output_buffer.drain(..) { - for dout in output_func(&key, val1, val2, initial, &time, &diff1, &diff2) { - session.give(dout); - } - } - cursor.step_val(&storage); - } - cursor.rewind_vals(&storage); - } - *diff1 = R::zero(); - } - } + // Update yielded: We can only go from false to {false, true} as + // we're checking that `!yielded` holds before entering this block. + yielded = process_proposals::( + &comparison, + &yield_function, + &mut output_func, + &mut output_buffer, + timer, + &mut work, + trace, + proposals, + output.session_with_builder(capability), + frontier + ); proposals.retain(|ptd| !ptd.2.is_zero()); @@ -243,7 +243,7 @@ where stash_additions .entry(capability.delayed(&antichain[0])) .or_insert(Vec::new()) - .extend(proposals.drain(..)); + .append(proposals); } else if antichain.len() > 1 { // Any remaining times should peel off elements from `proposals`. @@ -290,5 +290,76 @@ where arrangement_trace = None; } } - }).as_collection() + }) +} + +/// Outlined inner loop for `half_join_internal_unsafe` for reasons of performance. +/// +/// Gives Rust/LLVM the opportunity to inline the loop body instead of inlining the loop and +/// leaving all calls in the loop body outlined. +/// +/// Consumes proposals until the yield function returns `true` or all proposals are processed. 
+/// Leaves a zero diff in place for all proposals that were processed. +/// +/// Returns `true` if the operator should yield. +fn process_proposals( + comparison: &CF, + yield_function: &Y, + output_func: &mut S, + mut output_buffer: &mut Vec<(::Time, ::Diff)>, + timer: Instant, + work: &mut usize, + trace: &mut Tr, + proposals: &mut Vec<((K, V, ::Time), ::Time, R)>, + mut session: SessionFor, + frontier: AntichainRef<::Time> +) -> bool +where + G: Scope, + Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>>, + CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, + Y: Fn(Instant, usize) -> bool + 'static, + S: FnMut(&mut SessionFor, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static, + CB: ContainerBuilder, + K: Ord, + V: Ord, + R: Monoid, +{ + // Sort requests by key for in-order cursor traversal. + consolidate_updates(proposals); + + let (mut cursor, storage) = trace.cursor(); + let mut yielded = false; + + // Process proposals one at a time, stopping if we should yield. + for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() { + // Use TOTAL ORDER to allow the release of `time`. + yielded = yielded || yield_function(timer, *work); + if !yielded && !frontier.iter().any(|t| comparison( as IntoOwned>::borrow_as(t), initial)) { + use differential_dataflow::trace::cursor::IntoOwned; + cursor.seek_key(&storage, IntoOwned::borrow_as(key)); + if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) { + while let Some(val2) = cursor.get_val(&storage) { + cursor.map_times(&storage, |t, d| { + if comparison(t, initial) { + let mut t = t.into_owned(); + t.join_assign(time); + output_buffer.push((t, d.into_owned())) + } + }); + consolidate(&mut output_buffer); + *work += output_buffer.len(); + output_func(&mut session, key, val1, val2, initial, diff1, &mut output_buffer); + // Defensive clear; we'd expect `output_func` to clear the buffer. + // TODO: Should we assert it is empty? 
+ output_buffer.clear(); + cursor.step_val(&storage); + } + cursor.rewind_vals(&storage); + } + *diff1 = R::zero(); + } + } + + yielded }