Skip to content

Commit c0b643e

Browse files
committed
Introduce Connectivity and PortConnectivity type aliases
1 parent 94620a8 commit c0b643e

File tree

10 files changed

+39
-34
lines changed

10 files changed

+39
-34
lines changed

timely/src/dataflow/operators/capability.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use std::cell::RefCell;
2727
use std::fmt::{self, Debug};
2828

2929
use crate::order::PartialOrder;
30-
use crate::progress::Antichain;
3130
use crate::progress::Timestamp;
3231
use crate::progress::ChangeBatch;
32+
use crate::progress::operate::PortConnectivity;
3333
use crate::scheduling::Activations;
3434
use crate::dataflow::channels::pullers::counter::ConsumedGuard;
3535

@@ -238,7 +238,7 @@ pub struct InputCapability<T: Timestamp> {
238238
/// Output capability buffers, for use in minting capabilities.
239239
internal: CapabilityUpdates<T>,
240240
/// Timestamp summaries for each output.
241-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
241+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
242242
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
243243
consumed_guard: ConsumedGuard<T>,
244244
}
@@ -257,7 +257,7 @@ impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
257257
impl<T: Timestamp> InputCapability<T> {
258258
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
259259
/// the provided [`ChangeBatch`].
260-
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, guard: ConsumedGuard<T>) -> Self {
260+
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
261261
InputCapability {
262262
internal,
263263
summaries,

timely/src/dataflow/operators/core/input.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
77

88
use crate::scheduling::{Schedule, Activator};
99

10-
use crate::progress::frontier::Antichain;
1110
use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
1211
use crate::progress::Source;
13-
12+
use crate::progress::operate::Connectivity;
1413
use crate::{Container, Data};
1514
use crate::communication::Push;
1615
use crate::dataflow::{Scope, ScopeParent, StreamCore};
@@ -205,7 +204,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
205204
fn inputs(&self) -> usize { 0 }
206205
fn outputs(&self) -> usize { 1 }
207206

208-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
207+
fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
209208
self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
210209
(Vec::new(), Rc::clone(&self.shared_progress))
211210
}

timely/src/dataflow/operators/core/unordered_input.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder};
77

88
use crate::scheduling::{Schedule, ActivateOnDrop};
99

10-
use crate::progress::frontier::Antichain;
1110
use crate::progress::{Operate, operate::SharedProgress, Timestamp};
1211
use crate::progress::Source;
1312
use crate::progress::ChangeBatch;
14-
13+
use crate::progress::operate::Connectivity;
1514
use crate::dataflow::channels::pushers::{Counter, Tee};
1615
use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession};
1716

@@ -134,7 +133,7 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
134133
fn inputs(&self) -> usize { 0 }
135134
fn outputs(&self) -> usize { 1 }
136135

137-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
136+
fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
138137
let mut borrow = self.internal.borrow_mut();
139138
for (time, count) in borrow.drain() {
140139
self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations};
1212

1313
use crate::progress::{Source, Target};
1414
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15-
15+
use crate::progress::operate::Connectivity;
1616
use crate::Container;
1717
use crate::dataflow::{StreamCore, Scope};
1818
use crate::dataflow::channels::pushers::Tee;
@@ -60,7 +60,7 @@ pub struct OperatorBuilder<G: Scope> {
6060
global: usize,
6161
address: Rc<[usize]>, // path to the operator (ending with index).
6262
shape: OperatorShape,
63-
summary: Vec<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>,
63+
summary: Connectivity<<G::Timestamp as Timestamp>::Summary>,
6464
}
6565

6666
impl<G: Scope> OperatorBuilder<G> {
@@ -188,7 +188,7 @@ where
188188
logic: L,
189189
shared_progress: Rc<RefCell<SharedProgress<T>>>,
190190
activations: Rc<RefCell<Activations>>,
191-
summary: Vec<Vec<Antichain<T::Summary>>>,
191+
summary: Connectivity<T::Summary>,
192192
}
193193

194194
impl<T, L> Schedule for OperatorCore<T, L>
@@ -213,7 +213,7 @@ where
213213
fn outputs(&self) -> usize { self.shape.outputs }
214214

215215
// announce internal topology as fully connected, and hold all default capabilities.
216-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
216+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {
217217

218218
// Request the operator to be scheduled at least once.
219219
self.activations.borrow_mut().activate(&self.address[..]);

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::dataflow::operators::capability::Capability;
2020
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
2121
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
2222
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
23-
23+
use crate::progress::operate::PortConnectivity;
2424
use crate::logging::TimelyLogger as Logger;
2525

2626
use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
@@ -33,7 +33,7 @@ pub struct OperatorBuilder<G: Scope> {
3333
consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
3434
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
3535
/// For each input, a shared list of summaries to each output.
36-
summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
36+
summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
3737
produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
3838
logging: Option<Logger>,
3939
}

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
use std::rc::Rc;
77
use std::cell::RefCell;
88

9-
use crate::progress::Antichain;
109
use crate::progress::Timestamp;
1110
use crate::progress::ChangeBatch;
1211
use crate::progress::frontier::MutableAntichain;
12+
use crate::progress::operate::PortConnectivity;
1313
use crate::dataflow::channels::pullers::Counter as PullCounter;
1414
use crate::dataflow::channels::pushers::Counter as PushCounter;
1515
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
@@ -30,7 +30,7 @@ pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
3030
///
3131
/// Each timestamp received through this input may only produce output timestamps
3232
/// greater or equal to the input timestamp subjected to at least one of these summaries.
33-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
33+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
3434
logging: Option<Logger>,
3535
}
3636

@@ -149,7 +149,7 @@ pub fn _access_pull_counter<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
149149
pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
150150
pull_counter: PullCounter<T, C, P>,
151151
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
152-
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
152+
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
153153
logging: Option<Logger>
154154
) -> InputHandleCore<T, C, P> {
155155
InputHandleCore {

timely/src/logging.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
2222
use crate::Container;
2323
use crate::container::CapacityContainerBuilder;
2424
use crate::dataflow::operators::capture::{Event, EventPusher};
25+
use crate::progress::operate::Connectivity;
2526

2627
/// Logs events as a timely stream, with progress statements.
2728
pub struct BatchLogger<P, C> where P: EventPusher<Duration, C> {
@@ -76,7 +77,7 @@ pub struct OperatesSummaryEvent<TS> {
7677
/// Worker-unique identifier for the operator.
7778
pub id: usize,
7879
/// Timestamp action summaries for (input, output) pairs.
79-
pub summary: Vec<Vec<crate::progress::Antichain<TS>>>,
80+
pub summary: Connectivity<TS>,
8081
}
8182

8283
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]

timely/src/progress/operate.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ pub trait Operate<T: Timestamp> : Schedule {
4444
///
4545
/// The default behavior is to indicate that timestamps on any input can emerge unchanged on
4646
/// any output, and no initial capabilities are held.
47-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>);
47+
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>);
4848

4949
/// Signals that external frontiers have been set.
5050
///
@@ -58,6 +58,12 @@ pub trait Operate<T: Timestamp> : Schedule {
5858
fn notify_me(&self) -> bool { true }
5959
}
6060

61+
/// Operator internal connectivity, from inputs to outputs.
62+
pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;
63+
/// Internal connectivity from one port to any number of opposing ports.
64+
pub type PortConnectivity<TS> = Vec<Antichain<TS>>;
65+
66+
6167
/// Progress information shared between parent and child.
6268
#[derive(Debug)]
6369
pub struct SharedProgress<T: Timestamp> {

timely/src/progress/reachability.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ use crate::progress::Timestamp;
7979
use crate::progress::{Source, Target};
8080
use crate::progress::ChangeBatch;
8181
use crate::progress::{Location, Port};
82-
82+
use crate::progress::operate::{Connectivity, PortConnectivity};
8383
use crate::progress::frontier::{Antichain, MutableAntichain};
8484
use crate::progress::timestamp::PathSummary;
8585

@@ -132,7 +132,7 @@ pub struct Builder<T: Timestamp> {
132132
/// Indexed by operator index, then input port, then output port. This is the
133133
/// same format returned by `get_internal_summary`, as if we simply appended
134134
/// all of the summaries for the hosted nodes.
135-
pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
135+
pub nodes: Vec<Connectivity<T::Summary>>,
136136
/// Direct connections from sources to targets.
137137
///
138138
/// Edges do not affect timestamps, so we only need to know the connectivity.
@@ -156,7 +156,7 @@ impl<T: Timestamp> Builder<T> {
156156
/// Add links internal to operators.
157157
///
158158
/// This method overwrites any existing summary, instead of anything more sophisticated.
159-
pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {
159+
pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Connectivity<T::Summary>) {
160160

161161
// Assert that all summaries exist.
162162
debug_assert_eq!(inputs, summary.len());
@@ -195,7 +195,7 @@ impl<T: Timestamp> Builder<T> {
195195
/// default summaries (a serious liveness issue).
196196
///
197197
/// The optional logger information is baked into the resulting tracker.
198-
pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>) {
198+
pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Connectivity<T::Summary>) {
199199

200200
if !self.is_acyclic() {
201201
println!("Cycle detected without timestamp increment");
@@ -361,7 +361,7 @@ pub struct Tracker<T:Timestamp> {
361361
/// Indexed by operator index, then input port, then output port. This is the
362362
/// same format returned by `get_internal_summary`, as if we simply appended
363363
/// all of the summaries for the hosted nodes.
364-
nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
364+
nodes: Vec<Connectivity<T::Summary>>,
365365
/// Direct connections from sources to targets.
366366
///
367367
/// Edges do not affect timestamps, so we only need to know the connectivity.
@@ -433,7 +433,7 @@ pub struct PortInformation<T: Timestamp> {
433433
/// Current implications of active pointstamps across the dataflow.
434434
pub implications: MutableAntichain<T>,
435435
/// Path summaries to each of the scope outputs.
436-
pub output_summaries: Vec<Antichain<T::Summary>>,
436+
pub output_summaries: PortConnectivity<T::Summary>,
437437
}
438438

439439
impl<T: Timestamp> PortInformation<T> {
@@ -503,7 +503,7 @@ impl<T:Timestamp> Tracker<T> {
503503
/// output port.
504504
///
505505
/// If the optional logger is provided, it will be used to log various tracker events.
506-
pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Vec<Vec<Antichain<T::Summary>>>) {
506+
pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Connectivity<T::Summary>) {
507507

508508
// Allocate buffer space for each input and input port.
509509
let mut per_operator =
@@ -732,9 +732,9 @@ impl<T:Timestamp> Tracker<T> {
732732
/// Graph locations may be missing from the output, in which case they have no
733733
/// paths to scope outputs.
734734
fn summarize_outputs<T: Timestamp>(
735-
nodes: &[Vec<Vec<Antichain<T::Summary>>>],
735+
nodes: &[Connectivity<T::Summary>],
736736
edges: &[Vec<Vec<Target>>],
737-
) -> HashMap<Location, Vec<Antichain<T::Summary>>>
737+
) -> HashMap<Location, PortConnectivity<T::Summary>>
738738
{
739739
// A reverse edge map, to allow us to walk back up the dataflow graph.
740740
let mut reverse = HashMap::new();
@@ -749,7 +749,7 @@ fn summarize_outputs<T: Timestamp>(
749749
}
750750
}
751751

752-
let mut results: HashMap<Location, Vec<Antichain<T::Summary>>> = HashMap::new();
752+
let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
753753
let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
754754

755755
let outputs =

timely/src/progress/subgraph.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::scheduling::activate::Activations;
1919
use crate::progress::frontier::{Antichain, MutableAntichain, MutableAntichainFilter};
2020
use crate::progress::{Timestamp, Operate, operate::SharedProgress};
2121
use crate::progress::{Location, Port, Source, Target};
22-
22+
use crate::progress::operate::Connectivity;
2323
use crate::progress::ChangeBatch;
2424
use crate::progress::broadcast::Progcaster;
2525
use crate::progress::reachability;
@@ -270,7 +270,7 @@ where
270270
progcaster: Progcaster<TInner>,
271271

272272
shared_progress: Rc<RefCell<SharedProgress<TOuter>>>,
273-
scope_summary: Vec<Vec<Antichain<TInner::Summary>>>,
273+
scope_summary: Connectivity<TInner::Summary>,
274274

275275
progress_mode: ProgressMode,
276276
}
@@ -546,7 +546,7 @@ where
546546

547547
// produces connectivity summaries from inputs to outputs, and reports initial internal
548548
// capabilities on each of the outputs (projecting capabilities from contained scopes).
549-
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<TOuter::Summary>>>, Rc<RefCell<SharedProgress<TOuter>>>) {
549+
fn get_internal_summary(&mut self) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>) {
550550

551551
// double-check that child 0 (the outside world) is correctly shaped.
552552
assert_eq!(self.children[0].outputs, self.inputs());
@@ -614,7 +614,7 @@ struct PerOperatorState<T: Timestamp> {
614614

615615
shared_progress: Rc<RefCell<SharedProgress<T>>>,
616616

617-
internal_summary: Vec<Vec<Antichain<T::Summary>>>, // cached result from get_internal_summary.
617+
internal_summary: Connectivity<T::Summary>, // cached result from get_internal_summary.
618618

619619
logging: Option<Logger>,
620620
}

0 commit comments

Comments
 (0)