@@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
3030use crate :: difference:: Semigroup ;
3131use crate :: lattice:: Lattice ;
3232use crate :: trace:: { self , Trace , TraceReader , Batch , BatchReader , Batcher , Builder , Cursor } ;
33- use crate :: trace:: implementations:: { KeyBatcher , KeySpine , ValBatcher , ValSpine } ;
33+ use crate :: trace:: implementations:: { KeyBatcher , KeyBuilder , KeySpine , ValBatcher , ValBuilder , ValSpine } ;
3434
3535use trace:: wrappers:: enter:: { TraceEnter , BatchEnter , } ;
3636use trace:: wrappers:: enter_at:: TraceEnter as TraceEnterAt ;
@@ -289,7 +289,7 @@ where
289289 T1 : TraceReader + Clone + ' static ,
290290{
291291 /// A direct implementation of `ReduceCore::reduce_abelian`.
292- pub fn reduce_abelian < L , K , V , T2 > ( & self , name : & str , mut logic : L ) -> Arranged < G , TraceAgent < T2 > >
292+ pub fn reduce_abelian < L , K , V , Bu , T2 > ( & self , name : & str , mut logic : L ) -> Arranged < G , TraceAgent < T2 > >
293293 where
294294 for < ' a > T1 :: Key < ' a > : IntoOwned < ' a , Owned = K > ,
295295 T2 : for < ' a > Trace < Key < ' a > = T1 :: Key < ' a > , Time =T1 :: Time > +' static ,
@@ -298,10 +298,11 @@ where
298298 for < ' a > T2 :: Val < ' a > : IntoOwned < ' a , Owned = V > ,
299299 T2 :: Diff : Abelian ,
300300 T2 :: Batch : Batch ,
301- <T2 :: Builder as Builder >:: Input : Container + PushInto < ( ( K , V ) , T2 :: Time , T2 :: Diff ) > ,
301+ Bu : Builder < Time =G :: Timestamp , Output = T2 :: Batch > ,
302+ Bu :: Input : Container + PushInto < ( ( K , V ) , T2 :: Time , T2 :: Diff ) > ,
302303 L : FnMut ( T1 :: Key < ' _ > , & [ ( T1 :: Val < ' _ > , T1 :: Diff ) ] , & mut Vec < ( V , T2 :: Diff ) > ) +' static ,
303304 {
304- self . reduce_core :: < _ , K , V , T2 > ( name, move |key, input, output, change| {
305+ self . reduce_core :: < _ , K , V , Bu , T2 > ( name, move |key, input, output, change| {
305306 if !input. is_empty ( ) {
306307 logic ( key, input, change) ;
307308 }
@@ -311,19 +312,20 @@ where
311312 }
312313
313314 /// A direct implementation of `ReduceCore::reduce_core`.
314- pub fn reduce_core < L , K , V , T2 > ( & self , name : & str , logic : L ) -> Arranged < G , TraceAgent < T2 > >
315+ pub fn reduce_core < L , K , V , Bu , T2 > ( & self , name : & str , logic : L ) -> Arranged < G , TraceAgent < T2 > >
315316 where
316317 for < ' a > T1 :: Key < ' a > : IntoOwned < ' a , Owned = K > ,
317318 T2 : for < ' a > Trace < Key < ' a > =T1 :: Key < ' a > , Time =T1 :: Time > +' static ,
318319 K : Ord + ' static ,
319320 V : Data ,
320321 for < ' a > T2 :: Val < ' a > : IntoOwned < ' a , Owned = V > ,
321322 T2 :: Batch : Batch ,
322- <T2 :: Builder as Builder >:: Input : Container + PushInto < ( ( K , V ) , T2 :: Time , T2 :: Diff ) > ,
323+ Bu : Builder < Time =G :: Timestamp , Output = T2 :: Batch > ,
324+ Bu :: Input : Container + PushInto < ( ( K , V ) , T2 :: Time , T2 :: Diff ) > ,
323325 L : FnMut ( T1 :: Key < ' _ > , & [ ( T1 :: Val < ' _ > , T1 :: Diff ) ] , & mut Vec < ( V , T2 :: Diff ) > , & mut Vec < ( V , T2 :: Diff ) > ) +' static ,
324326 {
325327 use crate :: operators:: reduce:: reduce_trace;
326- reduce_trace :: < _ , _ , _ , _ , V , _ > ( self , name, logic)
328+ reduce_trace :: < _ , _ , Bu , _ , _ , V , _ > ( self , name, logic)
327329 }
328330}
329331
@@ -353,23 +355,23 @@ where
353355 G :: Timestamp : Lattice ,
354356{
355357 /// Arranges updates into a shared trace.
356- fn arrange < Ba , Tr > ( & self ) -> Arranged < G , TraceAgent < Tr > >
358+ fn arrange < Ba , Bu , Tr > ( & self ) -> Arranged < G , TraceAgent < Tr > >
357359 where
358360 Ba : Batcher < Input =C , Time =G :: Timestamp > + ' static ,
361+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
359362 Tr : Trace < Time =G :: Timestamp > + ' static ,
360363 Tr :: Batch : Batch ,
361- Tr :: Builder : Builder < Input =Ba :: Output > ,
362364 {
363- self . arrange_named :: < Ba , Tr > ( "Arrange" )
365+ self . arrange_named :: < Ba , Bu , Tr > ( "Arrange" )
364366 }
365367
366368 /// Arranges updates into a shared trace, with a supplied name.
367- fn arrange_named < Ba , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
369+ fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
368370 where
369371 Ba : Batcher < Input =C , Time =G :: Timestamp > + ' static ,
372+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
370373 Tr : Trace < Time =G :: Timestamp > + ' static ,
371374 Tr :: Batch : Batch ,
372- Tr :: Builder : Builder < Input =Ba :: Output > ,
373375 ;
374376}
375377
@@ -381,15 +383,15 @@ where
381383 V : ExchangeData ,
382384 R : ExchangeData + Semigroup ,
383385{
384- fn arrange_named < Ba , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
386+ fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
385387 where
386388 Ba : Batcher < Input =Vec < ( ( K , V ) , G :: Timestamp , R ) > , Time =G :: Timestamp > + ' static ,
389+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
387390 Tr : Trace < Time =G :: Timestamp > + ' static ,
388391 Tr :: Batch : Batch ,
389- Tr :: Builder : Builder < Input =Ba :: Output > ,
390392 {
391393 let exchange = Exchange :: new ( move |update : & ( ( K , V ) , G :: Timestamp , R ) | ( update. 0 ) . 0 . hashed ( ) . into ( ) ) ;
392- arrange_core :: < _ , _ , Ba , _ > ( & self . inner , exchange, name)
394+ arrange_core :: < _ , _ , Ba , Bu , _ > ( & self . inner , exchange, name)
393395 }
394396}
395397
@@ -398,16 +400,16 @@ where
398400/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
399401/// It uses the supplied parallelization contract to distribute the data, which does not need to
400402/// be consistently by key (though this is the most common).
401- pub fn arrange_core < G , P , Ba , Tr > ( stream : & StreamCore < G , Ba :: Input > , pact : P , name : & str ) -> Arranged < G , TraceAgent < Tr > >
403+ pub fn arrange_core < G , P , Ba , Bu , Tr > ( stream : & StreamCore < G , Ba :: Input > , pact : P , name : & str ) -> Arranged < G , TraceAgent < Tr > >
402404where
403405 G : Scope ,
404406 G :: Timestamp : Lattice ,
405407 P : ParallelizationContract < G :: Timestamp , Ba :: Input > ,
406408 Ba : Batcher < Time =G :: Timestamp > + ' static ,
407409 Ba :: Input : Container ,
410+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
408411 Tr : Trace < Time =G :: Timestamp > +' static ,
409412 Tr :: Batch : Batch ,
410- Tr :: Builder : Builder < Input =Ba :: Output > ,
411413{
412414 // The `Arrange` operator is tasked with reacting to an advancing input
413415 // frontier by producing the sequence of batches whose lower and upper
@@ -515,7 +517,7 @@ where
515517 }
516518
517519 // Extract updates not in advance of `upper`.
518- let batch = batcher. seal :: < Tr :: Builder > ( upper. clone ( ) ) ;
520+ let batch = batcher. seal :: < Bu > ( upper. clone ( ) ) ;
519521
520522 writer. insert ( batch. clone ( ) , Some ( capability. time ( ) . clone ( ) ) ) ;
521523
@@ -543,7 +545,7 @@ where
543545 }
544546 else {
545547 // Announce progress updates, even without data.
546- let _batch = batcher. seal :: < Tr :: Builder > ( input. frontier ( ) . frontier ( ) . to_owned ( ) ) ;
548+ let _batch = batcher. seal :: < Bu > ( input. frontier ( ) . frontier ( ) . to_owned ( ) ) ;
547549 writer. seal ( input. frontier ( ) . frontier ( ) . to_owned ( ) ) ;
548550 }
549551
@@ -562,15 +564,15 @@ impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, V
562564where
563565 G :: Timestamp : Lattice +Ord ,
564566{
565- fn arrange_named < Ba , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
567+ fn arrange_named < Ba , Bu , Tr > ( & self , name : & str ) -> Arranged < G , TraceAgent < Tr > >
566568 where
567569 Ba : Batcher < Input =Vec < ( ( K , ( ) ) , G :: Timestamp , R ) > , Time =G :: Timestamp > + ' static ,
570+ Bu : Builder < Time =G :: Timestamp , Input =Ba :: Output , Output = Tr :: Batch > ,
568571 Tr : Trace < Time =G :: Timestamp > + ' static ,
569572 Tr :: Batch : Batch ,
570- Tr :: Builder : Builder < Input =Ba :: Output > ,
571573 {
572574 let exchange = Exchange :: new ( move |update : & ( ( K , ( ) ) , G :: Timestamp , R ) | ( update. 0 ) . 0 . hashed ( ) . into ( ) ) ;
573- arrange_core :: < _ , _ , Ba , _ > ( & self . map ( |k| ( k, ( ) ) ) . inner , exchange, name)
575+ arrange_core :: < _ , _ , Ba , Bu , _ > ( & self . map ( |k| ( k, ( ) ) ) . inner , exchange, name)
574576 }
575577}
576578
@@ -601,7 +603,7 @@ where
601603 }
602604
603605 fn arrange_by_key_named ( & self , name : & str ) -> Arranged < G , TraceAgent < ValSpine < K , V , G :: Timestamp , R > > > {
604- self . arrange_named :: < ValBatcher < _ , _ , _ , _ > , _ > ( name)
606+ self . arrange_named :: < ValBatcher < _ , _ , _ , _ > , ValBuilder < _ , _ , _ , _ > , _ > ( name)
605607 }
606608}
607609
@@ -636,6 +638,6 @@ where
636638
637639 fn arrange_by_self_named ( & self , name : & str ) -> Arranged < G , TraceAgent < KeySpine < K , G :: Timestamp , R > > > {
638640 self . map ( |k| ( k, ( ) ) )
639- . arrange_named :: < KeyBatcher < _ , _ , _ > , _ > ( name)
641+ . arrange_named :: < KeyBatcher < _ , _ , _ > , KeyBuilder < _ , _ , _ > , _ > ( name)
640642 }
641643}
0 commit comments