Skip to content
42 changes: 27 additions & 15 deletions timely/src/dataflow/operators/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use std::cell::RefCell;
use std::fmt::{self, Debug};

use crate::order::PartialOrder;
use crate::progress::Antichain;
use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::progress::operate::PortConnectivity;
use crate::scheduling::Activations;
use crate::dataflow::channels::pullers::counter::ConsumedGuard;

Expand Down Expand Up @@ -238,26 +238,28 @@ pub struct InputCapability<T: Timestamp> {
/// Output capability buffers, for use in minting capabilities.
internal: CapabilityUpdates<T>,
/// Timestamp summaries for each output.
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
/// A drop guard that updates the consumed capability this InputCapability refers to on drop
consumed_guard: ConsumedGuard<T>,
}

impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
fn time(&self) -> &T { self.time() }
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
let borrow = self.summaries.borrow();
self.internal.borrow().iter().enumerate().any(|(index, rc)| {
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default()
})
let summaries_borrow = self.summaries.borrow();
let internal_borrow = self.internal.borrow();
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
let result = summaries_borrow.iter_ports().any(|(port, path)| {
Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default()
});
result
}
}

impl<T: Timestamp> InputCapability<T> {
/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
/// the provided [`ChangeBatch`].
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, guard: ConsumedGuard<T>) -> Self {
pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
InputCapability {
internal,
summaries,
Expand All @@ -281,10 +283,15 @@ impl<T: Timestamp> InputCapability<T> {
/// Delays capability for a specific output port.
pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
use crate::progress::timestamp::PathSummary;
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
} else {
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time());
if let Some(path) = self.summaries.borrow().get(output_port) {
if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
} else {
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
}
}
else {
panic!("Attempted to delay a capability for a disconnected output");
}
}

Expand All @@ -305,11 +312,16 @@ impl<T: Timestamp> InputCapability<T> {
pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
use crate::progress::timestamp::PathSummary;
let self_time = self.time().clone();
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
if let Some(path) = self.summaries.borrow().get(output_port) {
if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
}
else {
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time);
}
}
else {
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time);
panic!("Attempted to retain a capability for a disconnected output");
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/feedback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C> {
let summary = handle.summary;
let mut output = handle.output;

let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);
let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);

builder.build(move |_capability| move |_frontier| {
let mut output = output.activate();
Expand Down
5 changes: 2 additions & 3 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};

use crate::scheduling::{Schedule, Activator};

use crate::progress::frontier::Antichain;
use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
use crate::progress::Source;

use crate::progress::operate::Connectivity;
use crate::{Container, Data};
use crate::communication::Push;
use crate::dataflow::{Scope, ScopeParent, StreamCore};
Expand Down Expand Up @@ -205,7 +204,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
fn inputs(&self) -> usize { 0 }
fn outputs(&self) -> usize { 1 }

fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
(Vec::new(), Rc::clone(&self.shared_progress))
}
Expand Down
5 changes: 2 additions & 3 deletions timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use crate::container::{ContainerBuilder, CapacityContainerBuilder};

use crate::scheduling::{Schedule, ActivateOnDrop};

use crate::progress::frontier::Antichain;
use crate::progress::{Operate, operate::SharedProgress, Timestamp};
use crate::progress::Source;
use crate::progress::ChangeBatch;

use crate::progress::operate::Connectivity;
use crate::dataflow::channels::pushers::{Counter, Tee};
use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession};

Expand Down Expand Up @@ -134,7 +133,7 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
fn inputs(&self) -> usize { 0 }
fn outputs(&self) -> usize { 1 }

fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
let mut borrow = self.internal.borrow_mut();
for (time, count) in borrow.drain() {
self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
Expand Down
45 changes: 25 additions & 20 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations};

use crate::progress::{Source, Target};
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};

use crate::progress::operate::{Connectivity, PortConnectivity};
use crate::Container;
use crate::dataflow::{StreamCore, Scope};
use crate::dataflow::channels::pushers::Tee;
Expand Down Expand Up @@ -60,7 +60,7 @@ pub struct OperatorBuilder<G: Scope> {
global: usize,
address: Rc<[usize]>, // path to the operator (ending with index).
shape: OperatorShape,
summary: Vec<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>,
summary: Connectivity<<G::Timestamp as Timestamp>::Summary>,
}

impl<G: Scope> OperatorBuilder<G> {
Expand Down Expand Up @@ -105,48 +105,53 @@ impl<G: Scope> OperatorBuilder<G> {

/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> P::Puller
where
P: ParallelizationContract<G::Timestamp, C> {
let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs];
where
P: ParallelizationContract<G::Timestamp, C>
{
let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default())));
self.new_input_connection(stream, pact, connection)
}

/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> P::Puller
pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> P::Puller
where
P: ParallelizationContract<G::Timestamp, C> {

P: ParallelizationContract<G::Timestamp, C>,
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
{
let channel_id = self.scope.new_identifier();
let logging = self.scope.logging();
let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging);
let target = Target::new(self.index, self.shape.inputs);
stream.connect_to(target, sender, channel_id);

self.shape.inputs += 1;
assert_eq!(self.shape.outputs, connection.len());
self.summary.push(connection);
let connectivity: PortConnectivity<_> = connection.into_iter().collect();
assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
self.summary.push(connectivity);

receiver
}

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
pub fn new_output<C: Container>(&mut self) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {

let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs];
let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
self.new_output_connection(connection)
}

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
pub fn new_output_connection<C: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {

pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (Tee<G::Timestamp, C>, StreamCore<G, C>)
where
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
{
let new_output = self.shape.outputs;
self.shape.outputs += 1;
let (targets, registrar) = Tee::<G::Timestamp,C>::new();
let source = Source::new(self.index, self.shape.outputs);
let source = Source::new(self.index, new_output);
let stream = StreamCore::new(source, registrar, self.scope.clone());

self.shape.outputs += 1;
assert_eq!(self.shape.inputs, connection.len());
for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) {
summary.push(entry);
for (input, entry) in connection {
self.summary[input].add_port(new_output, entry);
}

(targets, stream)
Expand Down Expand Up @@ -188,7 +193,7 @@ where
logic: L,
shared_progress: Rc<RefCell<SharedProgress<T>>>,
activations: Rc<RefCell<Activations>>,
summary: Vec<Vec<Antichain<T::Summary>>>,
summary: Connectivity<T::Summary>,
}

impl<T, L> Schedule for OperatorCore<T, L>
Expand All @@ -213,7 +218,7 @@ where
fn outputs(&self) -> usize { self.shape.outputs }

// announce internal topology as fully connected, and hold all default capabilities.
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>) {

// Request the operator to be scheduled at least once.
self.activations.borrow_mut().activate(&self.address[..]);
Expand Down
35 changes: 18 additions & 17 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::dataflow::operators::capability::Capability;
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
use crate::dataflow::operators::generic::builder_raw::OperatorShape;

use crate::progress::operate::PortConnectivity;
use crate::logging::TimelyLogger as Logger;

use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
Expand All @@ -33,7 +33,7 @@ pub struct OperatorBuilder<G: Scope> {
consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
/// For each input, a shared list of summaries to each output.
summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
summaries: Vec<Rc<RefCell<PortConnectivity<<G::Timestamp as Timestamp>::Summary>>>>,
produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
logging: Option<Logger>,
}
Expand Down Expand Up @@ -64,7 +64,7 @@ impl<G: Scope> OperatorBuilder<G> {
where
P: ParallelizationContract<G::Timestamp, C> {

let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect();
let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
self.new_input_connection(stream, pact, connection)
}

Expand All @@ -76,25 +76,26 @@ impl<G: Scope> OperatorBuilder<G> {
///
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
/// antichain indicating that there is no connection from the input to the output.
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandleCore<G::Timestamp, C, P::Puller>
where
P: ParallelizationContract<G::Timestamp, C> {

pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> InputHandleCore<G::Timestamp, C, P::Puller>
where
P: ParallelizationContract<G::Timestamp, C>,
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
{
let puller = self.builder.new_input_connection(stream, pact, connection.clone());

let input = PullCounter::new(puller);
self.frontier.push(MutableAntichain::new());
self.consumed.push(Rc::clone(input.consumed()));

let shared_summary = Rc::new(RefCell::new(connection));
let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
self.summaries.push(Rc::clone(&shared_summary));

new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone())
}

/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect();
let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
self.new_output_connection(connection)
}

Expand All @@ -106,14 +107,14 @@ impl<G: Scope> OperatorBuilder<G> {
///
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
/// antichain indicating that there is no connection from the input to the output.
pub fn new_output_connection<CB: ContainerBuilder>(
&mut self,
connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>
) -> (
pub fn new_output_connection<CB: ContainerBuilder, I>(&mut self, connection: I) -> (
OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
StreamCore<G, CB::Container>
) {

)
where
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
{
let new_output = self.shape().outputs();
let (tee, stream) = self.builder.new_output_connection(connection.clone());

let internal = Rc::new(RefCell::new(ChangeBatch::new()));
Expand All @@ -122,8 +123,8 @@ impl<G: Scope> OperatorBuilder<G> {
let mut buffer = PushBuffer::new(PushCounter::new(tee));
self.produced.push(Rc::clone(buffer.inner().produced()));

for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
summary.borrow_mut().push(connection.clone());
for (input, entry) in connection {
self.summaries[input].borrow_mut().add_port(new_output, entry);
}

(OutputWrapper::new(buffer, internal), stream)
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/generic/handles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
use std::rc::Rc;
use std::cell::RefCell;

use crate::progress::Antichain;
use crate::progress::Timestamp;
use crate::progress::ChangeBatch;
use crate::progress::frontier::MutableAntichain;
use crate::progress::operate::PortConnectivity;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
Expand All @@ -30,7 +30,7 @@ pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
///
/// Each timestamp received through this input may only produce output timestamps
/// greater or equal to the input timestamp subjected to at least one of these summaries.
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
logging: Option<Logger>,
}

Expand Down Expand Up @@ -149,7 +149,7 @@ pub fn _access_pull_counter<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
pull_counter: PullCounter<T, C, P>,
internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
logging: Option<Logger>
) -> InputHandleCore<T, C, P> {
InputHandleCore {
Expand Down
3 changes: 2 additions & 1 deletion timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
use crate::Container;
use crate::container::CapacityContainerBuilder;
use crate::dataflow::operators::capture::{Event, EventPusher};
use crate::progress::operate::Connectivity;

/// Logs events as a timely stream, with progress statements.
pub struct BatchLogger<P, C> where P: EventPusher<Duration, C> {
Expand Down Expand Up @@ -76,7 +77,7 @@ pub struct OperatesSummaryEvent<TS> {
/// Worker-unique identifier for the operator.
pub id: usize,
/// Timestamp action summaries for (input, output) pairs.
pub summary: Vec<Vec<crate::progress::Antichain<TS>>>,
pub summary: Connectivity<TS>,
}

#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
Expand Down
Loading