Skip to content

Commit 45ba04b

Browse files
committed
Extract inner loop of half_join to separate function
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 3320b14 commit 45ba04b

File tree

1 file changed

+80
-34
lines changed

1 file changed

+80
-34
lines changed

dogsdogsdogs/src/operators/half_join.rs

Lines changed: 80 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,24 @@
3333
3434
use std::collections::HashMap;
3535
use std::ops::Mul;
36+
use std::time::Instant;
3637

3738
use timely::container::{CapacityContainerBuilder, ContainerBuilder};
38-
use timely::dataflow::{Scope, ScopeParent, StreamCore};
3939
use timely::dataflow::channels::pact::{Pipeline, Exchange};
4040
use timely::dataflow::channels::pushers::buffer::Session;
4141
use timely::dataflow::channels::pushers::{Counter as PushCounter, Tee};
4242
use timely::dataflow::operators::Operator;
43+
use timely::dataflow::{Scope, ScopeParent, StreamCore};
4344
use timely::progress::Antichain;
45+
use timely::progress::frontier::AntichainRef;
4446

45-
use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
47+
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
4648
use differential_dataflow::difference::{Monoid, Semigroup};
4749
use differential_dataflow::lattice::Lattice;
4850
use differential_dataflow::operators::arrange::Arranged;
49-
use differential_dataflow::trace::{Cursor, TraceReader};
50-
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
5151
use differential_dataflow::trace::cursor::IntoOwned;
52+
use differential_dataflow::trace::{Cursor, TraceReader};
53+
use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
5254

5355
/// A binary equijoin that responds to updates on only its first input.
5456
///
@@ -102,6 +104,7 @@ where
102104
.as_collection()
103105
}
104106

107+
/// Utility type for a session in scope `G` with a container builder `CB`.
105108
type SessionFor<'a, G, CB> =
106109
Session<'a,
107110
<G as ScopeParent>::Timestamp,
@@ -207,40 +210,23 @@ where
207210
yielded = yielded || yield_function(timer, work);
208211
if !yielded && !input2.frontier.less_equal(capability.time()) {
209212

210-
let mut session = output.session_with_builder(capability);
211-
212213
// Sort requests by key for in-order cursor traversal.
213214
consolidate_updates(proposals);
214215

215-
let (mut cursor, storage) = trace.cursor();
216+
let frontier = input2.frontier.frontier();
216217

217-
// Process proposals one at a time, stopping if we should yield.
218-
for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
219-
// Use TOTAL ORDER to allow the release of `time`.
220-
yielded = yielded || yield_function(timer, work);
221-
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(<Tr::TimeGat<'_> as IntoOwned>::borrow_as(t), initial)) {
222-
use differential_dataflow::trace::cursor::IntoOwned;
223-
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
224-
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
225-
while let Some(val2) = cursor.get_val(&storage) {
226-
cursor.map_times(&storage, |t, d| {
227-
if comparison(t, initial) {
228-
let mut t = t.into_owned();
229-
t.join_assign(time);
230-
output_buffer.push((t, d.into_owned()))
231-
}
232-
});
233-
consolidate(&mut output_buffer);
234-
work += output_buffer.len();
235-
output_func(&mut session, key, val1, val2, initial, diff1, &mut output_buffer);
236-
output_buffer.clear();
237-
cursor.step_val(&storage);
238-
}
239-
cursor.rewind_vals(&storage);
240-
}
241-
*diff1 = R::zero();
242-
}
243-
}
218+
yielded |= process_proposals::<G, _, _, _, _, _, _, _, _>(
219+
&comparison,
220+
&yield_function,
221+
&mut output_func,
222+
&mut output_buffer,
223+
timer,
224+
&mut work,
225+
trace,
226+
proposals,
227+
output.session_with_builder(capability),
228+
frontier
229+
);
244230

245231
proposals.retain(|ptd| !ptd.2.is_zero());
246232

@@ -304,3 +290,63 @@ where
304290
}
305291
})
306292
}
293+
294+
/// Process proposals one at a time, yielding if necessary.
295+
///
296+
/// Returns `true` if the operator should yield.
297+
///
298+
/// Utility function for `half_join_internal_unsafe`.
299+
fn process_proposals<G, Tr, CF, Y, S, CB, K, V, R>(
300+
comparison: &CF,
301+
yield_function: &Y,
302+
output_func: &mut S,
303+
mut output_buffer: &mut Vec<(<Tr as TraceReader>::Time, <Tr as TraceReader>::Diff)>,
304+
timer: Instant,
305+
work: &mut usize,
306+
trace: &mut Tr,
307+
proposals: &mut Vec<((K, V, <Tr as TraceReader>::Time), <Tr as TraceReader>::Time, R)>,
308+
mut session: SessionFor<G, CB>,
309+
frontier: AntichainRef<<Tr as TraceReader>::Time>
310+
) -> bool
311+
where
312+
G: Scope<Timestamp = Tr::Time>,
313+
Tr: for<'a> TraceReader<Key<'a> : IntoOwned<'a, Owned = K>>,
314+
CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
315+
Y: Fn(Instant, usize) -> bool + 'static,
316+
S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
317+
CB: ContainerBuilder,
318+
R: Monoid,
319+
{
320+
let (mut cursor, storage) = trace.cursor();
321+
let mut yielded = false;
322+
323+
// Process proposals one at a time, stopping if we should yield.
324+
for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
325+
// Use TOTAL ORDER to allow the release of `time`.
326+
yielded = yielded || yield_function(timer, *work);
327+
if !yielded && !frontier.iter().any(|t| comparison(<Tr::TimeGat<'_> as IntoOwned>::borrow_as(t), initial)) {
328+
use differential_dataflow::trace::cursor::IntoOwned;
329+
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
330+
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
331+
while let Some(val2) = cursor.get_val(&storage) {
332+
cursor.map_times(&storage, |t, d| {
333+
if comparison(t, initial) {
334+
let mut t = t.into_owned();
335+
t.join_assign(time);
336+
output_buffer.push((t, d.into_owned()))
337+
}
338+
});
339+
consolidate(&mut output_buffer);
340+
*work += output_buffer.len();
341+
output_func(&mut session, key, val1, val2, initial, diff1, &mut output_buffer);
342+
output_buffer.clear();
343+
cursor.step_val(&storage);
344+
}
345+
cursor.rewind_vals(&storage);
346+
}
347+
*diff1 = R::zero();
348+
}
349+
}
350+
351+
yielded
352+
}

0 commit comments

Comments
 (0)