Skip to content

Commit be56290

Browse files
Merge pull request #678 from antiguru/notify_less
Disable notifications for some operators
2 parents c453fe8 + c1593e6 commit be56290

File tree

5 files changed

+6
-0
lines changed

5 files changed

+6
-0
lines changed

timely/src/dataflow/operators/branch.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
4040
condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
4141
) -> (Stream<S, D>, Stream<S, D>) {
4242
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
43+
builder.set_notify(false);
4344

4445
let mut input = builder.new_input(self, Pipeline);
4546
let (mut output1, stream1) = builder.new_output();
@@ -95,6 +96,7 @@ pub trait BranchWhen<T>: Sized {
9596
impl<S: Scope, C: Container + Data> BranchWhen<S::Timestamp> for StreamCore<S, C> {
9697
fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
9798
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
99+
builder.set_notify(false);
98100

99101
let mut input = builder.new_input(self, Pipeline);
100102
let (mut output1, stream1) = builder.new_output();

timely/src/dataflow/operators/core/concat.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ impl<G: Scope, C: Container + Data> Concatenate<G, C> for G {
7171
// create an operator builder.
7272
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7373
let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
74+
builder.set_notify(false);
7475

7576
// create new input handles for each input stream.
7677
let mut handles = sources.into_iter().map(|s| builder.new_input(&s, Pipeline)).collect::<Vec<_>>();

timely/src/dataflow/operators/core/feedback.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ impl<G: Scope> Feedback<G> for G {
7272
fn feedback<C: Container + Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>) {
7373

7474
let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
75+
builder.set_notify(false);
7576
let (output, stream) = builder.new_output();
7677

7778
(Handle { builder, summary, output }, stream)

timely/src/dataflow/operators/core/ok_err.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ impl<S: Scope, C: Container + Data> OkErr<S, C> for StreamCore<S, C> {
5050
L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
5151
{
5252
let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
53+
builder.set_notify(false);
5354

5455
let mut input = builder.new_input(self, Pipeline);
5556
let (mut output1, stream1) = builder.new_output();

timely/src/dataflow/operators/core/partition.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
4242
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
4343
{
4444
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
45+
builder.set_notify(false);
4546

4647
let mut input = builder.new_input(self, Pipeline);
4748
let mut outputs = Vec::with_capacity(parts as usize);

0 commit comments

Comments
 (0)