Skip to content

Commit 3f78d6b

Browse files
committed
Remove WithProgress::is_empty
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 0cdf9de commit 3f78d6b

File tree

5 files changed

+13
-30
lines changed

5 files changed

+13
-30
lines changed

container/src/lib.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::collections::VecDeque;
66

77
/// A type representing progress, with an update count.
88
///
9-
/// It describes its update count (`count()`) and whether it is empty (`is_empty()`).
9+
/// It describes its update count (`count()`).
1010
///
1111
/// We require [`Default`] for convenience purposes.
1212
pub trait WithProgress: Default {
@@ -17,12 +17,6 @@ pub trait WithProgress: Default {
1717
/// imposed on the [`CountPreservingContainerBuilder`] trait, whose implementors
1818
/// must preserve the number of items.
1919
fn count(&self) -> usize;
20-
21-
/// Determine if the container contains any elements, corresponding to `count() == 0`.
22-
#[inline(always)]
23-
fn is_empty(&self) -> bool {
24-
self.count() == 0
25-
}
2620
}
2721

2822
/// A container that can reveal its contents through iterating by reference and draining.
@@ -90,7 +84,7 @@ pub trait PushInto<T> {
9084
/// decide to represent a push order for `extract` and `finish`, or not.
9185
pub trait ContainerBuilder: Default + 'static {
9286
/// The container type we're building.
93-
type Container: WithProgress + Default + Clone + 'static;
87+
type Container: WithProgress + Clone + 'static;
9488
/// Extract assembled containers, potentially leaving unfinished data behind. Can
9589
/// be called repeatedly, for example while the caller can send data.
9690
///
@@ -204,7 +198,7 @@ impl<C: WithProgress + Clone + 'static> ContainerBuilder for CapacityContainerBu
204198

205199
#[inline]
206200
fn finish(&mut self) -> Option<&mut C> {
207-
if !self.current.is_empty() {
201+
if self.current.count() > 0 {
208202
self.pending.push_back(std::mem::take(&mut self.current));
209203
}
210204
self.empty = self.pending.pop_front();
@@ -337,15 +331,12 @@ pub mod buffer {
337331

338332
impl<T> WithProgress for Vec<T> {
339333
#[inline(always)] fn count(&self) -> usize { self.len() }
340-
#[inline(always)] fn is_empty(&self) -> bool { Vec::is_empty(self) }
341334
}
342335

343336
impl<T: WithProgress> WithProgress for std::rc::Rc<T> {
344337
#[inline(always)] fn count(&self) -> usize { self.as_ref().count() }
345-
#[inline(always)] fn is_empty(&self) -> bool { self.as_ref().is_empty() }
346338
}
347339

348340
impl<T: WithProgress> WithProgress for std::sync::Arc<T> {
349341
#[inline(always)] fn count(&self) -> usize { self.as_ref().count() }
350-
#[inline(always)] fn is_empty(&self) -> bool { self.as_ref().is_empty() }
351342
}

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! with the performance of batched sends.
33
44
use crate::communication::Push;
5-
use crate::container::{ContainerBuilder, CapacityContainerBuilder, WithProgress, PushInto, SizableContainer};
5+
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto, SizableContainer};
66
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;
@@ -109,11 +109,9 @@ impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB,
109109
// buffer always requires a container builder. We could expose the buffer's underlying pusher
110110
// directly, but this would bypass the buffer's time tracking.
111111
fn give_container(&mut self, container: &mut CB::Container) {
112-
if !container.is_empty() {
113-
self.flush();
114-
let time = self.time.as_ref().unwrap().clone();
115-
Message::push_at(container, time, &mut self.pusher);
116-
}
112+
self.flush();
113+
let time = self.time.as_ref().unwrap().clone();
114+
Message::push_at(container, time, &mut self.pusher);
117115
}
118116

119117
/// An internal implementation of push that should only be called by sessions.

timely/src/dataflow/operators/core/capture/extract.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,7 @@ where
7474
for datum in to_sort.into_iter() {
7575
sorted.push_into(datum);
7676
}
77-
if !sorted.is_empty() {
78-
result.push((time, sorted));
79-
}
77+
result.push((time, sorted));
8078
}
8179
result
8280
}

timely/src/dataflow/operators/core/filter.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ where
3030
fn filter<P: FnMut(&C::Item<'_>)->bool+'static>(&self, mut predicate: P) -> StreamCore<G, C> {
3131
self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
3232
input.for_each(|time, data| {
33-
if !data.is_empty() {
34-
output.session(&time).give_iterator(data.drain().filter(&mut predicate));
35-
}
33+
output.session(&time).give_iterator(data.drain().filter(&mut predicate));
3634
});
3735
})
3836
}

timely/src/dataflow/operators/core/input.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::rc::Rc;
44
use std::cell::RefCell;
55

6-
use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, WithProgress, PushInto};
6+
use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, PushInto};
77

88
use crate::scheduling::{Schedule, Activator};
99

@@ -439,11 +439,9 @@ impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
439439
/// });
440440
/// ```
441441
pub fn send_batch(&mut self, buffer: &mut CB::Container) {
442-
if !buffer.is_empty() {
443-
// flush buffered elements to ensure local fifo.
444-
self.flush();
445-
Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
446-
}
442+
// flush buffered elements to ensure local fifo.
443+
self.flush();
444+
Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
447445
}
448446

449447
/// Advances the current epoch to `next`.

0 commit comments

Comments
 (0)