Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ use crate::dataflow::channels::pullers::counter::ConsumedGuard;
pub trait CapabilityTrait<T: Timestamp> {
/// The timestamp associated with the capability.
fn time(&self) -> &T;
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool;
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool;
}

impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
fn time(&self) -> &T { (**self).time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
(**self).valid_for_output(query_buffer)
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
(**self).valid_for_output(query_buffer, port)
}
}
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
fn time(&self) -> &T { (**self).time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
(**self).valid_for_output(query_buffer)
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
(**self).valid_for_output(query_buffer, port)
}
}

Expand All @@ -66,7 +66,7 @@ pub struct Capability<T: Timestamp> {

impl<T: Timestamp> CapabilityTrait<T> for Capability<T> {
fn time(&self) -> &T { &self.time }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, _port: usize) -> bool {
Rc::ptr_eq(&self.internal, query_buffer)
}
}
Expand Down Expand Up @@ -227,9 +227,9 @@ impl Error for DowngradeError {}
/// A shared list of shared output capability buffers.
type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;

/// An capability of an input port.
/// An capability of an input port.
///
/// Holding onto this capability will implicitly holds onto a capability for all the outputs
/// Holding onto this capability will implicitly holds onto a capability for all the outputs
/// ports this input is connected to, after the connection summaries have been applied.
///
/// This input capability supplies a `retain_for_output(self)` method which consumes the input
Expand All @@ -245,14 +245,12 @@ pub struct InputCapability<T: Timestamp> {

impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
fn time(&self) -> &T { self.time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
let summaries_borrow = self.summaries.borrow();
let internal_borrow = self.internal.borrow();
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
let result = summaries_borrow.iter_ports().any(|(port, path)| {
Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default()
});
result
Rc::ptr_eq(&internal_borrow[port], query_buffer) &&
summaries_borrow.get(port).map_or(false, |path| path.elements() == [Default::default()])
}
}

Expand Down Expand Up @@ -353,8 +351,8 @@ pub struct ActivateCapability<T: Timestamp> {

impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {
fn time(&self) -> &T { self.capability.time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
self.capability.valid_for_output(query_buffer)
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
self.capability.valid_for_output(query_buffer, port)
}
}

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl<G: Scope> OperatorBuilder<G> {
self.summaries[input].borrow_mut().add_port(new_output, entry);
}

(OutputWrapper::new(buffer, internal), stream)
(OutputWrapper::new(buffer, internal, new_output), stream)
}

/// Creates an operator implementation from supplied logic constructor.
Expand Down
8 changes: 6 additions & 2 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,16 @@ pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
pub struct OutputWrapper<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> {
push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>,
internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
port: usize,
}

impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputWrapper<T, CB, P> {
/// Creates a new output wrapper from a push buffer.
pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>) -> Self {
pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
OutputWrapper {
push_buffer,
internal_buffer,
port,
}
}
/// Borrows the push buffer into a handle, which can be used to send records.
Expand All @@ -187,6 +189,7 @@ impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Out
OutputHandleCore {
push_buffer: &mut self.push_buffer,
internal_buffer: &self.internal_buffer,
port: self.port,
}
}
}
Expand All @@ -195,6 +198,7 @@ impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Out
pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push<Message<T, CB::Container>>+'a> {
push_buffer: &'a mut Buffer<T, CB, PushCounter<T, CB::Container, P>>,
internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
port: usize,
}

/// Handle specialized to `Vec`-based container.
Expand Down Expand Up @@ -225,7 +229,7 @@ impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>>
/// });
/// ```
pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter<T, CB::Container, P>> where 'a: 'b {
debug_assert!(cap.valid_for_output(self.internal_buffer), "Attempted to open output session with invalid capability");
debug_assert!(cap.valid_for_output(self.internal_buffer, self.port), "Attempted to open output session with invalid capability");
self.push_buffer.session_with_builder(cap.time())
}

Expand Down
Loading