3333
3434use std:: collections:: HashMap ;
3535use std:: ops:: Mul ;
36+ use std:: time:: Instant ;
3637
37- use timely:: dataflow:: Scope ;
38+ use timely:: container:: { CapacityContainerBuilder , ContainerBuilder } ;
39+ use timely:: dataflow:: { Scope , ScopeParent , StreamCore } ;
3840use timely:: dataflow:: channels:: pact:: { Pipeline , Exchange } ;
41+ use timely:: dataflow:: channels:: pushers:: buffer:: Session ;
42+ use timely:: dataflow:: channels:: pushers:: { Counter as PushCounter , Tee } ;
3943use timely:: dataflow:: operators:: Operator ;
4044use timely:: progress:: Antichain ;
45+ use timely:: progress:: frontier:: AntichainRef ;
4146
4247use differential_dataflow:: { ExchangeData , Collection , AsCollection , Hashable } ;
4348use differential_dataflow:: difference:: { Monoid , Semigroup } ;
@@ -88,18 +93,36 @@ where
8893 DOut : Clone +' static ,
8994 S : FnMut ( & K , & V , Tr :: Val < ' _ > ) ->DOut +' static ,
9095{
91- let output_func = move |k : & K , v1 : & V , v2 : Tr :: Val < ' _ > , initial : & G :: Timestamp , time : & G :: Timestamp , diff1 : & R , diff2 : & Tr :: Diff | {
92- let diff = diff1. clone ( ) * diff2. clone ( ) ;
93- let dout = ( output_func ( k, v1, v2) , time. clone ( ) ) ;
94- Some ( ( dout, initial. clone ( ) , diff) )
96+ let output_func = move |session : & mut SessionFor < G , _ > , k : & K , v1 : & V , v2 : Tr :: Val < ' _ > , initial : & G :: Timestamp , diff1 : & R , output : & mut Vec < ( G :: Timestamp , Tr :: Diff ) > | {
97+ for ( time, diff2) in output. drain ( ..) {
98+ let diff = diff1. clone ( ) * diff2. clone ( ) ;
99+ let dout = ( output_func ( k, v1, v2) , time. clone ( ) ) ;
100+ session. give ( ( dout, initial. clone ( ) , diff) ) ;
101+ }
95102 } ;
96- half_join_internal_unsafe ( stream, arrangement, frontier_func, comparison, |_timer, _count| false , output_func)
103+ half_join_internal_unsafe :: < _ , _ , _ , _ , _ , _ , _ , _ , _ , CapacityContainerBuilder < Vec < _ > > > ( stream, arrangement, frontier_func, comparison, |_timer, _count| false , output_func)
104+ . as_collection ( )
97105}
98106
107+ /// A session with lifetime `'a` in a scope `G` with a container builder `CB`.
108+ ///
109+ /// This is a shorthand primarily for the reson of readability.
110+ type SessionFor < ' a , G , CB > =
111+ Session < ' a ,
112+ <G as ScopeParent >:: Timestamp ,
113+ CB ,
114+ PushCounter <
115+ <G as ScopeParent >:: Timestamp ,
116+ <CB as ContainerBuilder >:: Container ,
117+ Tee < <G as ScopeParent >:: Timestamp , <CB as ContainerBuilder >:: Container >
118+ >
119+ > ;
120+
99121/// An unsafe variant of `half_join` where the `output_func` closure takes
100- /// additional arguments for `time` and `diff` as input and returns an iterator
101- /// over `(data, time, diff)` triplets. This allows for more flexibility, but
102- /// is more error-prone.
122+ /// additional arguments a vector of `time` and `diff` tuples as input and
123+ /// writes its outputs at a container builder. The container builder
124+ /// can, but isn't required to, accept `(data, time, diff)` triplets.
125+ /// This allows for more flexibility, but is more error-prone.
103126///
104127/// This operator responds to inputs of the form
105128///
@@ -110,7 +133,7 @@ where
110133/// where `initial_time` is less or equal to `time1`, and produces as output
111134///
112135/// ```ignore
113- /// output_func(key, val1, val2, initial_time, lub(time1, time2), diff1, diff2)
136+ /// output_func(session, key, val1, val2, initial_time, diff1, &[ lub(time1, time2), diff2] )
114137/// ```
115138///
116139/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
@@ -120,14 +143,14 @@ where
120143/// yield control, as a function of the elapsed time and the number of matched
121144/// records. Note this is not the number of *output* records, owing mainly to
122145/// the number of matched records being easiest to record with low overhead.
123- pub fn half_join_internal_unsafe < G , K , V , R , Tr , FF , CF , DOut , ROut , Y , I , S > (
146+ pub fn half_join_internal_unsafe < G , K , V , R , Tr , FF , CF , Y , S , CB > (
124147 stream : & Collection < G , ( K , V , G :: Timestamp ) , R > ,
125148 mut arrangement : Arranged < G , Tr > ,
126149 frontier_func : FF ,
127150 comparison : CF ,
128151 yield_function : Y ,
129152 mut output_func : S ,
130- ) -> Collection < G , DOut , ROut >
153+ ) -> StreamCore < G , CB :: Container >
131154where
132155 G : Scope < Timestamp = Tr :: Time > ,
133156 K : Hashable + ExchangeData ,
@@ -136,11 +159,9 @@ where
136159 Tr : for < ' a > TraceReader < Key < ' a > : IntoOwned < ' a , Owned = K > > +Clone +' static ,
137160 FF : Fn ( & G :: Timestamp , & mut Antichain < G :: Timestamp > ) + ' static ,
138161 CF : Fn ( Tr :: TimeGat < ' _ > , & Tr :: Time ) -> bool + ' static ,
139- DOut : Clone +' static ,
140- ROut : Semigroup + ' static ,
141162 Y : Fn ( std:: time:: Instant , usize ) -> bool + ' static ,
142- I : IntoIterator < Item = ( DOut , G :: Timestamp , ROut ) > ,
143- S : FnMut ( & K , & V , Tr :: Val < ' _ > , & G :: Timestamp , & G :: Timestamp , & R , & Tr :: Diff ) -> I + ' static ,
163+ S : FnMut ( & mut SessionFor < G , CB > , & K , & V , Tr :: Val < ' _ > , & G :: Timestamp , & R , & mut Vec < ( G :: Timestamp , Tr :: Diff ) > ) + ' static ,
164+ CB : ContainerBuilder ,
144165{
145166 // No need to block physical merging for this operator.
146167 arrangement. trace . set_physical_compaction ( Antichain :: new ( ) . borrow ( ) ) ;
@@ -165,7 +186,7 @@ where
165186 input1. for_each ( |capability, data| {
166187 stash. entry ( capability. retain ( ) )
167188 . or_insert ( Vec :: new ( ) )
168- . extend ( data. drain ( .. ) )
189+ . append ( data)
169190 } ) ;
170191
171192 // Drain input batches; although we do not observe them, we want access to the input
@@ -192,43 +213,22 @@ where
192213 yielded = yielded || yield_function ( timer, work) ;
193214 if !yielded && !input2. frontier . less_equal ( capability. time ( ) ) {
194215
195- let mut session = output. session ( capability) ;
196-
197- // Sort requests by key for in-order cursor traversal.
198- consolidate_updates ( proposals) ;
199-
200- let ( mut cursor, storage) = trace. cursor ( ) ;
216+ let frontier = input2. frontier . frontier ( ) ;
201217
202- // Process proposals one at a time, stopping if we should yield.
203- for & mut ( ( ref key, ref val1, ref time) , ref initial, ref mut diff1) in proposals. iter_mut ( ) {
204- // Use TOTAL ORDER to allow the release of `time`.
205- yielded = yielded || yield_function ( timer, work) ;
206- if !yielded && !input2. frontier . frontier ( ) . iter ( ) . any ( |t| comparison ( <Tr :: TimeGat < ' _ > as IntoOwned >:: borrow_as ( t) , initial) ) {
207- use differential_dataflow:: trace:: cursor:: IntoOwned ;
208- cursor. seek_key ( & storage, IntoOwned :: borrow_as ( key) ) ;
209- if cursor. get_key ( & storage) == Some ( IntoOwned :: borrow_as ( key) ) {
210- while let Some ( val2) = cursor. get_val ( & storage) {
211- cursor. map_times ( & storage, |t, d| {
212- if comparison ( t, initial) {
213- let mut t = t. into_owned ( ) ;
214- t. join_assign ( time) ;
215- output_buffer. push ( ( t, d. into_owned ( ) ) )
216- }
217- } ) ;
218- consolidate ( & mut output_buffer) ;
219- work += output_buffer. len ( ) ;
220- for ( time, diff2) in output_buffer. drain ( ..) {
221- for dout in output_func ( & key, val1, val2, initial, & time, & diff1, & diff2) {
222- session. give ( dout) ;
223- }
224- }
225- cursor. step_val ( & storage) ;
226- }
227- cursor. rewind_vals ( & storage) ;
228- }
229- * diff1 = R :: zero ( ) ;
230- }
231- }
218+ // Update yielded: We can only go from false to {false, true} as
219+ // we're checking that `!yielded` holds before entering this block.
220+ yielded = process_proposals :: < G , _ , _ , _ , _ , _ , _ , _ , _ > (
221+ & comparison,
222+ & yield_function,
223+ & mut output_func,
224+ & mut output_buffer,
225+ timer,
226+ & mut work,
227+ trace,
228+ proposals,
229+ output. session_with_builder ( capability) ,
230+ frontier
231+ ) ;
232232
233233 proposals. retain ( |ptd| !ptd. 2 . is_zero ( ) ) ;
234234
@@ -243,7 +243,7 @@ where
243243 stash_additions
244244 . entry ( capability. delayed ( & antichain[ 0 ] ) )
245245 . or_insert ( Vec :: new ( ) )
246- . extend ( proposals. drain ( .. ) ) ;
246+ . append ( proposals) ;
247247 }
248248 else if antichain. len ( ) > 1 {
249249 // Any remaining times should peel off elements from `proposals`.
@@ -290,5 +290,76 @@ where
290290 arrangement_trace = None ;
291291 }
292292 }
293- } ) . as_collection ( )
293+ } )
294+ }
295+
296+ /// Outlined inner loop for `half_join_internal_unsafe` for reasons of performance.
297+ ///
298+ /// Gives Rust/LLVM the opportunity to inline the loop body instead of inlining the loop and
299+ /// leaving all calls in the loop body outlined.
300+ ///
301+ /// Consumes proposals until the yield function returns `true` or all proposals are processed.
302+ /// Leaves a zero diff in place for all proposals that were processed.
303+ ///
304+ /// Returns `true` if the operator should yield.
305+ fn process_proposals < G , Tr , CF , Y , S , CB , K , V , R > (
306+ comparison : & CF ,
307+ yield_function : & Y ,
308+ output_func : & mut S ,
309+ mut output_buffer : & mut Vec < ( <Tr as TraceReader >:: Time , <Tr as TraceReader >:: Diff ) > ,
310+ timer : Instant ,
311+ work : & mut usize ,
312+ trace : & mut Tr ,
313+ proposals : & mut Vec < ( ( K , V , <Tr as TraceReader >:: Time ) , <Tr as TraceReader >:: Time , R ) > ,
314+ mut session : SessionFor < G , CB > ,
315+ frontier : AntichainRef < <Tr as TraceReader >:: Time >
316+ ) -> bool
317+ where
318+ G : Scope < Timestamp = Tr :: Time > ,
319+ Tr : for < ' a > TraceReader < Key < ' a > : IntoOwned < ' a , Owned = K > > ,
320+ CF : Fn ( Tr :: TimeGat < ' _ > , & Tr :: Time ) -> bool + ' static ,
321+ Y : Fn ( Instant , usize ) -> bool + ' static ,
322+ S : FnMut ( & mut SessionFor < G , CB > , & K , & V , Tr :: Val < ' _ > , & G :: Timestamp , & R , & mut Vec < ( G :: Timestamp , Tr :: Diff ) > ) + ' static ,
323+ CB : ContainerBuilder ,
324+ K : Ord ,
325+ V : Ord ,
326+ R : Monoid ,
327+ {
328+ // Sort requests by key for in-order cursor traversal.
329+ consolidate_updates ( proposals) ;
330+
331+ let ( mut cursor, storage) = trace. cursor ( ) ;
332+ let mut yielded = false ;
333+
334+ // Process proposals one at a time, stopping if we should yield.
335+ for ( ( ref key, ref val1, ref time) , ref initial, ref mut diff1) in proposals. iter_mut ( ) {
336+ // Use TOTAL ORDER to allow the release of `time`.
337+ yielded = yielded || yield_function ( timer, * work) ;
338+ if !yielded && !frontier. iter ( ) . any ( |t| comparison ( <Tr :: TimeGat < ' _ > as IntoOwned >:: borrow_as ( t) , initial) ) {
339+ use differential_dataflow:: trace:: cursor:: IntoOwned ;
340+ cursor. seek_key ( & storage, IntoOwned :: borrow_as ( key) ) ;
341+ if cursor. get_key ( & storage) == Some ( IntoOwned :: borrow_as ( key) ) {
342+ while let Some ( val2) = cursor. get_val ( & storage) {
343+ cursor. map_times ( & storage, |t, d| {
344+ if comparison ( t, initial) {
345+ let mut t = t. into_owned ( ) ;
346+ t. join_assign ( time) ;
347+ output_buffer. push ( ( t, d. into_owned ( ) ) )
348+ }
349+ } ) ;
350+ consolidate ( & mut output_buffer) ;
351+ * work += output_buffer. len ( ) ;
352+ output_func ( & mut session, key, val1, val2, initial, diff1, & mut output_buffer) ;
353+ // Defensive clear; we'd expect `output_func` to clear the buffer.
354+ // TODO: Should we assert it is empty?
355+ output_buffer. clear ( ) ;
356+ cursor. step_val ( & storage) ;
357+ }
358+ cursor. rewind_vals ( & storage) ;
359+ }
360+ * diff1 = R :: zero ( ) ;
361+ }
362+ }
363+
364+ yielded
294365}
0 commit comments