@@ -4,6 +4,7 @@ use timely_container::{Container, ContainerBuilder, PushInto};
44
55use crate :: dataflow:: channels:: pact:: Pipeline ;
66use crate :: dataflow:: operators:: generic:: builder_rc:: OperatorBuilder ;
7+ use crate :: dataflow:: operators:: InputCapability ;
78use crate :: dataflow:: { Scope , StreamCore } ;
89use crate :: Data ;
910
@@ -55,19 +56,38 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
5556 }
5657
5758 builder. build ( move |_| {
59+ let mut todo = vec ! [ ] ;
5860 move |_frontiers| {
5961 let mut handles = outputs. iter_mut ( ) . map ( |o| o. activate ( ) ) . collect :: < Vec < _ > > ( ) ;
60- input. for_each ( |time, data| {
61- let mut sessions = handles
62- . iter_mut ( )
63- . map ( |h| h. session_with_builder ( & time) )
64- . collect :: < Vec < _ > > ( ) ;
6562
63+ // The capability associated with each session in `sessions`.
64+ let mut sessions_cap: Option < InputCapability < G :: Timestamp > > = None ;
65+ let mut sessions = vec ! [ ] ;
66+
67+ while let Some ( ( cap, data) ) = input. next ( ) {
68+ todo. push ( ( cap, std:: mem:: take ( data) ) ) ;
69+ }
70+ todo. sort_unstable_by ( |a, b| a. 0 . cmp ( & b. 0 ) ) ;
71+
72+ for ( cap, mut data) in todo. drain ( ..) {
73+ if sessions_cap. as_ref ( ) . map_or ( true , |s_cap| s_cap. time ( ) != cap. time ( ) ) {
74+ sessions = handles. iter_mut ( ) . map ( |h| ( None , Some ( h) ) ) . collect ( ) ;
75+ sessions_cap = Some ( cap) ;
76+ }
6677 for datum in data. drain ( ) {
6778 let ( part, datum2) = route ( datum) ;
68- sessions[ part as usize ] . give ( datum2) ;
79+
80+ let session = match sessions[ part as usize ] {
81+ ( Some ( ref mut s) , _) => s,
82+ ( ref mut session_slot, ref mut handle) => {
83+ let handle = handle. take ( ) . unwrap ( ) ;
84+ let session = handle. session_with_builder ( sessions_cap. as_ref ( ) . unwrap ( ) ) ;
85+ session_slot. insert ( session)
86+ }
87+ } ;
88+ session. give ( datum2) ;
6989 }
70- } ) ;
90+ }
7191 }
7292 } ) ;
7393
0 commit comments