Skip to content

Commit 05e2392

Browse files
committed
emit data with the same capability they came in with
Signed-off-by: Petros Angelatos <[email protected]>
1 parent 7710555 commit 05e2392

File tree

1 file changed

+2
-6
lines changed

1 file changed

+2
-6
lines changed

timely/src/dataflow/operators/core/partition.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::dataflow::channels::pact::Pipeline;
66
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
77
use crate::dataflow::operators::InputCapability;
88
use crate::dataflow::{Scope, StreamCore};
9-
use crate::{Data, PartialOrder};
9+
use crate::Data;
1010

1111
/// Partition a stream of records into multiple streams.
1212
pub trait Partition<G: Scope, C: Container> {
@@ -78,11 +78,7 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
7878
todo.sort_unstable_by(|a, b| a.0.cmp(&b.0));
7979

8080
for (cap, mut data) in todo.drain(..) {
81-
let reset_sessions = match sessions_cap {
82-
Some(ref s_cap) => !PartialOrder::less_equal(s_cap.time(), cap.time()),
83-
None => true,
84-
};
85-
if reset_sessions {
81+
if sessions_cap.as_ref().map_or(true, |s_cap| s_cap.time() != cap.time()) {
8682
sessions = handles.iter_mut().map(SessionState::Handle).collect();
8783
sessions_cap = Some(cap);
8884
}

0 commit comments

Comments
 (0)