@@ -323,10 +323,9 @@ pub mod storage {
323323 impl < U : Update > ValStorage < U > {
324324
325325 /// Forms `Self` from sorted update tuples.
326- pub fn form < ' a > ( sorted : impl Iterator < Item = columnar:: Ref < ' a , Tuple < U > > > ) -> Self {
326+ pub fn form < ' a > ( mut sorted : impl Iterator < Item = columnar:: Ref < ' a , Tuple < U > > > ) -> Self {
327327
328328 let mut output = Self :: default ( ) ;
329- let mut sorted = sorted. peekable ( ) ;
330329
331330 if let Some ( ( key, val, time, diff) ) = sorted. next ( ) {
332331 output. keys . push ( key) ;
@@ -429,14 +428,11 @@ pub mod storage {
429428 impl < U : Update > KeyStorage < U > {
430429
431430 /// Forms `Self` from sorted update tuples.
432- pub fn form < ' a > ( sorted : impl Iterator < Item = columnar:: Ref < ' a , Tuple < U > > > ) -> Self {
431+ pub fn form < ' a > ( mut sorted : impl Iterator < Item = columnar:: Ref < ' a , Tuple < U > > > ) -> Self {
433432
434433 let mut keys: ContainerOf < U :: Key > = Default :: default ( ) ;
435434 let mut upds: Vecs < ( ContainerOf < U :: Time > , ContainerOf < U :: Diff > ) > = Default :: default ( ) ;
436435
437- // let mut output = Self::default();
438- let mut sorted = sorted. peekable ( ) ;
439-
440436 if let Some ( ( key, time, diff) ) = sorted. next ( ) {
441437 keys. push ( key) ;
442438 upds. values . push ( ( time, diff) ) ;
@@ -666,7 +662,6 @@ mod distributor {
666662 use std:: rc:: Rc ;
667663
668664 use columnar:: { Index , Len } ;
669- use timely:: container:: { ContainerBuilder , PushInto } ;
670665 use timely:: logging:: TimelyLogger ;
671666 use timely:: dataflow:: channels:: pushers:: { Exchange , exchange:: Distributor } ;
672667 use timely:: dataflow:: channels:: Message ;
@@ -675,34 +670,39 @@ mod distributor {
675670 use timely:: worker:: AsWorker ;
676671
677672 use crate :: layout:: ColumnarUpdate as Update ;
678- use crate :: { KeyColBuilder , KeyStorage } ;
673+ use crate :: KeyStorage ;
679674
680675 pub struct KeyDistributor < U : Update , H > { // Distributes the keys of a `KeyStorage<U>` across pushers, choosing the target with `hashfunc`.
681- builders : Vec < KeyColBuilder < U > > ,
676+ marker : std :: marker :: PhantomData < U > , // Zero-sized marker that keeps the `U` type parameter in use; no per-target state is stored.
682677 hashfunc : H , // Maps a key reference to a `u64`; `partition` reduces it modulo the number of pushers.
683678 }
684679
685680 impl < U : Update , H : for < ' a > FnMut ( columnar:: Ref < ' a , U :: Key > ) ->u64 > Distributor < KeyStorage < U > > for KeyDistributor < U , H > {
686681 fn partition < T : Clone , P : timely:: communication:: Push < Message < T , KeyStorage < U > > > > ( & mut self , container : & mut KeyStorage < U > , time : & T , pushers : & mut [ P ] ) {
687- // For each key, partition and copy (key, time, diff) into the appropriate self.builder.
688- for index in 0 .. container. keys . borrow ( ) . len ( ) {
689- let key = container. keys . borrow ( ) . get ( index) ;
690- let idx = ( ( self . hashfunc ) ( key) as usize ) % self . builders . len ( ) ;
691- for ( t, diff) in container. upds . borrow ( ) . get ( index) . into_index_iter ( ) {
692- self . builders [ idx] . push_into ( ( key, t, diff) ) ;
693- }
694- while let Some ( produced) = self . builders [ idx] . extract ( ) {
695- Message :: push_at ( produced, time. clone ( ) , & mut pushers[ idx] ) ;
696- }
682+
683+ use columnar:: { ContainerOf , Vecs , Container , Push } ;
684+ use crate :: Column ;
685+
686+ let in_keys = container. keys . borrow ( ) ;
687+ let in_upds = container. upds . borrow ( ) ;
688+
689+ // Build one output container per pusher: `self.hashfunc` picks the target for each key, and the key and its associated updates are copied into that target.
690+ // This bypasses the container builders, which do much work to go from tuples to columnar containers; avoiding that round trip saves time.
691+ let mut out_keys = vec ! [ ContainerOf :: <U :: Key >:: default ( ) ; pushers. len( ) ] ;
692+ let mut out_upds = vec ! [ Vecs :: <( ContainerOf :: <U :: Time >, ContainerOf :: <U :: Diff >) >:: default ( ) ; pushers. len( ) ] ;
693+ for index in 0 .. in_keys. len ( ) {
694+ let key = in_keys. get ( index) ;
695+ let idx = ( ( self . hashfunc ) ( key) as usize ) % pushers. len ( ) ; // NOTE(review): `%` panics if `pushers` is empty — confirm callers always supply at least one pusher.
696+ out_keys[ idx] . push ( key) ;
697+ out_upds[ idx] . extend_from_self ( in_upds, index..index+1 ) ; // presumably copies the update list stored at position `index` in one step — verify against the columnar API.
697698 }
698- }
699- fn flush < T : Clone , P : timely:: communication:: Push < Message < T , KeyStorage < U > > > > ( & mut self , time : & T , pushers : & mut [ P ] ) {
700- for ( builder, pusher) in self . builders . iter_mut ( ) . zip ( pushers. iter_mut ( ) ) {
701- while let Some ( container) = builder. finish ( ) {
702- Message :: push_at ( container, time. clone ( ) , pusher) ;
703- }
699+
700+ for ( ( pusher, keys) , upds) in pushers. iter_mut ( ) . zip ( out_keys) . zip ( out_upds) { // NOTE(review): every pusher gets a container, even one that received no keys — confirm empty messages are acceptable downstream.
701+ let mut container = KeyStorage { keys : Column :: Typed ( keys) , upds : Column :: Typed ( upds) } ;
702+ Message :: push_at ( & mut container, time. clone ( ) , pusher) ;
704703 }
705704 }
705+ fn flush < T : Clone , P : timely:: communication:: Push < Message < T , KeyStorage < U > > > > ( & mut self , _time : & T , _pushers : & mut [ P ] ) { } // No-op: `partition` pushes everything it builds, so no data is held back for a later flush.
706706 fn relax ( & mut self ) { }
707707 }
708708
@@ -726,7 +726,7 @@ mod distributor {
726726 let ( senders, receiver) = allocator. allocate :: < Message < T , KeyStorage < U > > > ( identifier, address) ;
727727 let senders = senders. into_iter ( ) . enumerate ( ) . map ( |( i, x) | LogPusher :: new ( x, allocator. index ( ) , i, identifier, logging. clone ( ) ) ) . collect :: < Vec < _ > > ( ) ;
728728 let distributor = KeyDistributor {
729- builders : std:: iter :: repeat_with ( Default :: default ) . take ( allocator . peers ( ) ) . collect ( ) ,
729+ marker : std:: marker :: PhantomData ,
730730 hashfunc : self . hashfunc ,
731731 } ;
732732 ( Exchange :: new ( senders, distributor) , LogPuller :: new ( receiver, allocator. index ( ) , identifier, logging. clone ( ) ) )
0 commit comments