diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 8ba993726..6235572e6 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -37,19 +37,19 @@ use crate::dataflow::channels::pullers::counter::ConsumedGuard; pub trait CapabilityTrait { /// The timestamp associated with the capability. fn time(&self) -> &T; - fn valid_for_output(&self, query_buffer: &Rc>>) -> bool; + fn valid_for_output(&self, query_buffer: &Rc>>, port: usize) -> bool; } impl> CapabilityTrait for &C { fn time(&self) -> &T { (**self).time() } - fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { - (**self).valid_for_output(query_buffer) + fn valid_for_output(&self, query_buffer: &Rc>>, port: usize) -> bool { + (**self).valid_for_output(query_buffer, port) } } impl> CapabilityTrait for &mut C { fn time(&self) -> &T { (**self).time() } - fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { - (**self).valid_for_output(query_buffer) + fn valid_for_output(&self, query_buffer: &Rc>>, port: usize) -> bool { + (**self).valid_for_output(query_buffer, port) } } @@ -66,7 +66,7 @@ pub struct Capability { impl CapabilityTrait for Capability { fn time(&self) -> &T { &self.time } - fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { + fn valid_for_output(&self, query_buffer: &Rc>>, _port: usize) -> bool { Rc::ptr_eq(&self.internal, query_buffer) } } @@ -227,9 +227,9 @@ impl Error for DowngradeError {} /// A shared list of shared output capability buffers. type CapabilityUpdates = Rc>>>>>; -/// An capability of an input port. +/// An capability of an input port. /// -/// Holding onto this capability will implicitly holds onto a capability for all the outputs +/// Holding onto this capability will implicitly holds onto a capability for all the outputs /// ports this input is connected to, after the connection summaries have been applied. /// /// This input capability supplies a `retain_for_output(self)` method which consumes the input @@ -245,14 +245,12 @@ pub struct InputCapability { impl CapabilityTrait for InputCapability { fn time(&self) -> &T { self.time() } - fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { + fn valid_for_output(&self, query_buffer: &Rc>>, port: usize) -> bool { let summaries_borrow = self.summaries.borrow(); let internal_borrow = self.internal.borrow(); // To be valid, the output buffer must match and the timestamp summary needs to be the default. - let result = summaries_borrow.iter_ports().any(|(port, path)| { - Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default() - }); - result + Rc::ptr_eq(&internal_borrow[port], query_buffer) && + summaries_borrow.get(port).map_or(false, |path| path.elements() == [Default::default()]) } } @@ -353,8 +351,8 @@ pub struct ActivateCapability { impl CapabilityTrait for ActivateCapability { fn time(&self) -> &T { self.capability.time() } - fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { - self.capability.valid_for_output(query_buffer) + fn valid_for_output(&self, query_buffer: &Rc>>, port: usize) -> bool { + self.capability.valid_for_output(query_buffer, port) } } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 66957ddf5..c3d02a780 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -127,7 +127,7 @@ impl OperatorBuilder { self.summaries[input].borrow_mut().add_port(new_output, entry); } - (OutputWrapper::new(buffer, internal), stream) + (OutputWrapper::new(buffer, internal, new_output), stream) } /// Creates an operator implementation from supplied logic constructor. diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 154ebeddb..16d8a149c 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -169,14 +169,16 @@ pub fn new_input_handle>>( pub struct OutputWrapper>> { push_buffer: Buffer>, internal_buffer: Rc>>, + port: usize, } impl>> OutputWrapper { /// Creates a new output wrapper from a push buffer. - pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>) -> Self { + pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>, port: usize) -> Self { OutputWrapper { push_buffer, internal_buffer, + port, } } /// Borrows the push buffer into a handle, which can be used to send records. @@ -187,6 +189,7 @@ impl>> Out OutputHandleCore { push_buffer: &mut self.push_buffer, internal_buffer: &self.internal_buffer, + port: self.port, } } } @@ -195,6 +198,7 @@ impl>> Out pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push>+'a> { push_buffer: &'a mut Buffer>, internal_buffer: &'a Rc>>, + port: usize, } /// Handle specialized to `Vec`-based container. @@ -225,7 +229,7 @@ impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> /// }); /// ``` pub fn session_with_builder<'b, CT: CapabilityTrait>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter> where 'a: 'b { - debug_assert!(cap.valid_for_output(self.internal_buffer), "Attempted to open output session with invalid capability"); + debug_assert!(cap.valid_for_output(self.internal_buffer, self.port), "Attempted to open output session with invalid capability"); self.push_buffer.session_with_builder(cap.time()) }