Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 80 additions & 34 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,24 @@

use std::collections::HashMap;
use std::ops::Mul;
use std::time::Instant;

use timely::container::{CapacityContainerBuilder, ContainerBuilder};
use timely::dataflow::{Scope, ScopeParent, StreamCore};
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter as PushCounter, Tee};
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, ScopeParent, StreamCore};
use timely::progress::Antichain;
use timely::progress::frontier::AntichainRef;

use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::difference::{Monoid, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::trace::cursor::IntoOwned;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};

/// A binary equijoin that responds to updates on only its first input.
///
Expand Down Expand Up @@ -102,6 +104,7 @@ where
.as_collection()
}

/// Utility type for a session in scope `G` with a container builder `CB`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose we aim for a better comment here. Yes it is a utility type, but that does not clarify what it is for, how you should hold it, etc. In particular, I'd stress that this is just a shortening of a type for readability. I think potentially a better (imo) comment would just be

/// A shorthand for an otherwise complex type describing a session with lifetime `'a` in a scope `G` with a container builder `CB`.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe flipped around a bit to be

/// A session with lifetime `'a` in a scope `G` with a container builder `CB`.
///
/// This is a shorthand primarily for purposes of readability.

type SessionFor<'a, G, CB> =
Session<'a,
<G as ScopeParent>::Timestamp,
Expand Down Expand Up @@ -207,40 +210,23 @@ where
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.less_equal(capability.time()) {

let mut session = output.session_with_builder(capability);

// Sort requests by key for in-order cursor traversal.
consolidate_updates(proposals);

let (mut cursor, storage) = trace.cursor();
let frontier = input2.frontier.frontier();

// Process proposals one at a time, stopping if we should yield.
for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(<Tr::TimeGat<'_> as IntoOwned>::borrow_as(t), initial)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
let mut t = t.into_owned();
t.join_assign(time);
output_buffer.push((t, d.into_owned()))
}
});
consolidate(&mut output_buffer);
work += output_buffer.len();
output_func(&mut session, key, val1, val2, initial, diff1, &mut output_buffer);
output_buffer.clear();
cursor.step_val(&storage);
}
cursor.rewind_vals(&storage);
}
*diff1 = R::zero();
}
}
yielded |= process_proposals::<G, _, _, _, _, _, _, _, _>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to avoid relying short-circuited evaluation if at all possible. Mostly from a clarity point of view.

&comparison,
&yield_function,
&mut output_func,
&mut output_buffer,
timer,
&mut work,
trace,
proposals,
output.session_with_builder(capability),
frontier
);

proposals.retain(|ptd| !ptd.2.is_zero());

Expand Down Expand Up @@ -304,3 +290,63 @@ where
}
})
}

/// Process proposals one at a time, yielding if necessary.
///
/// Returns `true` if the operator should yield.
///
/// Utility function for `half_join_internal_unsafe`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new layout confuses me, and I think it's partly that the comment used to be on a for-loop, where I could understand what we were talking about wrt "one at a time, yielding if necessary". Now it's a method, and .. it's totally unclear what this means.

This might just be Rust being awkward around refactoring, where the absolute wall of captured state is .. necessary, but utterly inexplicable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The signature and constraints are almost (20 lines) as long as the method body (30 lines).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the tl;dr is that this probably needs a real refactoring, which I'm happy to do, whereas this is more a tear-out. My understanding is that this is for performance more than anything, because Rust/LLVM is unwilling to perform the intended inlining in complex code blocks?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like with the type above, perhaps the best documentation here is just

/// Outlining of the inner loop of `half_join_internal_unsafe` for reasons of performance.

fn process_proposals<G, Tr, CF, Y, S, CB, K, V, R>(
comparison: &CF,
yield_function: &Y,
output_func: &mut S,
mut output_buffer: &mut Vec<(<Tr as TraceReader>::Time, <Tr as TraceReader>::Diff)>,
timer: Instant,
work: &mut usize,
trace: &mut Tr,
proposals: &mut Vec<((K, V, <Tr as TraceReader>::Time), <Tr as TraceReader>::Time, R)>,
mut session: SessionFor<G, CB>,
frontier: AntichainRef<<Tr as TraceReader>::Time>
) -> bool
where
G: Scope<Timestamp = Tr::Time>,
Tr: for<'a> TraceReader<Key<'a> : IntoOwned<'a, Owned = K>>,
CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
Y: Fn(Instant, usize) -> bool + 'static,
S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
CB: ContainerBuilder,
R: Monoid,
{
let (mut cursor, storage) = trace.cursor();
let mut yielded = false;

// Process proposals one at a time, stopping if we should yield.
for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, *work);
if !yielded && !frontier.iter().any(|t| comparison(<Tr::TimeGat<'_> as IntoOwned>::borrow_as(t), initial)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
let mut t = t.into_owned();
t.join_assign(time);
output_buffer.push((t, d.into_owned()))
}
});
consolidate(&mut output_buffer);
*work += output_buffer.len();
output_func(&mut session, key, val1, val2, initial, diff1, &mut output_buffer);
output_buffer.clear();
cursor.step_val(&storage);
}
cursor.rewind_vals(&storage);
}
*diff1 = R::zero();
}
}

yielded
}