Skip to content

Commit d0ea86f

Browse files
Linear connectivity (#651)
* Connectivity as a type alias * Introduce PortConnectivity as BTreeMap * Introduce tests; catch further issues * Respond to review feedback
1 parent 0d2b21f commit d0ea86f

File tree

12 files changed

+209
-120
lines changed

12 files changed

+209
-120
lines changed

timely/src/dataflow/operators/capability.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use std::cell::RefCell;
2727
use std::fmt::{self, Debug};
2828

2929
use crate::order::PartialOrder;
30-
use crate::progress::Antichain;
3130
use crate::progress::Timestamp;
3231
use crate::progress::ChangeBatch;
32+
use crate::progress::operate::PortConnectivity;
3333
use crate::scheduling::Activations;
3434
use crate::dataflow::channels::pullers::counter::ConsumedGuard;
3535

@@ -238,26 +238,26 @@ pub struct InputCapability<T: Timestamp> {
238238
/// Output capability buffers, for use in minting capabilities.
239239
internal: CapabilityUpdates<T>,
240240
/// Timestamp summaries for each output.
241-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
241+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
242242
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
243243
consumed_guard: ConsumedGuard<T>,
244244
}
245245

246246
impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
247247
fn time(&self) -> &T { self.time() }
248248
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
249-
let borrow = self.summaries.borrow();
250-
self.internal.borrow().iter().enumerate().any(|(index, rc)| {
249+
let internal_borrow = self.internal.borrow();
250+
self.summaries.borrow().iter().any(|(output, summary)| {
251251
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
252-
Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default()
252+
Rc::ptr_eq(&internal_borrow[*output], query_buffer) && summary.len() == 1 && summary[0] == Default::default()
253253
})
254254
}
255255
}
256256

257257
impl<T: Timestamp> InputCapability<T> {
258258
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
259259
/// the provided [`ChangeBatch`].
260-
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, guard: ConsumedGuard<T>) -> Self {
260+
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
261261
InputCapability {
262262
internal,
263263
summaries,
@@ -281,10 +281,10 @@ impl<T: Timestamp> InputCapability<T> {
281281
/// Delays capability for a specific output port.
282282
pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
283283
use crate::progress::timestamp::PathSummary;
284-
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
284+
if self.summaries.borrow()[&output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
285285
Capability::new(new_time.clone(), self.internal.borrow()[output_port].clone())
286286
} else {
287-
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time());
287+
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[&output_port], self.time());
288288
}
289289
}
290290

@@ -305,11 +305,11 @@ impl<T: Timestamp> InputCapability<T> {
305305
pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
306306
use crate::progress::timestamp::PathSummary;
307307
let self_time = self.time().clone();
308-
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
308+
if self.summaries.borrow()[&output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
309309
Capability::new(self_time, self.internal.borrow()[output_port].clone())
310310
}
311311
else {
312-
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time);
312+
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[&output_port], self_time);
313313
}
314314
}
315315
}

timely/src/dataflow/operators/core/feedback.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C> {
113113
let summary = handle.summary;
114114
let mut output = handle.output;
115115

116-
let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);
116+
let connection = Some((0, Antichain::from_elem(summary.clone()))).into_iter().collect();
117+
let mut input = builder.new_input_connection(self, Pipeline, connection);
117118

118119
builder.build(move |_capability| move |_frontier| {
119120
let mut output = output.activate();

timely/src/dataflow/operators/core/input.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
77

88
use crate::scheduling::{Schedule, Activator};
99

10-
use crate::progress::frontier::Antichain;
1110
use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
1211
use crate::progress::Source;
12+
use crate::progress::operate::Connectivity;
1313

1414
use crate::{Container, Data};
1515
use crate::communication::Push;
@@ -205,7 +205,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
205205
fn inputs(&self) -> usize { 0 }
206206
fn outputs(&self) -> usize { 1 }
207207

208-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
208+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
209209
self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
210210
(Vec::new(), self.shared_progress.clone())
211211
}

timely/src/dataflow/operators/core/unordered_input.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder};
77

88
use crate::scheduling::{Schedule, ActivateOnDrop};
99

10-
use crate::progress::frontier::Antichain;
1110
use crate::progress::{Operate, operate::SharedProgress, Timestamp};
1211
use crate::progress::Source;
1312
use crate::progress::ChangeBatch;
13+
use crate::progress::operate::Connectivity;
1414

1515
use crate::dataflow::channels::pushers::{Counter, Tee};
1616
use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession};
@@ -134,7 +134,7 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
134134
fn inputs(&self) -> usize { 0 }
135135
fn outputs(&self) -> usize { 1 }
136136

137-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
137+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
138138
let mut borrow = self.internal.borrow_mut();
139139
for (time, count) in borrow.drain() {
140140
self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::scheduling::{Schedule, Activations};
1212

1313
use crate::progress::{Source, Target};
1414
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15+
use crate::progress::operate::{Connectivity, PortConnectivity};
1516

1617
use crate::Container;
1718
use crate::dataflow::{StreamCore, Scope};
@@ -60,7 +61,7 @@ pub struct OperatorBuilder<G: Scope> {
6061
global: usize,
6162
address: Rc<[usize]>, // path to the operator (ending with index).
6263
shape: OperatorShape,
63-
summary: Vec<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>,
64+
summary: Connectivity<<G::Timestamp as Timestamp>::Summary>,
6465
}
6566

6667
impl<G: Scope> OperatorBuilder<G> {
@@ -107,12 +108,12 @@ impl<G: Scope> OperatorBuilder<G> {
107108
pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> P::Puller
108109
where
109110
P: ParallelizationContract<G::Timestamp, C> {
110-
let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs];
111+
let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default()))).collect();
111112
self.new_input_connection(stream, pact, connection)
112113
}
113114

114115
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
115-
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> P::Puller
116+
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: PortConnectivity<<G::Timestamp as Timestamp>::Summary>) -> P::Puller
116117
where
117118
P: ParallelizationContract<G::Timestamp, C> {
118119

@@ -123,7 +124,6 @@ impl<G: Scope> OperatorBuilder<G> {
123124
stream.connect_to(target, sender, channel_id);
124125

125126
self.shape.inputs += 1;
126-
assert_eq!(self.shape.outputs, connection.len());
127127
self.summary.push(connection);
128128

129129
receiver
@@ -132,21 +132,20 @@ impl<G: Scope> OperatorBuilder<G> {
132132
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
133133
pub fn new_output<C: Container>(&mut self) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
134134

135-
let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs];
135+
let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default()))).collect();
136136
self.new_output_connection(connection)
137137
}
138138

139139
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
140-
pub fn new_output_connection<C: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
140+
pub fn new_output_connection<C: Container>(&mut self, connection: PortConnectivity<<G::Timestamp as Timestamp>::Summary>) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
141141

142142
let (targets, registrar) = Tee::<G::Timestamp,C>::new();
143143
let source = Source::new(self.index, self.shape.outputs);
144144
let stream = StreamCore::new(source, registrar, self.scope.clone());
145145

146146
self.shape.outputs += 1;
147-
assert_eq!(self.shape.inputs, connection.len());
148-
for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) {
149-
summary.push(entry);
147+
for (input, entry) in connection.into_iter() {
148+
self.summary[input].insert(self.shape.outputs - 1, entry);
150149
}
151150

152151
(targets, stream)
@@ -188,7 +187,7 @@ where
188187
logic: L,
189188
shared_progress: Rc<RefCell<SharedProgress<T>>>,
190189
activations: Rc<RefCell<Activations>>,
191-
summary: Vec<Vec<Antichain<T::Summary>>>,
190+
summary: Connectivity<T::Summary>,
192191
}
193192

194193
impl<T, L> Schedule for OperatorCore<T, L>
@@ -213,7 +212,7 @@ where
213212
fn outputs(&self) -> usize { self.shape.outputs }
214213

215214
// announce internal topology as fully connected, and hold all default capabilities.
216-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
215+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
217216

218217
// Request the operator to be scheduled at least once.
219218
self.activations.borrow_mut().activate(&self.address[..]);

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::dataflow::operators::capability::Capability;
2020
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
2121
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
2222
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
23-
23+
use crate::progress::operate::PortConnectivity;
2424
use crate::logging::TimelyLogger as Logger;
2525

2626
use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
@@ -33,7 +33,7 @@ pub struct OperatorBuilder<G: Scope> {
3333
consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
3434
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
3535
/// For each input, a shared list of summaries to each output.
36-
summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
36+
summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
3737
produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
3838
logging: Option<Logger>,
3939
}
@@ -64,7 +64,7 @@ impl<G: Scope> OperatorBuilder<G> {
6464
where
6565
P: ParallelizationContract<G::Timestamp, C> {
6666

67-
let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect();
67+
let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default()))).collect();
6868
self.new_input_connection(stream, pact, connection)
6969
}
7070

@@ -76,7 +76,7 @@ impl<G: Scope> OperatorBuilder<G> {
7676
///
7777
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
7878
/// antichain indicating that there is no connection from the input to the output.
79-
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandleCore<G::Timestamp, C, P::Puller>
79+
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: PortConnectivity<<G::Timestamp as Timestamp>::Summary>) -> InputHandleCore<G::Timestamp, C, P::Puller>
8080
where
8181
P: ParallelizationContract<G::Timestamp, C> {
8282

@@ -94,7 +94,7 @@ impl<G: Scope> OperatorBuilder<G> {
9494

9595
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
9696
pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
97-
let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect();
97+
let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default()))).collect();
9898
self.new_output_connection(connection)
9999
}
100100

@@ -108,7 +108,7 @@ impl<G: Scope> OperatorBuilder<G> {
108108
/// antichain indicating that there is no connection from the input to the output.
109109
pub fn new_output_connection<CB: ContainerBuilder>(
110110
&mut self,
111-
connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>
111+
connection: PortConnectivity<<G::Timestamp as Timestamp>::Summary>
112112
) -> (
113113
OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
114114
StreamCore<G, CB::Container>
@@ -122,8 +122,9 @@ impl<G: Scope> OperatorBuilder<G> {
122122
let mut buffer = PushBuffer::new(PushCounter::new(tee));
123123
self.produced.push(buffer.inner().produced().clone());
124124

125-
for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
126-
summary.borrow_mut().push(connection.clone());
125+
for (input, summary) in connection.into_iter() {
126+
let mut borrow = self.summaries[input].borrow_mut();
127+
borrow.insert(self.shape().outputs() - 1, summary);
127128
}
128129

129130
(OutputWrapper::new(buffer, internal), stream)

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
use std::rc::Rc;
77
use std::cell::RefCell;
88

9-
use crate::progress::Antichain;
109
use crate::progress::Timestamp;
1110
use crate::progress::ChangeBatch;
1211
use crate::progress::frontier::MutableAntichain;
12+
use crate::progress::operate::PortConnectivity;
1313
use crate::dataflow::channels::pullers::Counter as PullCounter;
1414
use crate::dataflow::channels::pushers::Counter as PushCounter;
1515
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
@@ -30,7 +30,7 @@ pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
3030
///
3131
/// Each timestamp received through this input may only produce output timestamps
3232
/// greater or equal to the input timestamp subjected to at least one of these summaries.
33-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
33+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
3434
logging: Option<Logger>,
3535
}
3636

@@ -148,8 +148,8 @@ pub fn _access_pull_counter<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
148148
/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
149149
pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
150150
pull_counter: PullCounter<T, C, P>,
151-
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
152-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
151+
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
152+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
153153
logging: Option<Logger>
154154
) -> InputHandleCore<T, C, P> {
155155
InputHandleCore {

timely/src/logging.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
2222
use crate::Container;
2323
use crate::container::CapacityContainerBuilder;
2424
use crate::dataflow::operators::capture::{Event, EventPusher};
25+
use crate::progress::operate::Connectivity;
2526

2627
/// Logs events as a timely stream, with progress statements.
2728
pub struct BatchLogger<P, C> where P: EventPusher<Duration, C> {
@@ -76,7 +77,7 @@ pub struct OperatesSummaryEvent<TS> {
7677
/// Worker-unique identifier for the operator.
7778
pub id: usize,
7879
/// Timestamp action summaries for (input, output) pairs.
79-
pub summary: Vec<Vec<crate::progress::Antichain<TS>>>,
80+
pub summary: Connectivity<TS>,
8081
}
8182

8283
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]

timely/src/progress/operate.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub trait Operate<T: Timestamp> : Schedule {
4444
///
4545
/// The default behavior is to indicate that timestamps on any input can emerge unchanged on
4646
/// any output, and no initial capabilities are held.
47-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>);
47+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>);
4848

4949
/// Signals that external frontiers have been set.
5050
///
@@ -58,6 +58,11 @@ pub trait Operate<T: Timestamp> : Schedule {
5858
fn notify_me(&self) -> bool { true }
5959
}
6060

61+
/// Operator internal connectivity, from inputs to outputs.
62+
pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;
63+
/// Internal connectivity from one port to any number of opposing ports.
64+
pub type PortConnectivity<TS> = std::collections::BTreeMap<usize, Antichain<TS>>;
65+
6166
/// Progress information shared between parent and child.
6267
#[derive(Debug)]
6368
pub struct SharedProgress<T: Timestamp> {

0 commit comments

Comments
 (0)