Skip to content

Commit f5c85b9

Browse files
committed
eliminate moving in and out of sessions
Signed-off-by: Petros Angelatos <[email protected]>
1 parent c5b221e commit f5c85b9

File tree

1 file changed

+8
-5
lines changed

1 file changed

+8
-5
lines changed

timely/src/dataflow/operators/core/partition.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,18 +71,21 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
7171

7272
for (cap, mut data) in todo.drain(..) {
7373
if sessions_cap.as_ref().map_or(true, |s_cap| s_cap.time() != cap.time()) {
74-
sessions = handles.iter_mut().map(|h| Some(Err(h))).collect();
74+
sessions = handles.iter_mut().map(|h| (None, Some(h))).collect();
7575
sessions_cap = Some(cap);
7676
}
7777
for datum in data.drain() {
7878
let (part, datum2) = route(datum);
7979

80-
let mut session = match sessions[part as usize].take().unwrap() {
81-
Ok(s) => s,
82-
Err(handle) => handle.session_with_builder(sessions_cap.as_ref().unwrap()),
80+
let session = match sessions[part as usize] {
81+
(Some(ref mut s), _) => s,
82+
(ref mut session_slot, ref mut handle) => {
83+
let handle = handle.take().unwrap();
84+
let session = handle.session_with_builder(sessions_cap.as_ref().unwrap());
85+
session_slot.insert(session)
86+
}
8387
};
8488
session.give(datum2);
85-
sessions[part as usize] = Some(Ok(session));
8689
}
8790
}
8891
}

0 commit comments

Comments
 (0)