@@ -54,38 +54,29 @@ impl<G: Scope, C: Container + DrainContainer> Partition<G, C> for StreamCore<G,
5454 streams. push ( stream) ;
5555 }
5656
57- let mut caps = BTreeMap :: default ( ) ;
58-
5957 builder. build ( move |_| {
60- let mut todo: BTreeMap < _ , Vec < _ > > = Default :: default ( ) ;
6158 move |_frontiers| {
6259 let mut handles = outputs. iter_mut ( ) . map ( |o| o. activate ( ) ) . collect :: < Vec < _ > > ( ) ;
63- let mut parts = BTreeMap :: < u64 , Vec < _ > > :: default ( ) ;
64- while let Some ( ( cap, data) ) = input. next ( ) {
65- todo. entry ( cap. time ( ) . clone ( ) ) . or_default ( ) . push ( std:: mem:: take ( data) ) ;
66- caps. insert ( cap. time ( ) . clone ( ) , cap) ;
67- }
68-
69- while let Some ( ( time, dataz) ) = todo. pop_first ( ) {
70- let cap = caps. remove ( & time) . unwrap ( ) ;
71- for mut data in dataz. into_iter ( ) {
72- for datum in data. drain ( ) {
73- let ( part, datum) = route ( datum) ;
74- parts. entry ( part) . or_default ( ) . push ( datum) ;
75- }
60+ let mut targets = BTreeMap :: < u64 , Vec < _ > > :: default ( ) ;
61+ input. for_each_time ( |time, data| {
62+ // Sort data by intended output.
63+ for datum in data. flat_map ( |d| d. drain ( ) ) {
64+ let ( part, datum) = route ( datum) ;
65+ targets. entry ( part) . or_default ( ) . push ( datum) ;
7666 }
77- while let Some ( ( part, data) ) = parts. pop_first ( ) {
67+ // Form each intended output into a container and ship.
68+ while let Some ( ( part, data) ) = targets. pop_first ( ) {
7869 for datum in data. into_iter ( ) {
7970 c_build. push_into ( datum) ;
8071 while let Some ( container) = c_build. extract ( ) {
81- handles[ part as usize ] . give ( & cap , container) ;
72+ handles[ part as usize ] . give ( & time , container) ;
8273 }
8374 }
8475 while let Some ( container) = c_build. finish ( ) {
85- handles[ part as usize ] . give ( & cap , container) ;
76+ handles[ part as usize ] . give ( & time , container) ;
8677 }
8778 }
88- }
79+ } ) ;
8980 }
9081 } ) ;
9182
0 commit comments