Skip to content

Commit 89a7b50

Browse files
committed
Introduce PortConnectivity as BTreeMap
1 parent 75697fa commit 89a7b50

File tree

8 files changed: +90 additions, -87 deletions

timely/src/dataflow/operators/capability.rs

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -27,9 +27,9 @@ use std::cell::RefCell;
2727
use std::fmt::{self, Debug};
2828

2929
use crate::order::PartialOrder;
30-
use crate::progress::Antichain;
3130
use crate::progress::Timestamp;
3231
use crate::progress::ChangeBatch;
32+
use crate::progress::operate::PortConnectivity;
3333
use crate::scheduling::Activations;
3434
use crate::dataflow::channels::pullers::counter::ConsumedGuard;
3535

@@ -238,26 +238,26 @@ pub struct InputCapability<T: Timestamp> {
238238
/// Output capability buffers, for use in minting capabilities.
239239
internal: CapabilityUpdates<T>,
240240
/// Timestamp summaries for each output.
241-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
241+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
242242
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
243243
consumed_guard: ConsumedGuard<T>,
244244
}
245245

246246
impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
247247
fn time(&self) -> &T { self.time() }
248248
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
249-
let borrow = self.summaries.borrow();
250-
self.internal.borrow().iter().enumerate().any(|(index, rc)| {
249+
let internal_borrow = self.internal.borrow();
250+
self.summaries.borrow().iter().any(|(output, summary)| {
251251
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
252-
Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default()
252+
Rc::ptr_eq(&internal_borrow[*output], query_buffer) && summary.len() == 1 && summary[0] == Default::default()
253253
})
254254
}
255255
}
256256

257257
impl<T: Timestamp> InputCapability<T> {
258258
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
259259
/// the provided [`ChangeBatch`].
260-
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, guard: ConsumedGuard<T>) -> Self {
260+
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
261261
InputCapability {
262262
internal,
263263
summaries,
@@ -281,10 +281,10 @@ impl<T: Timestamp> InputCapability<T> {
281281
/// Delays capability for a specific output port.
282282
pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
283283
use crate::progress::timestamp::PathSummary;
284-
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
284+
if self.summaries.borrow()[&output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
285285
Capability::new(new_time.clone(), self.internal.borrow()[output_port].clone())
286286
} else {
287-
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time());
287+
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[&output_port], self.time());
288288
}
289289
}
290290

@@ -305,11 +305,11 @@ impl<T: Timestamp> InputCapability<T> {
305305
pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
306306
use crate::progress::timestamp::PathSummary;
307307
let self_time = self.time().clone();
308-
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
308+
if self.summaries.borrow()[&output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
309309
Capability::new(self_time, self.internal.borrow()[output_port].clone())
310310
}
311311
else {
312-
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time);
312+
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[&output_port], self_time);
313313
}
314314
}
315315
}

timely/src/dataflow/operators/core/feedback.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -113,7 +113,8 @@ impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C> {
113113
let summary = handle.summary;
114114
let mut output = handle.output;
115115

116-
let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);
116+
let connection = Some((0, Antichain::from_elem(summary.clone()))).into_iter().collect();
117+
let mut input = builder.new_input_connection(self, Pipeline, connection);
117118

118119
builder.build(move |_capability| move |_frontier| {
119120
let mut output = output.activate();

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations};
1212

1313
use crate::progress::{Source, Target};
1414
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15-
use crate::progress::operate::Connectivity;
15+
use crate::progress::operate::{Connectivity, PortConnectivity};
1616

1717
use crate::Container;
1818
use crate::dataflow::{StreamCore, Scope};
@@ -108,12 +108,12 @@ impl<G: Scope> OperatorBuilder<G> {
108108
pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> P::Puller
109109
where
110110
P: ParallelizationContract<G::Timestamp, C> {
111-
let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs];
111+
let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default()))).collect();
112112
self.new_input_connection(stream, pact, connection)
113113
}
114114

115115
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
116-
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> P::Puller
116+
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: PortConnectivity<<G::Timestamp as Timestamp>::Summary>) -> P::Puller
117117
where
118118
P: ParallelizationContract<G::Timestamp, C> {
119119

@@ -133,21 +133,20 @@ impl<G: Scope> OperatorBuilder<G> {
133133
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
134134
pub fn new_output<C: Container>(&mut self) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
135135

136-
let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs];
136+
let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default()))).collect();
137137
self.new_output_connection(connection)
138138
}
139139

140140
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
141-
pub fn new_output_connection<C: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
141+
pub fn new_output_connection<C: Container>(&mut self, connection: PortConnectivity<<G::Timestamp as Timestamp>::Summary>) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
142142

143143
let (targets, registrar) = Tee::<G::Timestamp,C>::new();
144144
let source = Source::new(self.index, self.shape.outputs);
145145
let stream = StreamCore::new(source, registrar, self.scope.clone());
146146

147147
self.shape.outputs += 1;
148-
assert_eq!(self.shape.inputs, connection.len());
149-
for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) {
150-
summary.push(entry);
148+
for (input, entry) in connection.into_iter() {
149+
self.summary[input].insert(self.shape.outputs - 1, entry);
151150
}
152151

153152
(targets, stream)

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 9 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ use crate::dataflow::operators::capability::Capability;
2020
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
2121
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
2222
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
23-
23+
use crate::progress::operate::PortConnectivity;
2424
use crate::logging::TimelyLogger as Logger;
2525

2626
use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
@@ -33,7 +33,7 @@ pub struct OperatorBuilder<G: Scope> {
3333
consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
3434
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
3535
/// For each input, a shared list of summaries to each output.
36-
summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
36+
summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
3737
produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
3838
logging: Option<Logger>,
3939
}
@@ -64,7 +64,7 @@ impl<G: Scope> OperatorBuilder<G> {
6464
where
6565
P: ParallelizationContract<G::Timestamp, C> {
6666

67-
let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect();
67+
let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default()))).collect();
6868
self.new_input_connection(stream, pact, connection)
6969
}
7070

@@ -76,7 +76,7 @@ impl<G: Scope> OperatorBuilder<G> {
7676
///
7777
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
7878
/// antichain indicating that there is no connection from the input to the output.
79-
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandleCore<G::Timestamp, C, P::Puller>
79+
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: PortConnectivity<<G::Timestamp as Timestamp>::Summary>) -> InputHandleCore<G::Timestamp, C, P::Puller>
8080
where
8181
P: ParallelizationContract<G::Timestamp, C> {
8282

@@ -94,7 +94,7 @@ impl<G: Scope> OperatorBuilder<G> {
9494

9595
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
9696
pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
97-
let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect();
97+
let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default()))).collect();
9898
self.new_output_connection(connection)
9999
}
100100

@@ -108,7 +108,7 @@ impl<G: Scope> OperatorBuilder<G> {
108108
/// antichain indicating that there is no connection from the input to the output.
109109
pub fn new_output_connection<CB: ContainerBuilder>(
110110
&mut self,
111-
connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>
111+
connection: PortConnectivity<<G::Timestamp as Timestamp>::Summary>
112112
) -> (
113113
OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
114114
StreamCore<G, CB::Container>
@@ -122,8 +122,9 @@ impl<G: Scope> OperatorBuilder<G> {
122122
let mut buffer = PushBuffer::new(PushCounter::new(tee));
123123
self.produced.push(buffer.inner().produced().clone());
124124

125-
for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
126-
summary.borrow_mut().push(connection.clone());
125+
for (input, summary) in connection.into_iter() {
126+
let mut borrow = self.summaries[input].borrow_mut();
127+
borrow.insert(self.shape().outputs() - 1, summary);
127128
}
128129

129130
(OutputWrapper::new(buffer, internal), stream)

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -6,10 +6,10 @@
66
use std::rc::Rc;
77
use std::cell::RefCell;
88

9-
use crate::progress::Antichain;
109
use crate::progress::Timestamp;
1110
use crate::progress::ChangeBatch;
1211
use crate::progress::frontier::MutableAntichain;
12+
use crate::progress::operate::PortConnectivity;
1313
use crate::dataflow::channels::pullers::Counter as PullCounter;
1414
use crate::dataflow::channels::pushers::Counter as PushCounter;
1515
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
@@ -30,7 +30,7 @@ pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
3030
///
3131
/// Each timestamp received through this input may only produce output timestamps
3232
/// greater or equal to the input timestamp subjected to at least one of these summaries.
33-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
33+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
3434
logging: Option<Logger>,
3535
}
3636

@@ -148,8 +148,8 @@ pub fn _access_pull_counter<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
148148
/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
149149
pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
150150
pull_counter: PullCounter<T, C, P>,
151-
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
152-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
151+
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
152+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
153153
logging: Option<Logger>
154154
) -> InputHandleCore<T, C, P> {
155155
InputHandleCore {

timely/src/progress/operate.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,9 @@ pub trait Operate<T: Timestamp> : Schedule {
5959
}
6060

6161
/// Operator internal connectivity, from inputs to outputs.
62-
pub type Connectivity<TS> = Vec<Vec<Antichain<TS>>>;
62+
pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;
63+
/// Internal connectivity from one port to any number of opposing ports.
64+
pub type PortConnectivity<TS> = std::collections::BTreeMap<usize, Antichain<TS>>;
6365

6466
/// Progress information shared between parent and child.
6567
#[derive(Debug)]

0 commit comments

Comments
 (0)