Skip to content

Commit 75697fa

Browse files
committed
Connectivity as a type alias
1 parent 0d2b21f commit 75697fa

File tree

7 files changed

+25
-20
lines changed

7 files changed

+25
-20
lines changed

timely/src/dataflow/operators/core/input.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
77

88
use crate::scheduling::{Schedule, Activator};
99

10-
use crate::progress::frontier::Antichain;
1110
use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
1211
use crate::progress::Source;
12+
use crate::progress::operate::Connectivity;
1313

1414
use crate::{Container, Data};
1515
use crate::communication::Push;
@@ -205,7 +205,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
205205
fn inputs(&self) -> usize { 0 }
206206
fn outputs(&self) -> usize { 1 }
207207

208-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
208+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
209209
self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
210210
(Vec::new(), self.shared_progress.clone())
211211
}

timely/src/dataflow/operators/core/unordered_input.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder};
77

88
use crate::scheduling::{Schedule, ActivateOnDrop};
99

10-
use crate::progress::frontier::Antichain;
1110
use crate::progress::{Operate, operate::SharedProgress, Timestamp};
1211
use crate::progress::Source;
1312
use crate::progress::ChangeBatch;
13+
use crate::progress::operate::Connectivity;
1414

1515
use crate::dataflow::channels::pushers::{Counter, Tee};
1616
use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession};
@@ -134,7 +134,7 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
134134
fn inputs(&self) -> usize { 0 }
135135
fn outputs(&self) -> usize { 1 }
136136

137-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
137+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
138138
let mut borrow = self.internal.borrow_mut();
139139
for (time, count) in borrow.drain() {
140140
self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::scheduling::{Schedule, Activations};
1212

1313
use crate::progress::{Source, Target};
1414
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15+
use crate::progress::operate::Connectivity;
1516

1617
use crate::Container;
1718
use crate::dataflow::{StreamCore, Scope};
@@ -60,7 +61,7 @@ pub struct OperatorBuilder<G: Scope> {
6061
global: usize,
6162
address: Rc<[usize]>, // path to the operator (ending with index).
6263
shape: OperatorShape,
63-
summary: Vec<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>,
64+
summary: Connectivity<<G::Timestamp as Timestamp>::Summary>,
6465
}
6566

6667
impl<G: Scope> OperatorBuilder<G> {
@@ -188,7 +189,7 @@ where
188189
logic: L,
189190
shared_progress: Rc<RefCell<SharedProgress<T>>>,
190191
activations: Rc<RefCell<Activations>>,
191-
summary: Vec<Vec<Antichain<T::Summary>>>,
192+
summary: Connectivity<T::Summary>,
192193
}
193194

194195
impl<T, L> Schedule for OperatorCore<T, L>
@@ -213,7 +214,7 @@ where
213214
fn outputs(&self) -> usize { self.shape.outputs }
214215

215216
// announce internal topology as fully connected, and hold all default capabilities.
216-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
217+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
217218

218219
// Request the operator to be scheduled at least once.
219220
self.activations.borrow_mut().activate(&self.address[..]);

timely/src/logging.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
2222
use crate::Container;
2323
use crate::container::CapacityContainerBuilder;
2424
use crate::dataflow::operators::capture::{Event, EventPusher};
25+
use crate::progress::operate::Connectivity;
2526

2627
/// Logs events as a timely stream, with progress statements.
2728
pub struct BatchLogger<P, C> where P: EventPusher<Duration, C> {
@@ -76,7 +77,7 @@ pub struct OperatesSummaryEvent<TS> {
7677
/// Worker-unique identifier for the operator.
7778
pub id: usize,
7879
/// Timestamp action summaries for (input, output) pairs.
79-
pub summary: Vec<Vec<crate::progress::Antichain<TS>>>,
80+
pub summary: Connectivity<TS>,
8081
}
8182

8283
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]

timely/src/progress/operate.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub trait Operate<T: Timestamp> : Schedule {
4444
///
4545
/// The default behavior is to indicate that timestamps on any input can emerge unchanged on
4646
/// any output, and no initial capabilities are held.
47-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>);
47+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>);
4848

4949
/// Signals that external frontiers have been set.
5050
///
@@ -58,6 +58,9 @@ pub trait Operate<T: Timestamp> : Schedule {
5858
fn notify_me(&self) -> bool { true }
5959
}
6060

61+
/// Operator internal connectivity, from inputs to outputs.
62+
pub type Connectivity<TS> = Vec<Vec<Antichain<TS>>>;
63+
6164
/// Progress information shared between parent and child.
6265
#[derive(Debug)]
6366
pub struct SharedProgress<T: Timestamp> {

timely/src/progress/reachability.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ use crate::progress::Timestamp;
7979
use crate::progress::{Source, Target};
8080
use crate::progress::ChangeBatch;
8181
use crate::progress::{Location, Port};
82-
82+
use crate::progress::operate::Connectivity;
8383
use crate::progress::frontier::{Antichain, MutableAntichain};
8484
use crate::progress::timestamp::PathSummary;
8585

@@ -132,7 +132,7 @@ pub struct Builder<T: Timestamp> {
132132
/// Indexed by operator index, then input port, then output port. This is the
133133
/// same format returned by `get_internal_summary`, as if we simply appended
134134
/// all of the summaries for the hosted nodes.
135-
pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
135+
pub nodes: Vec<Connectivity<T::Summary>>,
136136
/// Direct connections from sources to targets.
137137
///
138138
/// Edges do not affect timestamps, so we only need to know the connectivity.
@@ -156,7 +156,7 @@ impl<T: Timestamp> Builder<T> {
156156
/// Add links internal to operators.
157157
///
158158
/// This method overwrites any existing summary, instead of anything more sophisticated.
159-
pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {
159+
pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Connectivity<T::Summary>) {
160160

161161
// Assert that all summaries exist.
162162
debug_assert_eq!(inputs, summary.len());
@@ -195,7 +195,7 @@ impl<T: Timestamp> Builder<T> {
195195
/// default summaries (a serious liveness issue).
196196
///
197197
/// The optional logger information is baked into the resulting tracker.
198-
pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>) {
198+
pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Connectivity<T::Summary>) {
199199

200200
if !self.is_acyclic() {
201201
println!("Cycle detected without timestamp increment");
@@ -361,7 +361,7 @@ pub struct Tracker<T:Timestamp> {
361361
/// Indexed by operator index, then input port, then output port. This is the
362362
/// same format returned by `get_internal_summary`, as if we simply appended
363363
/// all of the summaries for the hosted nodes.
364-
nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
364+
nodes: Vec<Connectivity<T::Summary>>,
365365
/// Direct connections from sources to targets.
366366
///
367367
/// Edges do not affect timestamps, so we only need to know the connectivity.
@@ -503,7 +503,7 @@ impl<T:Timestamp> Tracker<T> {
503503
/// output port.
504504
///
505505
/// If the optional logger is provided, it will be used to log various tracker events.
506-
pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Vec<Vec<Antichain<T::Summary>>>) {
506+
pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Connectivity<T::Summary>) {
507507

508508
// Allocate buffer space for each input and input port.
509509
let mut per_operator =
@@ -732,7 +732,7 @@ impl<T:Timestamp> Tracker<T> {
732732
/// Graph locations may be missing from the output, in which case they have no
733733
/// paths to scope outputs.
734734
fn summarize_outputs<T: Timestamp>(
735-
nodes: &[Vec<Vec<Antichain<T::Summary>>>],
735+
nodes: &[Connectivity<T::Summary>],
736736
edges: &[Vec<Vec<Target>>],
737737
) -> HashMap<Location, Vec<Antichain<T::Summary>>>
738738
{

timely/src/progress/subgraph.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::scheduling::activate::Activations;
1919
use crate::progress::frontier::{Antichain, MutableAntichain, MutableAntichainFilter};
2020
use crate::progress::{Timestamp, Operate, operate::SharedProgress};
2121
use crate::progress::{Location, Port, Source, Target};
22-
22+
use crate::progress::operate::Connectivity;
2323
use crate::progress::ChangeBatch;
2424
use crate::progress::broadcast::Progcaster;
2525
use crate::progress::reachability;
@@ -270,7 +270,7 @@ where
270270
progcaster: Progcaster<TInner>,
271271

272272
shared_progress: Rc<RefCell<SharedProgress<TOuter>>>,
273-
scope_summary: Vec<Vec<Antichain<TInner::Summary>>>,
273+
scope_summary: Connectivity<TInner::Summary>,
274274

275275
progress_mode: ProgressMode,
276276
}
@@ -546,7 +546,7 @@ where
546546

547547
// produces connectivity summaries from inputs to outputs, and reports initial internal
548548
// capabilities on each of the outputs (projecting capabilities from contained scopes).
549-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<TOuter::Summary>>>, Rc<RefCell<SharedProgress<TOuter>>>) {
549+
fn get_internal_summary(&mut self) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>) {
550550

551551
// double-check that child 0 (the outside world) is correctly shaped.
552552
assert_eq!(self.children[0].outputs, self.inputs());
@@ -614,7 +614,7 @@ struct PerOperatorState<T: Timestamp> {
614614

615615
shared_progress: Rc<RefCell<SharedProgress<T>>>,
616616

617-
internal_summary: Vec<Vec<Antichain<T::Summary>>>, // cached result from get_internal_summary.
617+
internal_summary: Connectivity<T::Summary>, // cached result from get_internal_summary.
618618

619619
logging: Option<Logger>,
620620
}

0 commit comments

Comments
 (0)