Skip to content

Commit 51212fe

Browse files
authored
Add cease to output handles (#496)
In certain situations, a handle survives longer than we would like to wait for a flush to happen. In this cases, an explicit call to cease can help to indicate to the rest of the system that no more data follows immediately, which is equivalent to dropping the handle. Specifically, in async code the handle can be long-lived and survive await points, which makes it more important to signal momentary completion to the system. Signed-off-by: Moritz Hoffmann <[email protected]> Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 01438ae commit 51212fe

File tree

1 file changed

+5
-0
lines changed

1 file changed

+5
-0
lines changed

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,11 @@ impl<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>> OutputHandleCore
221221
assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability");
222222
self.push_buffer.session(cap.time())
223223
}
224+
225+
/// Flushes all pending data and indicate that no more data immediately follows.
226+
pub fn cease(&mut self) {
227+
self.push_buffer.cease();
228+
}
224229
}
225230

226231
impl<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>> Drop for OutputHandleCore<'a, T, C, P> {

0 commit comments

Comments
 (0)