Skip to content

Commit 0449aa1

Browse files
committed
partition: lazily create and then reuse output sessions
The partition operator was previously unconditionally initializing sessions for all outputs every time it received an incoming container even though the container might have contained datums for only a small subset of the outputs. This PR makes it so sessions are only lazily initialized when the first datum for them is observed. Additionally, sessions are reused across incoming containers as long as the session's capability dominates the incoming ones. Signed-off-by: Petros Angelatos <[email protected]>
1 parent 2e6e54f commit 0449aa1

File tree

1 file changed

+41
-8
lines changed

1 file changed

+41
-8
lines changed

timely/src/dataflow/operators/core/partition.rs

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ use timely_container::{Container, ContainerBuilder, PushInto};
44

55
use crate::dataflow::channels::pact::Pipeline;
66
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7+
use crate::dataflow::operators::InputCapability;
78
use crate::dataflow::{Scope, StreamCore};
8-
use crate::Data;
9+
use crate::{Data, PartialOrder};
910

1011
/// Partition a stream of records into multiple streams.
1112
pub trait Partition<G: Scope, C: Container> {
@@ -56,18 +57,50 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
5657

5758
builder.build(move |_| {
5859
move |_frontiers| {
60+
#[derive(Default)]
61+
enum SessionState<H, S> {
62+
Handle(H),
63+
Session(S),
64+
#[default]
65+
Invalid,
66+
}
67+
5968
let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
60-
input.for_each(|time, data| {
61-
let mut sessions = handles
62-
.iter_mut()
63-
.map(|h| h.session_with_builder(&time))
64-
.collect::<Vec<_>>();
6569

70+
// The capability associated with each session in `sessions`.
71+
let mut sessions_cap: Option<InputCapability<G::Timestamp>> = None;
72+
let mut sessions = vec![];
73+
74+
while let Some((cap, data)) = input.next() {
75+
let reset_sessions = match sessions_cap {
76+
Some(ref s_cap) => !PartialOrder::less_equal(s_cap.time(), cap.time()),
77+
None => true,
78+
};
79+
if reset_sessions {
80+
sessions = handles.iter_mut().map(SessionState::Handle).collect();
81+
sessions_cap = Some(cap);
82+
}
6683
for datum in data.drain() {
6784
let (part, datum2) = route(datum);
68-
sessions[part as usize].give(datum2);
85+
86+
let session = match &mut sessions[part as usize] {
87+
SessionState::Session(s) => s,
88+
st @ SessionState::Handle(_) => {
89+
let SessionState::Handle(handle) = std::mem::take(st) else {
90+
unreachable!();
91+
};
92+
let session = handle.session_with_builder(sessions_cap.as_ref().unwrap());
93+
*st = SessionState::Session(session);
94+
let SessionState::Session(session) = st else {
95+
unreachable!();
96+
};
97+
session
98+
}
99+
SessionState::Invalid => unreachable!(),
100+
};
101+
session.give(datum2);
69102
}
70-
});
103+
}
71104
}
72105
});
73106

0 commit comments

Comments
 (0)