Skip to content

Commit 2834002

Browse files
committed
Address feedback
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent e0f4c40 commit 2834002

File tree

6 files changed

+30
-35
lines changed

6 files changed

+30
-35
lines changed

src/operators/arrange/arrangement.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ where
7575

7676
use ::timely::dataflow::scopes::Child;
7777
use ::timely::progress::timestamp::Refines;
78-
use timely::container::{PushContainer, PushInto};
78+
use timely::Container;
79+
use timely::container::PushInto;
7980

8081
impl<G, Tr> Arranged<G, Tr>
8182
where
@@ -293,7 +294,7 @@ where
293294
F: Fn(T2::Val<'_>) -> V + 'static,
294295
T2::Diff: Abelian,
295296
T2::Batch: Batch,
296-
<T2::Builder as Builder>::Input: PushContainer,
297+
<T2::Builder as Builder>::Input: Container,
297298
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
298299
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
299300
{
@@ -313,7 +314,7 @@ where
313314
V: Data,
314315
F: Fn(T2::Val<'_>) -> V + 'static,
315316
T2::Batch: Batch,
316-
<T2::Builder as Builder>::Input: PushContainer,
317+
<T2::Builder as Builder>::Input: Container,
317318
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
318319
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
319320
{

src/operators/reduce.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
//! to the key and the list of values.
66
//! The function is expected to populate a list of output values.
77
8-
use timely::container::{PushContainer, PushInto};
8+
use timely::Container;
9+
use timely::container::PushInto;
910
use crate::hashable::Hashable;
1011
use crate::{Data, ExchangeData, Collection};
1112
use crate::difference::{Semigroup, Abelian};
@@ -313,7 +314,7 @@ where
313314
V: Data,
314315
F: Fn(T2::Val<'_>) -> V + 'static,
315316
T2::Batch: Batch,
316-
<T2::Builder as Builder>::Input: PushContainer,
317+
<T2::Builder as Builder>::Input: Container,
317318
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
318319
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
319320
{
@@ -450,12 +451,14 @@ where
450451
// TODO: It would be better if all updates went into one batch, but timely dataflow prevents
451452
// this as long as it requires that there is only one capability for each message.
452453
let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new();
453-
let mut chains = Vec::new();
454+
let mut builders = Vec::new();
454455
for cap in capabilities.iter() {
455456
buffers.push((cap.time().clone(), Vec::new()));
456-
chains.push(Default::default());
457+
builders.push(T2::Builder::new());
457458
}
458459

460+
let mut buffer = Default::default();
461+
459462
// cursors for navigating input and output traces.
460463
let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
461464
let source_storage = &source_storage;
@@ -533,8 +536,9 @@ where
533536
for index in 0 .. buffers.len() {
534537
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
535538
for (val, time, diff) in buffers[index].1.drain(..) {
536-
// TODO(antiguru): This is dumb. Need to stage data and then reveal it.
537-
((key.into_owned(), val), time, diff).push_into(&mut chains[index]);
539+
((key.into_owned(), val), time, diff).push_into(&mut buffer);
540+
builders[index].push(&mut buffer);
541+
buffer.clear();
538542
}
539543
}
540544
}
@@ -546,10 +550,8 @@ where
546550
output_lower.extend(lower_limit.borrow().iter().cloned());
547551

548552
// build and ship each batch (because only one capability per message).
549-
for (index, mut chain) in chains.drain(..).enumerate() {
550-
let mut builder = T2::Builder::new();
551-
// TODO(antiguru): Form actual chains.
552-
builder.push(&mut chain);
553+
for (index, builder) in builders.drain(..).enumerate() {
554+
553555
// Form the upper limit of the next batch, which includes all times greater
554556
// than the input batch, or the capabilities from i + 1 onward.
555557
output_upper.clear();

src/trace/implementations/mod.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub use self::ord_neu::OrdKeySpine as KeySpine;
5454
use std::borrow::{ToOwned};
5555
use std::cmp::Ordering;
5656

57+
use timely::Container;
5758
use timely::container::columnation::{Columnation, TimelyStack};
5859
use timely::container::PushInto;
5960
use timely::progress::Timestamp;
@@ -163,8 +164,8 @@ impl<K,V,T,R> Update for Preferred<K, V, T, R>
163164
where
164165
K: ToOwned + ?Sized,
165166
K::Owned: Ord+Clone+'static,
166-
V: ToOwned + ?Sized + 'static,
167-
V::Owned: Ord+Clone,
167+
V: ToOwned + ?Sized,
168+
V::Owned: Ord+Clone+'static,
168169
T: Ord+Lattice+timely::progress::Timestamp+Clone,
169170
R: Semigroup+Clone,
170171
{
@@ -179,8 +180,8 @@ where
179180
K: Ord+ToOwned+PreferredContainer + ?Sized,
180181
K::Owned: Ord+Clone+'static,
181182
// for<'a> K::Container: BatchContainer<ReadItem<'a> = &'a K>,
182-
V: Ord+ToOwned+PreferredContainer + ?Sized + 'static,
183-
V::Owned: Ord+Clone,
183+
V: Ord+ToOwned+PreferredContainer + ?Sized,
184+
V::Owned: Ord+Clone+'static,
184185
T: Ord+Lattice+timely::progress::Timestamp+Clone,
185186
D: Semigroup+Clone,
186187
{
@@ -364,9 +365,7 @@ impl BatchContainer for OffsetList {
364365
}
365366

366367
/// Behavior to split an update into principal components.
367-
pub trait BuilderInput<L: Layout> {
368-
/// The item to break apart.
369-
type Item<'a>;
368+
pub trait BuilderInput<L: Layout>: Container {
370369
/// Key portion
371370
type Key<'a>: Ord;
372371
/// Value portion
@@ -393,7 +392,6 @@ where
393392
T: Timestamp + Lattice + Clone + 'static,
394393
R: Semigroup + Clone + 'static,
395394
{
396-
type Item<'a> = ((K, V), T, R);
397395
type Key<'a> = K;
398396
type Val<'a> = V;
399397
type Time = T;
@@ -419,7 +417,6 @@ where
419417
T: Timestamp + Lattice + Columnation + Clone + 'static,
420418
R: Semigroup + Columnation + Clone + 'static,
421419
{
422-
type Item<'a> = &'a ((K, V), T, R);
423420
type Key<'a> = &'a K;
424421
type Val<'a> = &'a V;
425422
type Time = T;
@@ -442,12 +439,11 @@ impl<K,V,T,R> BuilderInput<Preferred<K, V, T, R>> for TimelyStack<((<K as ToOwne
442439
where
443440
K: Ord+ToOwned+PreferredContainer + ?Sized,
444441
K::Owned: Columnation + Ord+Clone+'static,
445-
V: Ord+ToOwned+PreferredContainer + ?Sized + 'static,
446-
V::Owned: Columnation + Ord+Clone,
447-
T: Columnation + Ord+Lattice+timely::progress::Timestamp+Clone,
442+
V: Ord+ToOwned+PreferredContainer + ?Sized,
443+
V::Owned: Columnation + Ord+Clone+'static,
444+
T: Columnation + Ord+Lattice+Timestamp+Clone,
448445
R: Columnation + Semigroup+Clone,
449446
{
450-
type Item<'a> = &'a ((<K as ToOwned>::Owned, <V as ToOwned>::Owned), T, R);
451447
type Key<'a> = &'a K::Owned;
452448
type Val<'a> = &'a V::Owned;
453449
type Time = T;

src/trace/implementations/ord_neu.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ mod val_batch {
6969

7070
use std::marker::PhantomData;
7171
use abomonation_derive::Abomonation;
72-
use timely::Container;
7372
use timely::container::PushInto;
7473
use timely::progress::{Antichain, frontier::AntichainRef};
7574

@@ -544,7 +543,7 @@ mod val_batch {
544543
impl<L, CI> Builder for OrdValBuilder<L, CI>
545544
where
546545
L: Layout,
547-
CI: Container + for<'a> BuilderInput<L, Item<'a> = <CI as Container>::Item<'a>, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
546+
CI: for<'a> BuilderInput<L, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
548547
for<'a> CI::Key<'a>: PushInto<L::KeyContainer>,
549548
for<'a> CI::Val<'a>: PushInto<L::ValContainer>,
550549
{
@@ -618,7 +617,6 @@ mod key_batch {
618617

619618
use std::marker::PhantomData;
620619
use abomonation_derive::Abomonation;
621-
use timely::Container;
622620
use timely::container::PushInto;
623621
use timely::progress::{Antichain, frontier::AntichainRef};
624622

@@ -990,7 +988,7 @@ mod key_batch {
990988
impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
991989
where
992990
L: Layout,
993-
CI: Container + for<'a> BuilderInput<L, Item<'a> = <CI as Container>::Item<'a>, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
991+
CI: for<'a> BuilderInput<L, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
994992
for<'a> CI::Key<'a>: PushInto<L::KeyContainer>,
995993
{
996994

src/trace/implementations/rhh.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ mod val_batch {
8787
use std::convert::TryInto;
8888
use std::marker::PhantomData;
8989
use abomonation_derive::Abomonation;
90-
use timely::Container;
9190
use timely::container::PushInto;
9291
use timely::progress::{Antichain, frontier::AntichainRef};
9392

@@ -747,7 +746,7 @@ mod val_batch {
747746
where
748747
<L::Target as Update>::Key: Default + HashOrdered,
749748
// RhhValBatch<L>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
750-
CI: Container + for<'a> BuilderInput<L, Item<'a> = <CI as Container>::Item<'a>, Key<'a> = <L::Target as Update>::Key, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
749+
CI: for<'a> BuilderInput<L, Key<'a> = <L::Target as Update>::Key, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
751750
for<'a> CI::Val<'a>: PushInto<L::ValContainer>,
752751
{
753752
type Input = CI;

src/trace/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,10 +336,9 @@ pub trait Builder: Sized {
336336
///
337337
/// They represent respectively the number of distinct `key`, `(key, val)`, and total updates.
338338
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
339-
/// Adds an element to the batch.
339+
/// Adds a chunk of elements to the batch.
340340
///
341-
/// The default implementation uses `self.copy` with references to the owned arguments.
342-
/// One should override it if the builder can take advantage of owned arguments.
341+
/// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state.
343342
fn push(&mut self, chunk: &mut Self::Input);
344343
/// Completes building and returns the batch.
345344
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output;

0 commit comments

Comments
 (0)