Skip to content

Commit 7710555

Browse files
committed
process input in sorted time order
Doing so miniminzes the times sessions need to be reset with a different timestamp. Signed-off-by: Petros Angelatos <[email protected]>
1 parent 0449aa1 commit 7710555

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

timely/src/dataflow/operators/core/partition.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
5656
}
5757

5858
builder.build(move |_| {
59+
let mut todo = vec![];
5960
move |_frontiers| {
6061
#[derive(Default)]
6162
enum SessionState<H, S> {
@@ -72,6 +73,11 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
7273
let mut sessions = vec![];
7374

7475
while let Some((cap, data)) = input.next() {
76+
todo.push((cap, std::mem::take(data)));
77+
}
78+
todo.sort_unstable_by(|a, b| a.0.cmp(&b.0));
79+
80+
for (cap, mut data) in todo.drain(..) {
7581
let reset_sessions = match sessions_cap {
7682
Some(ref s_cap) => !PartialOrder::less_equal(s_cap.time(), cap.time()),
7783
None => true,

0 commit comments

Comments
 (0)