@@ -587,7 +587,7 @@ pub mod dd_builder {
587587 use differential_dataflow:: trace:: implementations:: Layout ;
588588 use differential_dataflow:: trace:: implementations:: Update ;
589589 use differential_dataflow:: trace:: implementations:: BatchContainer ;
590- use differential_dataflow:: trace:: implementations:: ord_neu:: { OrdValBatch , val_batch:: OrdValStorage , OrdKeyBatch } ;
590+ use differential_dataflow:: trace:: implementations:: ord_neu:: { OrdValBatch , val_batch:: OrdValStorage , OrdKeyBatch , Vals , Upds , layers :: UpdsBuilder } ;
591591 use differential_dataflow:: trace:: implementations:: ord_neu:: key_batch:: OrdKeyStorage ;
592592 use crate :: Column ;
593593
@@ -604,44 +604,7 @@ pub mod dd_builder {
604604 ///
605605 /// This is public to allow container implementors to set and inspect their container.
606606 pub result : OrdValStorage < L > ,
607- singleton : Option < ( <L :: Target as Update >:: Time , <L :: Target as Update >:: Diff ) > ,
608- /// Counts the number of singleton optimizations we performed.
609- ///
610- /// This number allows us to correctly gauge the total number of updates reflected in a batch,
611- /// even though `updates.len()` may be much shorter than this amount.
612- singletons : usize ,
613- }
614-
615- impl < L : Layout > OrdValBuilder < L > {
616- /// Pushes a single update, which may set `self.singleton` rather than push.
617- ///
618- /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
619- /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
620- /// to encode a singleton update with an "absert" update: repeating the most recent offset.
621- /// This otherwise invalid state encodes "look back one element".
622- ///
623- /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
624- /// previously pushed update exactly. In that case, we do not push the update into `updates`.
625- /// The update tuple is retained in `self.singleton` in case we see another update and need
626- /// to recover the singleton to push it into `updates` to join the second update.
627- fn push_update ( & mut self , time : <L :: Target as Update >:: Time , diff : <L :: Target as Update >:: Diff ) {
628- // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
629- if self . result . times . last ( ) . map ( |t| t == <<L :: TimeContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & time) ) == Some ( true ) &&
630- self . result . diffs . last ( ) . map ( |d| d == <<L :: DiffContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & diff) ) == Some ( true )
631- {
632- assert ! ( self . singleton. is_none( ) ) ;
633- self . singleton = Some ( ( time, diff) ) ;
634- }
635- else {
636- // If we have pushed a single element, we need to copy it out to meet this one.
637- if let Some ( ( time, diff) ) = self . singleton . take ( ) {
638- self . result . times . push ( time) ;
639- self . result . diffs . push ( diff) ;
640- }
641- self . result . times . push ( time) ;
642- self . result . diffs . push ( diff) ;
643- }
644- }
607+ staging : UpdsBuilder < L :: TimeContainer , L :: DiffContainer > ,
645608 }
646609
647610 // The layout `L` determines the key, val, time, and diff types.
@@ -663,18 +626,13 @@ pub mod dd_builder {
663626 type Output = OrdValBatch < L > ;
664627
665628 fn with_capacity ( keys : usize , vals : usize , upds : usize ) -> Self {
666- // We don't introduce zero offsets as they will be introduced by the first `push` call.
667629 Self {
668630 result : OrdValStorage {
669631 keys : L :: KeyContainer :: with_capacity ( keys) ,
670- keys_offs : L :: OffsetContainer :: with_capacity ( keys + 1 ) ,
671- vals : L :: ValContainer :: with_capacity ( vals) ,
672- vals_offs : L :: OffsetContainer :: with_capacity ( vals + 1 ) ,
673- times : L :: TimeContainer :: with_capacity ( upds) ,
674- diffs : L :: DiffContainer :: with_capacity ( upds) ,
632+ vals : Vals :: with_capacity ( keys + 1 , vals) ,
633+ upds : Upds :: with_capacity ( vals + 1 , upds) ,
675634 } ,
676- singleton : None ,
677- singletons : 0 ,
635+ staging : UpdsBuilder :: default ( ) ,
678636 }
679637 }
680638
@@ -696,39 +654,40 @@ pub mod dd_builder {
696654 let time = <<L :: TimeContainer as BatchContainer >:: Owned as Columnar >:: into_owned ( time) ;
697655 let diff = <<L :: DiffContainer as BatchContainer >:: Owned as Columnar >:: into_owned ( diff) ;
698656
657+ // Pre-load the first update.
658+ if self . result . keys . is_empty ( ) {
659+ self . result . vals . vals . push ( & val) ;
660+ self . result . keys . push ( & key) ;
661+ self . staging . push ( time, diff) ;
662+ }
699663 // Perhaps this is a continuation of an already received key.
700- if self . result . keys . last ( ) . map ( |k| <<L :: KeyContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & key) . eq ( & k) ) . unwrap_or ( false ) {
664+ else if self . result . keys . last ( ) . map ( |k| <<L :: KeyContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & key) . eq ( & k) ) . unwrap_or ( false ) {
701665 // Perhaps this is a continuation of an already received value.
702- if self . result . vals . last ( ) . map ( |v| <<L :: ValContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & val) . eq ( & v) ) . unwrap_or ( false ) {
703- self . push_update ( time, diff) ;
666+ if self . result . vals . vals . last ( ) . map ( |v| <<L :: ValContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & val) . eq ( & v) ) . unwrap_or ( false ) {
667+ self . staging . push ( time, diff) ;
704668 } else {
705669 // New value; complete representation of prior value.
706- self . result . vals_offs . push ( self . result . times . len ( ) ) ;
707- if self . singleton . take ( ) . is_some ( ) { self . singletons += 1 ; }
708- self . push_update ( time, diff) ;
709- self . result . vals . push ( & val) ;
670+ self . staging . seal ( & mut self . result . upds ) ;
671+ self . staging . push ( time, diff) ;
672+ self . result . vals . vals . push ( & val) ;
710673 }
711674 } else {
712675 // New key; complete representation of prior key.
713- self . result . vals_offs . push ( self . result . times . len ( ) ) ;
714- if self . singleton . take ( ) . is_some ( ) { self . singletons += 1 ; }
715- self . result . keys_offs . push ( self . result . vals . len ( ) ) ;
716- self . push_update ( time, diff) ;
717- self . result . vals . push ( & val) ;
676+ self . staging . seal ( & mut self . result . upds ) ;
677+ self . staging . push ( time, diff) ;
678+ self . result . vals . offs . push ( self . result . vals . len ( ) ) ;
679+ self . result . vals . vals . push ( & val) ;
718680 self . result . keys . push ( & key) ;
719681 }
720682 }
721683 }
722684
723685 #[ inline( never) ]
724686 fn done ( mut self , description : Description < Self :: Time > ) -> OrdValBatch < L > {
725- // Record the final offsets
726- self . result . vals_offs . push ( self . result . times . len ( ) ) ;
727- // Remove any pending singleton, and if it was set increment our count.
728- if self . singleton . take ( ) . is_some ( ) { self . singletons += 1 ; }
729- self . result . keys_offs . push ( self . result . vals . len ( ) ) ;
687+ self . staging . seal ( & mut self . result . upds ) ;
688+ self . result . vals . offs . push ( self . result . vals . len ( ) ) ;
730689 OrdValBatch {
731- updates : self . result . times . len ( ) + self . singletons ,
690+ updates : self . staging . total ( ) ,
732691 storage : self . result ,
733692 description,
734693 }
@@ -752,44 +711,7 @@ pub mod dd_builder {
752711 ///
753712 /// This is public to allow container implementors to set and inspect their container.
754713 pub result : OrdKeyStorage < L > ,
755- singleton : Option < ( <L :: Target as Update >:: Time , <L :: Target as Update >:: Diff ) > ,
756- /// Counts the number of singleton optimizations we performed.
757- ///
758- /// This number allows us to correctly gauge the total number of updates reflected in a batch,
759- /// even though `updates.len()` may be much shorter than this amount.
760- singletons : usize ,
761- }
762-
763- impl < L : Layout > OrdKeyBuilder < L > {
764- /// Pushes a single update, which may set `self.singleton` rather than push.
765- ///
766- /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
767- /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
768- /// to encode a singleton update with an "absert" update: repeating the most recent offset.
769- /// This otherwise invalid state encodes "look back one element".
770- ///
771- /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
772- /// previously pushed update exactly. In that case, we do not push the update into `updates`.
773- /// The update tuple is retained in `self.singleton` in case we see another update and need
774- /// to recover the singleton to push it into `updates` to join the second update.
775- fn push_update ( & mut self , time : <L :: Target as Update >:: Time , diff : <L :: Target as Update >:: Diff ) {
776- // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
777- if self . result . times . last ( ) . map ( |t| t == <<L :: TimeContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & time) ) == Some ( true ) &&
778- self . result . diffs . last ( ) . map ( |d| d == <<L :: DiffContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & diff) ) == Some ( true )
779- {
780- assert ! ( self . singleton. is_none( ) ) ;
781- self . singleton = Some ( ( time, diff) ) ;
782- }
783- else {
784- // If we have pushed a single element, we need to copy it out to meet this one.
785- if let Some ( ( time, diff) ) = self . singleton . take ( ) {
786- self . result . times . push ( time) ;
787- self . result . diffs . push ( diff) ;
788- }
789- self . result . times . push ( time) ;
790- self . result . diffs . push ( diff) ;
791- }
792- }
714+ staging : UpdsBuilder < L :: TimeContainer , L :: DiffContainer > ,
793715 }
794716
795717 // The layout `L` determines the key, val, time, and diff types.
@@ -811,16 +733,12 @@ pub mod dd_builder {
811733 type Output = OrdKeyBatch < L > ;
812734
813735 fn with_capacity ( keys : usize , _vals : usize , upds : usize ) -> Self {
814- // We don't introduce zero offsets as they will be introduced by the first `push` call.
815736 Self {
816737 result : OrdKeyStorage {
817738 keys : L :: KeyContainer :: with_capacity ( keys) ,
818- keys_offs : L :: OffsetContainer :: with_capacity ( keys + 1 ) ,
819- times : L :: TimeContainer :: with_capacity ( upds) ,
820- diffs : L :: DiffContainer :: with_capacity ( upds) ,
739+ upds : Upds :: with_capacity ( keys + 1 , upds) ,
821740 } ,
822- singleton : None ,
823- singletons : 0 ,
741+ staging : UpdsBuilder :: default ( ) ,
824742 }
825743 }
826744
@@ -841,27 +759,28 @@ pub mod dd_builder {
841759 let time = <<L :: TimeContainer as BatchContainer >:: Owned as Columnar >:: into_owned ( time) ;
842760 let diff = <<L :: DiffContainer as BatchContainer >:: Owned as Columnar >:: into_owned ( diff) ;
843761
762+ // Pre-load the first update.
763+ if self . result . keys . is_empty ( ) {
764+ self . result . keys . push ( & key) ;
765+ self . staging . push ( time, diff) ;
766+ }
844767 // Perhaps this is a continuation of an already received key.
845- if self . result . keys . last ( ) . map ( |k| <<L :: KeyContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & key) . eq ( & k) ) . unwrap_or ( false ) {
846- self . push_update ( time, diff) ;
768+ else if self . result . keys . last ( ) . map ( |k| <<L :: KeyContainer as BatchContainer >:: ReadItem < ' _ > as IntoOwned >:: borrow_as ( & key) . eq ( & k) ) . unwrap_or ( false ) {
769+ self . staging . push ( time, diff) ;
847770 } else {
848771 // New key; complete representation of prior key.
849- self . result . keys_offs . push ( self . result . times . len ( ) ) ;
850- if self . singleton . take ( ) . is_some ( ) { self . singletons += 1 ; }
851- self . push_update ( time, diff) ;
772+ self . staging . seal ( & mut self . result . upds ) ;
773+ self . staging . push ( time, diff) ;
852774 self . result . keys . push ( & key) ;
853775 }
854776 }
855777 }
856778
857779 #[ inline( never) ]
858780 fn done ( mut self , description : Description < Self :: Time > ) -> OrdKeyBatch < L > {
859- // Record the final offsets
860- self . result . keys_offs . push ( self . result . times . len ( ) ) ;
861- // Remove any pending singleton, and if it was set increment our count.
862- if self . singleton . take ( ) . is_some ( ) { self . singletons += 1 ; }
781+ self . staging . seal ( & mut self . result . upds ) ;
863782 OrdKeyBatch {
864- updates : self . result . times . len ( ) + self . singletons ,
783+ updates : self . staging . total ( ) ,
865784 storage : self . result ,
866785 description,
867786 }
0 commit comments