Skip to content

Commit ec05777

Browse files
committed
Cleanup, use MergerChunk to clear in reduce
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 076e5dd commit ec05777

File tree

7 files changed

+19
-20
lines changed

7 files changed

+19
-20
lines changed

differential-dataflow/examples/columnar.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ pub mod batcher {
392392
ready: VecDeque<C>,
393393
}
394394

395-
impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
395+
impl<C: Container> ContainerBuilder for Chunker<C> {
396396
type Container = C;
397397

398398
fn extract(&mut self) -> Option<&mut Self::Container> {

differential-dataflow/src/collection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl<G: Scope, D, R, C> Collection<G, D, R, C> {
6868
Collection { inner: stream, phantom: std::marker::PhantomData }
6969
}
7070
}
71-
impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
71+
impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
7272
/// Creates a new collection accumulating the contents of the two collections.
7373
///
7474
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
@@ -686,7 +686,7 @@ where
686686
G: Scope,
687687
D: Data,
688688
R: Semigroup + 'static,
689-
C: Container + Clone + 'static,
689+
C: Container,
690690
I: IntoIterator<Item=Collection<G, D, R, C>>,
691691
{
692692
scope

differential-dataflow/src/operators/arrange/arrangement.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::difference::Semigroup;
3131
use crate::lattice::Lattice;
3232
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
3333
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};
34+
use crate::trace::implementations::merge_batcher::container::MergerChunk;
3435

3536
use trace::wrappers::enter::{TraceEnter, BatchEnter,};
3637
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
@@ -293,7 +294,7 @@ where
293294
Time=T1::Time,
294295
Diff: Abelian,
295296
>+'static,
296-
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
297+
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
297298
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
298299
{
299300
self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
@@ -315,7 +316,7 @@ where
315316
ValOwn: Data,
316317
Time=T1::Time,
317318
>+'static,
318-
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
319+
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
319320
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
320321
{
321322
use crate::operators::reduce::reduce_trace;
@@ -393,7 +394,7 @@ pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P
393394
where
394395
G: Scope<Timestamp: Lattice>,
395396
P: ParallelizationContract<G::Timestamp, Ba::Input>,
396-
Ba: Batcher<Time=G::Timestamp,Input: Container + Clone + 'static> + 'static,
397+
Ba: Batcher<Time=G::Timestamp,Input: Container> + 'static,
397398
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
398399
Tr: Trace<Time=G::Timestamp>+'static,
399400
{

differential-dataflow/src/operators/iterate.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,15 +156,15 @@ where
156156
G: Scope<Timestamp: Lattice>,
157157
D: Data,
158158
R: Abelian + 'static,
159-
C: Container + Clone + 'static,
159+
C: Container,
160160
{
161161
collection: Collection<G, D, R, C>,
162162
feedback: Handle<G, C>,
163163
source: Option<Collection<G, D, R, C>>,
164164
step: <G::Timestamp as Timestamp>::Summary,
165165
}
166166

167-
impl<G, D: Data, R: Abelian, C: Container + Clone + 'static> Variable<G, D, R, C>
167+
impl<G, D: Data, R: Abelian, C: Container> Variable<G, D, R, C>
168168
where
169169
G: Scope<Timestamp: Lattice>,
170170
StreamCore<G, C>: crate::operators::Negate<G, C> + ResultsIn<G, C>,
@@ -218,7 +218,7 @@ where
218218
}
219219
}
220220

221-
impl<G: Scope<Timestamp: Lattice>, D: Data, R: Abelian, C: Container + Clone + 'static> Deref for Variable<G, D, R, C> {
221+
impl<G: Scope<Timestamp: Lattice>, D: Data, R: Abelian, C: Container> Deref for Variable<G, D, R, C> {
222222
type Target = Collection<G, D, R, C>;
223223
fn deref(&self) -> &Self::Target {
224224
&self.collection
@@ -236,14 +236,14 @@ where
236236
G: Scope<Timestamp: Lattice>,
237237
D: Data,
238238
R: Semigroup + 'static,
239-
C: Container + Clone + 'static,
239+
C: Container,
240240
{
241241
collection: Collection<G, D, R, C>,
242242
feedback: Handle<G, C>,
243243
step: <G::Timestamp as Timestamp>::Summary,
244244
}
245245

246-
impl<G, D: Data, R: Semigroup, C: Container+Clone> SemigroupVariable<G, D, R, C>
246+
impl<G, D: Data, R: Semigroup, C: Container> SemigroupVariable<G, D, R, C>
247247
where
248248
G: Scope<Timestamp: Lattice>,
249249
StreamCore<G, C>: ResultsIn<G, C>,
@@ -267,7 +267,7 @@ where
267267
}
268268
}
269269

270-
impl<G: Scope, D: Data, R: Semigroup, C: Container+Clone+'static> Deref for SemigroupVariable<G, D, R, C> where G::Timestamp: Lattice {
270+
impl<G: Scope, D: Data, R: Semigroup, C: Container> Deref for SemigroupVariable<G, D, R, C> where G::Timestamp: Lattice {
271271
type Target = Collection<G, D, R, C>;
272272
fn deref(&self) -> &Self::Target {
273273
&self.collection

differential-dataflow/src/operators/join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ where
362362
T1: TraceReader+Clone+'static,
363363
T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
364364
L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff,&mut JoinSession<T1::Time, CB, CB::Container>)+'static,
365-
CB: ContainerBuilder + 'static,
365+
CB: ContainerBuilder,
366366
{
367367
// Rename traces for symmetry from here on out.
368368
let mut trace1 = arranged1.trace.clone();

differential-dataflow/src/operators/reduce.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
//! to the key and the list of values.
66
//! The function is expected to populate a list of output values.
77
8-
use timely::Container;
98
use timely::container::PushInto;
109
use crate::hashable::Hashable;
1110
use crate::{Data, ExchangeData, Collection};
@@ -25,7 +24,7 @@ use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Descripti
2524
use crate::trace::cursor::CursorList;
2625
use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder};
2726
use crate::trace::implementations::containers::BatchContainer;
28-
27+
use crate::trace::implementations::merge_batcher::container::MergerChunk;
2928
use crate::trace::TraceReader;
3029

3130
/// Extension trait for the `reduce` differential dataflow method.
@@ -315,7 +314,7 @@ where
315314
G: Scope<Timestamp=T1::Time>,
316315
T1: TraceReader<KeyOwn: Ord> + Clone + 'static,
317316
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static,
318-
Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
317+
Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
319318
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
320319
{
321320
let mut result_trace = None;
@@ -533,8 +532,7 @@ where
533532
for (val, time, diff) in buffers[index].1.drain(..) {
534533
buffer.push_into(((T1::owned_key(key), val), time, diff));
535534
builders[index].push(&mut buffer);
536-
// TODO: Clear!
537-
// buffer.clear();
535+
buffer.clear();
538536
}
539537
}
540538
}

differential-dataflow/src/trace/implementations/merge_batcher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub struct MergeBatcher<Input, C, M: Merger> {
4848

4949
impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
5050
where
51-
C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<&'a mut Input>,
51+
C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
5252
M: Merger<Time: Timestamp>,
5353
{
5454
type Input = Input;
@@ -325,7 +325,7 @@ pub mod container {
325325

326326
impl<MC, CQ> Merger for ContainerMerger<MC, CQ>
327327
where
328-
for<'a> MC: MergerChunk<TimeOwned: Ord + PartialOrder + Data> + Clone + Default + PushInto<<MC as DrainContainer>::Item<'a>> + 'static,
328+
for<'a> MC: MergerChunk<TimeOwned: Ord + PartialOrder + Data> + Clone + PushInto<<MC as DrainContainer>::Item<'a>> + 'static,
329329
CQ: ContainerQueue<MC>,
330330
{
331331
type Time = MC::TimeOwned;

0 commit comments

Comments
 (0)