diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index c3865cb2b..924bd196a 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -40,6 +40,7 @@ impl Branch for Stream { condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, ) -> (Stream, Stream) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); + builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); let (mut output1, stream1) = builder.new_output(); @@ -95,6 +96,7 @@ pub trait BranchWhen: Sized { impl BranchWhen for StreamCore { fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); + builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); let (mut output1, stream1) = builder.new_output(); diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index 89848f689..24fcef532 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -71,6 +71,7 @@ impl Concatenate for G { // create an operator builder. use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone()); + builder.set_notify(false); // create new input handles for each input stream. let mut handles = sources.into_iter().map(|s| builder.new_input(&s, Pipeline)).collect::>(); diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index 8fdd4f708..61bd5c196 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -72,6 +72,7 @@ impl Feedback for G { fn feedback(&mut self, summary: ::Summary) -> (Handle, StreamCore) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); + builder.set_notify(false); let (output, stream) = builder.new_output(); (Handle { builder, summary, output }, stream) diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index fd7887053..77fd1f527 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -50,6 +50,7 @@ impl OkErr for StreamCore { L: FnMut(C::Item<'_>) -> Result+'static { let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope()); + builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); let (mut output1, stream1) = builder.new_output(); diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index b8c7b7415..754e9182c 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ -42,6 +42,7 @@ impl Partition for StreamCore { F: FnMut(C::Item<'_>) -> (u64, D2) + 'static, { let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope()); + builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); let mut outputs = Vec::with_capacity(parts as usize);