Skip to content

Commit b42bbc2

Browse files
committed
Make PortConnectivity a struct, with sufficient methods
1 parent c0b643e commit b42bbc2

File tree

5 files changed

+96
-47
lines changed

5 files changed

+96
-47
lines changed

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl<G: Scope> OperatorBuilder<G> {
124124

125125
self.shape.inputs += 1;
126126
assert_eq!(self.shape.outputs, connection.len());
127-
self.summary.push(connection);
127+
self.summary.push(connection.into());
128128

129129
receiver
130130
}
@@ -146,7 +146,7 @@ impl<G: Scope> OperatorBuilder<G> {
146146
self.shape.outputs += 1;
147147
assert_eq!(self.shape.inputs, connection.len());
148148
for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) {
149-
summary.push(entry);
149+
summary.add_port(entry);
150150
}
151151

152152
(targets, stream)

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl<G: Scope> OperatorBuilder<G> {
8686
self.frontier.push(MutableAntichain::new());
8787
self.consumed.push(Rc::clone(input.consumed()));
8888

89-
let shared_summary = Rc::new(RefCell::new(connection));
89+
let shared_summary = Rc::new(RefCell::new(connection.into()));
9090
self.summaries.push(Rc::clone(&shared_summary));
9191

9292
new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone())
@@ -123,7 +123,7 @@ impl<G: Scope> OperatorBuilder<G> {
123123
self.produced.push(Rc::clone(buffer.inner().produced()));
124124

125125
for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
126-
summary.borrow_mut().push(connection.clone());
126+
summary.borrow_mut().add_port(connection.clone());
127127
}
128128

129129
(OutputWrapper::new(buffer, internal), stream)

timely/src/progress/operate.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,63 @@ pub trait Operate<T: Timestamp> : Schedule {
6161
/// Operator internal connectivity, from inputs to outputs.
6262
pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;
6363
/// Internal connectivity from one port to any number of opposing ports.
64-
pub type PortConnectivity<TS> = Vec<Antichain<TS>>;
64+
#[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)]
65+
pub struct PortConnectivity<TS> {
66+
list: Vec<Antichain<TS>>,
67+
}
68+
69+
impl<TS> Default for PortConnectivity<TS> {
70+
fn default() -> Self {
71+
Self { list: Vec::new() }
72+
}
73+
}
74+
75+
impl<TS> PortConnectivity<TS> {
76+
/// Introduces default summaries for `0 .. count` ports.
77+
pub fn default_for(count: usize) -> Self {
78+
let mut list = Vec::with_capacity(count);
79+
for _ in 0 .. count { list.push(Default::default()) }
80+
Self { list }
81+
}
82+
/// Ensures an entry exists at `index` and returns a mutable reference to it.
83+
pub fn ensure(&mut self, index: usize) -> &mut Antichain<TS> {
84+
while self.next_port() <= index { self.add_port(Antichain::new()); }
85+
&mut self.list[index]
86+
}
87+
/// Inserts an element by reference, ensuring that the index exists.
88+
pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone {
89+
self.ensure(index).insert_ref(element)
90+
}
91+
/// Introduces a summary for the port `self.next_port()`.
92+
pub fn add_port(&mut self, summary: Antichain<TS>) {
93+
self.list.push(summary);
94+
}
95+
/// Borrowing iterator of port identifiers and antichains.
96+
pub fn iter_ports(&self) -> impl Iterator<Item = (usize, &Antichain<TS>)> {
97+
self.list.iter().enumerate()
98+
}
99+
/// Owning iterator of port identifiers and antichains.
100+
pub fn into_iter_ports(self) -> impl Iterator<Item = (usize, Antichain<TS>)> {
101+
self.list.into_iter().enumerate()
102+
}
103+
/// Announces the next output port identifier.
104+
pub fn next_port(&self) -> usize {
105+
self.list.len()
106+
}
107+
}
65108

109+
impl<TS> From<Vec<Antichain<TS>>> for PortConnectivity<TS> {
110+
fn from(list: Vec<Antichain<TS>>) -> Self {
111+
Self { list }
112+
}
113+
}
114+
115+
impl<TS> std::ops::Index<usize> for PortConnectivity<TS> {
116+
type Output = Antichain<TS>;
117+
fn index(&self, index: usize) -> &Self::Output {
118+
&self.list[index]
119+
}
120+
}
66121

67122
/// Progress information shared between parent and child.
68123
#[derive(Debug)]

timely/src/progress/reachability.rs

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
//! let mut builder = Builder::<usize>::new();
1818
//!
1919
//! // Each node with one input connected to one output.
20-
//! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
21-
//! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
22-
//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
20+
//! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
21+
//! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
22+
//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)].into()]);
2323
//!
2424
//! // Connect nodes in sequence, looping around to the first from the last.
2525
//! builder.add_edge(Source::new(0, 0), Target::new(1, 0));
@@ -80,7 +80,7 @@ use crate::progress::{Source, Target};
8080
use crate::progress::ChangeBatch;
8181
use crate::progress::{Location, Port};
8282
use crate::progress::operate::{Connectivity, PortConnectivity};
83-
use crate::progress::frontier::{Antichain, MutableAntichain};
83+
use crate::progress::frontier::MutableAntichain;
8484
use crate::progress::timestamp::PathSummary;
8585

8686

@@ -113,9 +113,9 @@ use crate::progress::timestamp::PathSummary;
113113
/// let mut builder = Builder::<usize>::new();
114114
///
115115
/// // Each node with one input connected to one output.
116-
/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
117-
/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
118-
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
116+
/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
117+
/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
118+
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)].into()]);
119119
///
120120
/// // Connect nodes in sequence, looping around to the first from the last.
121121
/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
@@ -160,7 +160,7 @@ impl<T: Timestamp> Builder<T> {
160160

161161
// Assert that all summaries exist.
162162
debug_assert_eq!(inputs, summary.len());
163-
for x in summary.iter() { debug_assert_eq!(outputs, x.len()); }
163+
for x in summary.iter() { debug_assert_eq!(outputs, x.next_port()); }
164164

165165
while self.nodes.len() <= index {
166166
self.nodes.push(Vec::new());
@@ -224,9 +224,9 @@ impl<T: Timestamp> Builder<T> {
224224
/// let mut builder = Builder::<usize>::new();
225225
///
226226
/// // Each node with one input connected to one output.
227-
/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
228-
/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
229-
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)]]);
227+
/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
228+
/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
229+
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
230230
///
231231
/// // Connect nodes in sequence, looping around to the first from the last.
232232
/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
@@ -253,8 +253,8 @@ impl<T: Timestamp> Builder<T> {
253253
///
254254
/// // Two inputs and outputs, only one of which advances.
255255
/// builder.add_node(0, 2, 2, vec![
256-
/// vec![Antichain::from_elem(0),Antichain::new(),],
257-
/// vec![Antichain::new(),Antichain::from_elem(1),],
256+
/// vec![Antichain::from_elem(0),Antichain::new(),].into(),
257+
/// vec![Antichain::new(),Antichain::from_elem(1),].into(),
258258
/// ]);
259259
///
260260
/// // Connect each output to the opposite input.
@@ -285,7 +285,7 @@ impl<T: Timestamp> Builder<T> {
285285
for (input, outputs) in summary.iter().enumerate() {
286286
let target = Location::new_target(index, input);
287287
in_degree.entry(target).or_insert(0);
288-
for (output, summaries) in outputs.iter().enumerate() {
288+
for (output, summaries) in outputs.iter_ports() {
289289
let source = Location::new_source(index, output);
290290
for summary in summaries.elements().iter() {
291291
if summary == &Default::default() {
@@ -322,7 +322,7 @@ impl<T: Timestamp> Builder<T> {
322322
}
323323
},
324324
Port::Target(port) => {
325-
for (output, summaries) in self.nodes[node][port].iter().enumerate() {
325+
for (output, summaries) in self.nodes[node][port].iter_ports() {
326326
let source = Location::new_source(node, output);
327327
for summary in summaries.elements().iter() {
328328
if summary == &Default::default() {
@@ -442,7 +442,7 @@ impl<T: Timestamp> PortInformation<T> {
442442
PortInformation {
443443
pointstamps: MutableAntichain::new(),
444444
implications: MutableAntichain::new(),
445-
output_summaries: Vec::new(),
445+
output_summaries: PortConnectivity::default(),
446446
}
447447
}
448448

@@ -514,7 +514,7 @@ impl<T:Timestamp> Tracker<T> {
514514
.collect::<Vec<_>>();
515515

516516
// Summary of scope inputs to scope outputs.
517-
let mut builder_summary = vec![vec![]; builder.shape[0].1];
517+
let mut builder_summary = vec![PortConnectivity::default(); builder.shape[0].1];
518518

519519
// Compile summaries from each location to each scope output.
520520
let output_summaries = summarize_outputs::<T>(&builder.nodes, &builder.edges);
@@ -598,7 +598,7 @@ impl<T:Timestamp> Tracker<T> {
598598

599599
for (time, diff) in changes {
600600
self.total_counts += diff;
601-
for (output, summaries) in operator.output_summaries.iter().enumerate() {
601+
for (output, summaries) in operator.output_summaries.iter_ports() {
602602
let output_changes = &mut self.output_changes[output];
603603
summaries
604604
.elements()
@@ -617,7 +617,7 @@ impl<T:Timestamp> Tracker<T> {
617617

618618
for (time, diff) in changes {
619619
self.total_counts += diff;
620-
for (output, summaries) in operator.output_summaries.iter().enumerate() {
620+
for (output, summaries) in operator.output_summaries.iter_ports() {
621621
let output_changes = &mut self.output_changes[output];
622622
summaries
623623
.elements()
@@ -658,7 +658,7 @@ impl<T:Timestamp> Tracker<T> {
658658

659659
for (time, diff) in changes {
660660
let nodes = &self.nodes[location.node][port_index];
661-
for (output_port, summaries) in nodes.iter().enumerate() {
661+
for (output_port, summaries) in nodes.iter_ports() {
662662
let source = Location { node: location.node, port: Port::Source(output_port) };
663663
for summary in summaries.elements().iter() {
664664
if let Some(new_time) = summary.results_in(&time) {
@@ -778,17 +778,14 @@ fn summarize_outputs<T: Timestamp>(
778778

779779
// Determine the current path summaries from the input port.
780780
let location = Location { node: location.node, port: Port::Target(input_port) };
781-
let antichains = results
782-
.entry(location)
783-
.and_modify(|antichains| antichains.reserve(output))
784-
.or_insert_with(|| Vec::with_capacity(output));
785-
786-
while antichains.len() <= output { antichains.push(Antichain::new()); }
781+
let antichains = results.entry(location).or_default();
782+
// TODO: This is redundant with `insert_ref` below.
783+
antichains.ensure(output);
787784

788785
// Combine each operator-internal summary to the output with `summary`.
789786
for operator_summary in summaries[output_port].elements().iter() {
790787
if let Some(combined) = operator_summary.followed_by(&summary) {
791-
if antichains[output].insert(combined.clone()) {
788+
if antichains.insert_ref(output, &combined) {
792789
worklist.push_back((location, output, combined));
793790
}
794791
}
@@ -803,14 +800,11 @@ fn summarize_outputs<T: Timestamp>(
803800

804801
// Each target should have (at most) one source.
805802
if let Some(&source) = reverse.get(&location) {
806-
let antichains = results
807-
.entry(source)
808-
.and_modify(|antichains| antichains.reserve(output))
809-
.or_insert_with(|| Vec::with_capacity(output));
810-
811-
while antichains.len() <= output { antichains.push(Antichain::new()); }
803+
let antichains = results.entry(source).or_default();
804+
// TODO: This is redundant with `insert_ref` below.
805+
antichains.ensure(output);
812806

813-
if antichains[output].insert(summary.clone()) {
807+
if antichains.insert_ref(output, &summary) {
814808
worklist.push_back((source, output, summary.clone()));
815809
}
816810
}

timely/src/progress/subgraph.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ use crate::logging::TimelySummaryLogger as SummaryLogger;
1616
use crate::scheduling::Schedule;
1717
use crate::scheduling::activate::Activations;
1818

19-
use crate::progress::frontier::{Antichain, MutableAntichain, MutableAntichainFilter};
19+
use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter};
2020
use crate::progress::{Timestamp, Operate, operate::SharedProgress};
2121
use crate::progress::{Location, Port, Source, Target};
22-
use crate::progress::operate::Connectivity;
22+
use crate::progress::operate::{Connectivity, PortConnectivity};
2323
use crate::progress::ChangeBatch;
2424
use crate::progress::broadcast::Progcaster;
2525
use crate::progress::reachability;
@@ -168,7 +168,7 @@ where
168168
let mut builder = reachability::Builder::new();
169169

170170
// Child 0 has `inputs` outputs and `outputs` inputs, not yet connected.
171-
let summary = (0..outputs).map(|_| (0..inputs).map(|_| Antichain::new()).collect()).collect();
171+
let summary = (0..outputs).map(|_| PortConnectivity::default_for(inputs)).collect();
172172
builder.add_node(0, outputs, inputs, summary);
173173
for (index, child) in self.children.iter().enumerate().skip(1) {
174174
builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
@@ -555,10 +555,10 @@ where
555555
// Note that we need to have `self.inputs()` elements in the summary
556556
// with each element containing `self.outputs()` antichains regardless
557557
// of how long `self.scope_summary` is
558-
let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()];
558+
let mut internal_summary = vec![PortConnectivity::default_for(self.outputs); self.inputs()];
559559
for (input_idx, input) in self.scope_summary.iter().enumerate() {
560-
for (output_idx, output) in input.iter().enumerate() {
561-
let antichain = &mut internal_summary[input_idx][output_idx];
560+
for (output_idx, output) in input.iter_ports() {
561+
let antichain = internal_summary[input_idx].ensure(output_idx);
562562
antichain.reserve(output.elements().len());
563563
antichain.extend(output.elements().iter().cloned().map(TInner::summarize));
564564
}
@@ -570,7 +570,7 @@ where
570570
"the internal summary should have as many elements as there are inputs",
571571
);
572572
debug_assert!(
573-
internal_summary.iter().all(|summary| summary.len() == self.outputs()),
573+
internal_summary.iter().all(|summary| summary.next_port() == self.outputs()),
574574
"each element of the internal summary should have as many elements as there are outputs",
575575
);
576576

@@ -671,7 +671,7 @@ impl<T: Timestamp> PerOperatorState<T> {
671671
inputs,
672672
);
673673
assert!(
674-
!internal_summary.iter().any(|x| x.len() != outputs),
674+
!internal_summary.iter().any(|x| x.next_port() != outputs),
675675
"operator summary had too few outputs",
676676
);
677677

0 commit comments

Comments
 (0)