@@ -58,12 +58,9 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
5858 builder. build ( move |_| {
5959 let mut todo = vec ! [ ] ;
6060 move |_frontiers| {
61- #[ derive( Default ) ]
6261 enum SessionState < H , S > {
6362 Handle ( H ) ,
6463 Session ( S ) ,
65- #[ default]
66- Invalid ,
6764 }
6865
6966 let mut handles = outputs. iter_mut ( ) . map ( |o| o. activate ( ) ) . collect :: < Vec < _ > > ( ) ;
@@ -79,28 +76,23 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
7976
8077 for ( cap, mut data) in todo. drain ( ..) {
8178 if sessions_cap. as_ref ( ) . map_or ( true , |s_cap| s_cap. time ( ) != cap. time ( ) ) {
82- sessions = handles. iter_mut ( ) . map ( SessionState :: Handle ) . collect ( ) ;
79+ sessions = handles
80+ . iter_mut ( )
81+ . map ( |h| Some ( SessionState :: Handle ( h) ) )
82+ . collect ( ) ;
8383 sessions_cap = Some ( cap) ;
8484 }
8585 for datum in data. drain ( ) {
8686 let ( part, datum2) = route ( datum) ;
8787
88- let session = match & mut sessions[ part as usize ] {
88+ let mut session = match sessions[ part as usize ] . take ( ) . unwrap ( ) {
8989 SessionState :: Session ( s) => s,
90- st @ SessionState :: Handle ( _) => {
91- let SessionState :: Handle ( handle) = std:: mem:: take ( st) else {
92- unreachable ! ( ) ;
93- } ;
94- let session = handle. session_with_builder ( sessions_cap. as_ref ( ) . unwrap ( ) ) ;
95- * st = SessionState :: Session ( session) ;
96- let SessionState :: Session ( session) = st else {
97- unreachable ! ( ) ;
98- } ;
99- session
90+ SessionState :: Handle ( handle) => {
91+ handle. session_with_builder ( sessions_cap. as_ref ( ) . unwrap ( ) )
10092 }
101- SessionState :: Invalid => unreachable ! ( ) ,
10293 } ;
10394 session. give ( datum2) ;
95+ sessions[ part as usize ] = Some ( SessionState :: Session ( session) ) ;
10496 }
10597 }
10698 }
0 commit comments