Skip to content

Commit 7341835

Browse files
Merge pull request #691 from frankmcsherry/avoid_validity_scan
Avoid capability validity scan with explicit index
2 parents 81e4001 + b499ea2 commit 7341835

File tree

3 files changed

+20
-18
lines changed

3 files changed

+20
-18
lines changed

timely/src/dataflow/operators/capability.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,19 @@ use crate::dataflow::channels::pullers::counter::ConsumedGuard;
3737
pub trait CapabilityTrait<T: Timestamp> {
3838
/// The timestamp associated with the capability.
3939
fn time(&self) -> &T;
40-
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool;
40+
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool;
4141
}
4242

4343
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
4444
fn time(&self) -> &T { (**self).time() }
45-
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
46-
(**self).valid_for_output(query_buffer)
45+
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
46+
(**self).valid_for_output(query_buffer, port)
4747
}
4848
}
4949
impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
5050
fn time(&self) -> &T { (**self).time() }
51-
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
52-
(**self).valid_for_output(query_buffer)
51+
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
52+
(**self).valid_for_output(query_buffer, port)
5353
}
5454
}
5555

@@ -66,7 +66,7 @@ pub struct Capability<T: Timestamp> {
6666

6767
impl<T: Timestamp> CapabilityTrait<T> for Capability<T> {
6868
fn time(&self) -> &T { &self.time }
69-
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
69+
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, _port: usize) -> bool {
7070
Rc::ptr_eq(&self.internal, query_buffer)
7171
}
7272
}
@@ -227,9 +227,9 @@ impl Error for DowngradeError {}
227227
/// A shared list of shared output capability buffers.
228228
type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
229229

230-
/// An capability of an input port.
230+
/// An capability of an input port.
231231
///
232-
/// Holding onto this capability will implicitly holds onto a capability for all the outputs
232+
/// Holding onto this capability will implicitly holds onto a capability for all the outputs
233233
/// ports this input is connected to, after the connection summaries have been applied.
234234
///
235235
/// This input capability supplies a `retain_for_output(self)` method which consumes the input
@@ -245,14 +245,12 @@ pub struct InputCapability<T: Timestamp> {
245245

246246
impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
247247
fn time(&self) -> &T { self.time() }
248-
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
248+
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
249249
let summaries_borrow = self.summaries.borrow();
250250
let internal_borrow = self.internal.borrow();
251251
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
252-
let result = summaries_borrow.iter_ports().any(|(port, path)| {
253-
Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default()
254-
});
255-
result
252+
Rc::ptr_eq(&internal_borrow[port], query_buffer) &&
253+
summaries_borrow.get(port).map_or(false, |path| path.elements() == [Default::default()])
256254
}
257255
}
258256

@@ -353,8 +351,8 @@ pub struct ActivateCapability<T: Timestamp> {
353351

354352
impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {
355353
fn time(&self) -> &T { self.capability.time() }
356-
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
357-
self.capability.valid_for_output(query_buffer)
354+
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
355+
self.capability.valid_for_output(query_buffer, port)
358356
}
359357
}
360358

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl<G: Scope> OperatorBuilder<G> {
127127
self.summaries[input].borrow_mut().add_port(new_output, entry);
128128
}
129129

130-
(OutputWrapper::new(buffer, internal), stream)
130+
(OutputWrapper::new(buffer, internal, new_output), stream)
131131
}
132132

133133
/// Creates an operator implementation from supplied logic constructor.

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,16 @@ pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
169169
pub struct OutputWrapper<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> {
170170
push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>,
171171
internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
172+
port: usize,
172173
}
173174

174175
impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputWrapper<T, CB, P> {
175176
/// Creates a new output wrapper from a push buffer.
176-
pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>) -> Self {
177+
pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
177178
OutputWrapper {
178179
push_buffer,
179180
internal_buffer,
181+
port,
180182
}
181183
}
182184
/// Borrows the push buffer into a handle, which can be used to send records.
@@ -187,6 +189,7 @@ impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Out
187189
OutputHandleCore {
188190
push_buffer: &mut self.push_buffer,
189191
internal_buffer: &self.internal_buffer,
192+
port: self.port,
190193
}
191194
}
192195
}
@@ -195,6 +198,7 @@ impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Out
195198
pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push<Message<T, CB::Container>>+'a> {
196199
push_buffer: &'a mut Buffer<T, CB, PushCounter<T, CB::Container, P>>,
197200
internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
201+
port: usize,
198202
}
199203

200204
/// Handle specialized to `Vec`-based container.
@@ -225,7 +229,7 @@ impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>>
225229
/// });
226230
/// ```
227231
pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter<T, CB::Container, P>> where 'a: 'b {
228-
debug_assert!(cap.valid_for_output(self.internal_buffer), "Attempted to open output session with invalid capability");
232+
debug_assert!(cap.valid_for_output(self.internal_buffer, self.port), "Attempted to open output session with invalid capability");
229233
self.push_buffer.session_with_builder(cap.time())
230234
}
231235

0 commit comments

Comments
 (0)