Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 126 additions & 55 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,16 @@

use std::collections::HashMap;
use std::ops::Mul;
use std::time::Instant;

use timely::dataflow::Scope;
use timely::container::{CapacityContainerBuilder, ContainerBuilder};
use timely::dataflow::{Scope, ScopeParent, StreamCore};
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter as PushCounter, Tee};
use timely::dataflow::operators::Operator;
use timely::progress::Antichain;
use timely::progress::frontier::AntichainRef;

use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Monoid, Semigroup};
Expand Down Expand Up @@ -88,18 +93,36 @@ where
DOut: Clone+'static,
S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static,
{
let output_func = move |k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, time: &G::Timestamp, diff1: &R, diff2: &Tr::Diff| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
let output_func = move |session: &mut SessionFor<G, _>, k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, diff1: &R, output: &mut Vec<(G::Timestamp, Tr::Diff)>| {
for (time, diff2) in output.drain(..) {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
session.give((dout, initial.clone(), diff));
}
};
half_join_internal_unsafe(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func)
half_join_internal_unsafe::<_, _, _, _, _, _,_,_,_, CapacityContainerBuilder<Vec<_>>>(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func)
.as_collection()
}

/// A session with lifetime `'a` in a scope `G` with a container builder `CB`.
///
/// This is a shorthand primarily for the reson of readability.
type SessionFor<'a, G, CB> =
Session<'a,
<G as ScopeParent>::Timestamp,
CB,
PushCounter<
<G as ScopeParent>::Timestamp,
<CB as ContainerBuilder>::Container,
Tee<<G as ScopeParent>::Timestamp, <CB as ContainerBuilder>::Container>
>
>;

/// An unsafe variant of `half_join` where the `output_func` closure takes
/// additional arguments for `time` and `diff` as input and returns an iterator
/// over `(data, time, diff)` triplets. This allows for more flexibility, but
/// is more error-prone.
/// additional arguments a vector of `time` and `diff` tuples as input and
/// writes its outputs at a container builder. The container builder
/// can, but isn't required to, accept `(data, time, diff)` triplets.
/// This allows for more flexibility, but is more error-prone.
///
/// This operator responds to inputs of the form
///
Expand All @@ -110,7 +133,7 @@ where
/// where `initial_time` is less or equal to `time1`, and produces as output
///
/// ```ignore
/// output_func(key, val1, val2, initial_time, lub(time1, time2), diff1, diff2)
/// output_func(session, key, val1, val2, initial_time, diff1, &[lub(time1, time2), diff2])
/// ```
///
/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
Expand All @@ -120,14 +143,14 @@ where
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, DOut, ROut, Y, I, S>(
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, Y, S, CB>(
stream: &Collection<G, (K, V, G::Timestamp), R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
yield_function: Y,
mut output_func: S,
) -> Collection<G, DOut, ROut>
) -> StreamCore<G, CB::Container>
where
G: Scope<Timestamp = Tr::Time>,
K: Hashable + ExchangeData,
Expand All @@ -136,11 +159,9 @@ where
Tr: for<'a> TraceReader<Key<'a> : IntoOwned<'a, Owned = K>>+Clone+'static,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
DOut: Clone+'static,
ROut: Semigroup + 'static,
Y: Fn(std::time::Instant, usize) -> bool + 'static,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&K, &V, Tr::Val<'_>, &G::Timestamp, &G::Timestamp, &R, &Tr::Diff)-> I + 'static,
S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
CB: ContainerBuilder,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand All @@ -165,7 +186,7 @@ where
input1.for_each(|capability, data| {
stash.entry(capability.retain())
.or_insert(Vec::new())
.extend(data.drain(..))
.append(data)
});

// Drain input batches; although we do not observe them, we want access to the input
Expand All @@ -192,43 +213,22 @@ where
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.less_equal(capability.time()) {

let mut session = output.session(capability);

// Sort requests by key for in-order cursor traversal.
consolidate_updates(proposals);

let (mut cursor, storage) = trace.cursor();
let frontier = input2.frontier.frontier();

// Process proposals one at a time, stopping if we should yield.
for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(<Tr::TimeGat<'_> as IntoOwned>::borrow_as(t), initial)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
let mut t = t.into_owned();
t.join_assign(time);
output_buffer.push((t, d.into_owned()))
}
});
consolidate(&mut output_buffer);
work += output_buffer.len();
for (time, diff2) in output_buffer.drain(..) {
for dout in output_func(&key, val1, val2, initial, &time, &diff1, &diff2) {
session.give(dout);
}
}
cursor.step_val(&storage);
}
cursor.rewind_vals(&storage);
}
*diff1 = R::zero();
}
}
// Update yielded: We can only go from false to {false, true} as
// we're checking that `!yielded` holds before entering this block.
yielded = process_proposals::<G, _, _, _, _, _, _, _, _>(
&comparison,
&yield_function,
&mut output_func,
&mut output_buffer,
timer,
&mut work,
trace,
proposals,
output.session_with_builder(capability),
frontier
);

proposals.retain(|ptd| !ptd.2.is_zero());

Expand All @@ -243,7 +243,7 @@ where
stash_additions
.entry(capability.delayed(&antichain[0]))
.or_insert(Vec::new())
.extend(proposals.drain(..));
.append(proposals);
}
else if antichain.len() > 1 {
// Any remaining times should peel off elements from `proposals`.
Expand Down Expand Up @@ -290,5 +290,76 @@ where
arrangement_trace = None;
}
}
}).as_collection()
})
}

/// Outlined inner loop for `half_join_internal_unsafe` for reasons of performance.
///
/// Gives Rust/LLVM the opportunity to inline the loop body instead of inlining the loop and
/// leaving all calls in the loop body outlined.
///
/// Consumes proposals until the yield function returns `true` or all proposals are processed.
/// Leaves a zero diff in place for all proposals that were processed.
///
/// Returns `true` if the operator should yield.
fn process_proposals<G, Tr, CF, Y, S, CB, K, V, R>(
comparison: &CF,
yield_function: &Y,
output_func: &mut S,
mut output_buffer: &mut Vec<(<Tr as TraceReader>::Time, <Tr as TraceReader>::Diff)>,
timer: Instant,
work: &mut usize,
trace: &mut Tr,
proposals: &mut Vec<((K, V, <Tr as TraceReader>::Time), <Tr as TraceReader>::Time, R)>,
mut session: SessionFor<G, CB>,
frontier: AntichainRef<<Tr as TraceReader>::Time>
) -> bool
where
G: Scope<Timestamp = Tr::Time>,
Tr: for<'a> TraceReader<Key<'a> : IntoOwned<'a, Owned = K>>,
CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
Y: Fn(Instant, usize) -> bool + 'static,
S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
CB: ContainerBuilder,
K: Ord,
V: Ord,
R: Monoid,
{
// Sort requests by key for in-order cursor traversal.
consolidate_updates(proposals);

let (mut cursor, storage) = trace.cursor();
let mut yielded = false;

// Process proposals one at a time, stopping if we should yield.
for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, *work);
if !yielded && !frontier.iter().any(|t| comparison(<Tr::TimeGat<'_> as IntoOwned>::borrow_as(t), initial)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
let mut t = t.into_owned();
t.join_assign(time);
output_buffer.push((t, d.into_owned()))
}
});
consolidate(&mut output_buffer);
*work += output_buffer.len();
output_func(&mut session, key, val1, val2, initial, diff1, &mut output_buffer);
// Defensive clear; we'd expect `output_func` to clear the buffer.
// TODO: Should we assert it is empty?
output_buffer.clear();
cursor.step_val(&storage);
}
cursor.rewind_vals(&storage);
}
*diff1 = R::zero();
}
}

yielded
}