Skip to content

Commit 44bdd05

Browse files
committed
Address feedback
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent f58f15e commit 44bdd05

File tree

6 files changed

+30
-35
lines changed

6 files changed

+30
-35
lines changed

src/operators/arrange/arrangement.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ where
7575

7676
use ::timely::dataflow::scopes::Child;
7777
use ::timely::progress::timestamp::Refines;
78-
use timely::container::{PushContainer, PushInto};
78+
use timely::Container;
79+
use timely::container::PushInto;
7980

8081
impl<G, Tr> Arranged<G, Tr>
8182
where
@@ -293,7 +294,7 @@ where
293294
F: Fn(T2::Val<'_>) -> V + 'static,
294295
T2::Diff: Abelian,
295296
T2::Batch: Batch,
296-
<T2::Builder as Builder>::Input: PushContainer,
297+
<T2::Builder as Builder>::Input: Container,
297298
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
298299
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
299300
{
@@ -313,7 +314,7 @@ where
313314
V: Data,
314315
F: Fn(T2::Val<'_>) -> V + 'static,
315316
T2::Batch: Batch,
316-
<T2::Builder as Builder>::Input: PushContainer,
317+
<T2::Builder as Builder>::Input: Container,
317318
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
318319
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
319320
{

src/operators/reduce.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
//! to the key and the list of values.
66
//! The function is expected to populate a list of output values.
77
8-
use timely::container::{PushContainer, PushInto};
8+
use timely::Container;
9+
use timely::container::PushInto;
910
use crate::hashable::Hashable;
1011
use crate::{Data, ExchangeData, Collection};
1112
use crate::difference::{Semigroup, Abelian};
@@ -313,7 +314,7 @@ where
313314
V: Data,
314315
F: Fn(T2::Val<'_>) -> V + 'static,
315316
T2::Batch: Batch,
316-
<T2::Builder as Builder>::Input: PushContainer,
317+
<T2::Builder as Builder>::Input: Container,
317318
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
318319
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
319320
{
@@ -450,12 +451,14 @@ where
450451
// TODO: It would be better if all updates went into one batch, but timely dataflow prevents
451452
// this as long as it requires that there is only one capability for each message.
452453
let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new();
453-
let mut chains = Vec::new();
454+
let mut builders = Vec::new();
454455
for cap in capabilities.iter() {
455456
buffers.push((cap.time().clone(), Vec::new()));
456-
chains.push(Default::default());
457+
builders.push(T2::Builder::new());
457458
}
458459

460+
let mut buffer = Default::default();
461+
459462
// cursors for navigating input and output traces.
460463
let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
461464
let source_storage = &source_storage;
@@ -533,8 +536,9 @@ where
533536
for index in 0 .. buffers.len() {
534537
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
535538
for (val, time, diff) in buffers[index].1.drain(..) {
536-
// TODO(antiguru): This is dumb. Need to stage data and then reveal it.
537-
((key.into_owned(), val), time, diff).push_into(&mut chains[index]);
539+
((key.into_owned(), val), time, diff).push_into(&mut buffer);
540+
builders[index].push(&mut buffer);
541+
buffer.clear();
538542
}
539543
}
540544
}
@@ -546,10 +550,8 @@ where
546550
output_lower.extend(lower_limit.borrow().iter().cloned());
547551

548552
// build and ship each batch (because only one capability per message).
549-
for (index, mut chain) in chains.drain(..).enumerate() {
550-
let mut builder = T2::Builder::new();
551-
// TODO(antiguru): Form actual chains.
552-
builder.push(&mut chain);
553+
for (index, builder) in builders.drain(..).enumerate() {
554+
553555
// Form the upper limit of the next batch, which includes all times greater
554556
// than the input batch, or the capabilities from i + 1 onward.
555557
output_upper.clear();

src/trace/implementations/mod.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ impl<K,V,T,R> Update for Preferred<K, V, T, R>
161161
where
162162
K: ToOwned + ?Sized,
163163
K::Owned: Ord+Clone+'static,
164-
V: ToOwned + ?Sized + 'static,
165-
V::Owned: Ord+Clone,
164+
V: ToOwned + ?Sized,
165+
V::Owned: Ord+Clone+'static,
166166
T: Ord+Lattice+timely::progress::Timestamp+Clone,
167167
R: Semigroup+Clone,
168168
{
@@ -177,8 +177,8 @@ where
177177
K: Ord+ToOwned+PreferredContainer + ?Sized,
178178
K::Owned: Ord+Clone+'static,
179179
// for<'a> K::Container: BatchContainer<ReadItem<'a> = &'a K>,
180-
V: Ord+ToOwned+PreferredContainer + ?Sized + 'static,
181-
V::Owned: Ord+Clone,
180+
V: Ord+ToOwned+PreferredContainer + ?Sized,
181+
V::Owned: Ord+Clone+'static,
182182
T: Ord+Lattice+timely::progress::Timestamp+Clone,
183183
D: Semigroup+Clone,
184184
{
@@ -192,6 +192,7 @@ where
192192
use std::convert::TryInto;
193193
use std::ops::Deref;
194194
use abomonation_derive::Abomonation;
195+
use timely::Container;
195196
use timely::container::PushInto;
196197
use timely::progress::Timestamp;
197198
use crate::trace::cursor::MyTrait;
@@ -364,9 +365,7 @@ impl BatchContainer for OffsetList {
364365
}
365366

366367
/// Behavior to split an update into principal components.
367-
pub trait BuilderInput<L: Layout> {
368-
/// The item to break apart.
369-
type Item<'a>;
368+
pub trait BuilderInput<L: Layout>: Container {
370369
/// Key portion
371370
type Key<'a>: Ord;
372371
/// Value portion
@@ -393,7 +392,6 @@ where
393392
T: Timestamp + Lattice + Clone + 'static,
394393
R: Semigroup + Clone + 'static,
395394
{
396-
type Item<'a> = ((K, V), T, R);
397395
type Key<'a> = K;
398396
type Val<'a> = V;
399397
type Time = T;
@@ -419,7 +417,6 @@ where
419417
T: Timestamp + Lattice + Columnation + Clone + 'static,
420418
R: Semigroup + Columnation + Clone + 'static,
421419
{
422-
type Item<'a> = &'a ((K, V), T, R);
423420
type Key<'a> = &'a K;
424421
type Val<'a> = &'a V;
425422
type Time = T;
@@ -442,12 +439,11 @@ impl<K,V,T,R> BuilderInput<Preferred<K, V, T, R>> for TimelyStack<((<K as ToOwne
442439
where
443440
K: Ord+ToOwned+PreferredContainer + ?Sized,
444441
K::Owned: Columnation + Ord+Clone+'static,
445-
V: Ord+ToOwned+PreferredContainer + ?Sized + 'static,
446-
V::Owned: Columnation + Ord+Clone,
447-
T: Columnation + Ord+Lattice+timely::progress::Timestamp+Clone,
442+
V: Ord+ToOwned+PreferredContainer + ?Sized,
443+
V::Owned: Columnation + Ord+Clone+'static,
444+
T: Columnation + Ord+Lattice+Timestamp+Clone,
448445
R: Columnation + Semigroup+Clone,
449446
{
450-
type Item<'a> = &'a ((<K as ToOwned>::Owned, <V as ToOwned>::Owned), T, R);
451447
type Key<'a> = &'a K::Owned;
452448
type Val<'a> = &'a V::Owned;
453449
type Time = T;

src/trace/implementations/ord_neu.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ mod val_batch {
6969

7070
use std::marker::PhantomData;
7171
use abomonation_derive::Abomonation;
72-
use timely::Container;
7372
use timely::container::PushInto;
7473
use timely::progress::{Antichain, frontier::AntichainRef};
7574

@@ -544,7 +543,7 @@ mod val_batch {
544543
impl<L, CI> Builder for OrdValBuilder<L, CI>
545544
where
546545
L: Layout,
547-
CI: Container + for<'a> BuilderInput<L, Item<'a> = <CI as Container>::Item<'a>, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
546+
CI: for<'a> BuilderInput<L, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
548547
for<'a> CI::Key<'a>: PushInto<L::KeyContainer>,
549548
for<'a> CI::Val<'a>: PushInto<L::ValContainer>,
550549
{
@@ -618,7 +617,6 @@ mod key_batch {
618617

619618
use std::marker::PhantomData;
620619
use abomonation_derive::Abomonation;
621-
use timely::Container;
622620
use timely::container::PushInto;
623621
use timely::progress::{Antichain, frontier::AntichainRef};
624622

@@ -990,7 +988,7 @@ mod key_batch {
990988
impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
991989
where
992990
L: Layout,
993-
CI: Container + for<'a> BuilderInput<L, Item<'a> = <CI as Container>::Item<'a>, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
991+
CI: for<'a> BuilderInput<L, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
994992
for<'a> CI::Key<'a>: PushInto<L::KeyContainer>,
995993
{
996994

src/trace/implementations/rhh.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ mod val_batch {
8080
use std::convert::TryInto;
8181
use std::marker::PhantomData;
8282
use abomonation_derive::Abomonation;
83-
use timely::Container;
8483
use timely::container::PushInto;
8584
use timely::progress::{Antichain, frontier::AntichainRef};
8685

@@ -740,7 +739,7 @@ mod val_batch {
740739
where
741740
<L::Target as Update>::Key: Default + HashOrdered,
742741
// RhhValBatch<L>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
743-
CI: Container + for<'a> BuilderInput<L, Item<'a> = <CI as Container>::Item<'a>, Key<'a> = <L::Target as Update>::Key, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
742+
CI: for<'a> BuilderInput<L, Key<'a> = <L::Target as Update>::Key, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
744743
for<'a> CI::Val<'a>: PushInto<L::ValContainer>,
745744
{
746745
type Input = CI;

src/trace/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,10 +336,9 @@ pub trait Builder: Sized {
336336
///
337337
/// They represent respectively the number of distinct `key`, `(key, val)`, and total updates.
338338
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
339-
/// Adds an element to the batch.
339+
/// Adds a chunk of elements to the batch.
340340
///
341-
/// The default implementation uses `self.copy` with references to the owned arguments.
342-
/// One should override it if the builder can take advantage of owned arguments.
341+
/// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state.
343342
fn push(&mut self, chunk: &mut Self::Input);
344343
/// Completes building and returns the batch.
345344
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output;

0 commit comments

Comments
 (0)