Skip to content

Commit e0f4c40

Browse files
committed
Pass data from batcher to builder by chain
Currently, the data shared between the batcher and the builder are individual tuples, either moved or by reference. This limits flexibility around what kind of data can be provided to a builder, i.e., it has to be in the form of tuples, either owned or a reference to a fully-formed one. This works fine for vector-like structures, but will not work for containers that like to arrange their data differently. This change alters the contract between the batcher and the builder to provide chunks instead of individual items (it does not require _chains_.) The data in the chunks must be sorted, and subsequent calls must maintain order, too. The input containers need to implement `BuilderInput`, a type that describes how a container's items can be broken into key, value, time, and diff, where key and value can be references or owned data, as long as they can be pushed into the underlying key and value containers. The change has some quirks around comparing keys to keys already in the builder. The types can differ, and the best solution I could come up with was to add two explicit comparison functions to `BuilderInput` to compare keys and values. While it is not elegant, it allows us to move forward with this change, without adding nightmare-inducing trait bounds all-over. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 9731d7f commit e0f4c40

File tree

9 files changed

+300
-187
lines changed

9 files changed

+300
-187
lines changed

src/operators/arrange/arrangement.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ where
7575

7676
use ::timely::dataflow::scopes::Child;
7777
use ::timely::progress::timestamp::Refines;
78+
use timely::container::{PushContainer, PushInto};
7879

7980
impl<G, Tr> Arranged<G, Tr>
8081
where
@@ -292,7 +293,8 @@ where
292293
F: Fn(T2::Val<'_>) -> V + 'static,
293294
T2::Diff: Abelian,
294295
T2::Batch: Batch,
295-
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
296+
<T2::Builder as Builder>::Input: PushContainer,
297+
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
296298
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
297299
{
298300
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
@@ -311,7 +313,8 @@ where
311313
V: Data,
312314
F: Fn(T2::Val<'_>) -> V + 'static,
313315
T2::Batch: Batch,
314-
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::Diff)>,
316+
<T2::Builder as Builder>::Input: PushContainer,
317+
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
315318
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
316319
{
317320
use crate::operators::reduce::reduce_trace;

src/operators/arrange/upsert.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ where
138138
F: Fn(Tr::Val<'_>) -> V + 'static,
139139
Tr::Time: TotalOrder+ExchangeData,
140140
Tr::Batch: Batch,
141-
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>,
141+
Tr::Builder: Builder<Input = Vec<((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>>,
142142
{
143143
let mut reader: Option<TraceAgent<Tr>> = None;
144144

@@ -282,9 +282,7 @@ where
282282
}
283283
// Must insert updates in (key, val, time) order.
284284
updates.sort();
285-
for update in updates.drain(..) {
286-
builder.push(update);
287-
}
285+
builder.push(&mut updates);
288286
}
289287
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
290288
prev_frontier.clone_from(&upper);

src/operators/reduce.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
//! to the key and the list of values.
66
//! The function is expected to populate a list of output values.
77
8+
use timely::container::{PushContainer, PushInto};
89
use crate::hashable::Hashable;
910
use crate::{Data, ExchangeData, Collection};
1011
use crate::difference::{Semigroup, Abelian};
@@ -252,7 +253,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
252253
F: Fn(T2::Val<'_>) -> V + 'static,
253254
T2::Diff: Abelian,
254255
T2::Batch: Batch,
255-
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
256+
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
256257
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
257258
{
258259
self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| {
@@ -274,7 +275,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
274275
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
275276
F: Fn(T2::Val<'_>) -> V + 'static,
276277
T2::Batch: Batch,
277-
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
278+
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
278279
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
279280
;
280281
}
@@ -293,7 +294,7 @@ where
293294
F: Fn(T2::Val<'_>) -> V + 'static,
294295
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
295296
T2::Batch: Batch,
296-
T2::Builder: Builder<Input = ((K, V), T2::Time, T2::Diff)>,
297+
T2::Builder: Builder<Input = Vec<((K, V), T2::Time, T2::Diff)>>,
297298
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
298299
{
299300
self.arrange_by_key_named(&format!("Arrange: {}", name))
@@ -312,7 +313,8 @@ where
312313
V: Data,
313314
F: Fn(T2::Val<'_>) -> V + 'static,
314315
T2::Batch: Batch,
315-
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
316+
<T2::Builder as Builder>::Input: PushContainer,
317+
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
316318
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
317319
{
318320
let mut result_trace = None;
@@ -448,10 +450,10 @@ where
448450
// TODO: It would be better if all updates went into one batch, but timely dataflow prevents
449451
// this as long as it requires that there is only one capability for each message.
450452
let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new();
451-
let mut builders = Vec::new();
453+
let mut chains = Vec::new();
452454
for cap in capabilities.iter() {
453455
buffers.push((cap.time().clone(), Vec::new()));
454-
builders.push(T2::Builder::new());
456+
chains.push(Default::default());
455457
}
456458

457459
// cursors for navigating input and output traces.
@@ -531,7 +533,8 @@ where
531533
for index in 0 .. buffers.len() {
532534
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
533535
for (val, time, diff) in buffers[index].1.drain(..) {
534-
builders[index].push(((key.into_owned(), val), time, diff));
536+
// TODO(antiguru): This is dumb. Need to stage data and then reveal it.
537+
((key.into_owned(), val), time, diff).push_into(&mut chains[index]);
535538
}
536539
}
537540
}
@@ -543,8 +546,10 @@ where
543546
output_lower.extend(lower_limit.borrow().iter().cloned());
544547

545548
// build and ship each batch (because only one capability per message).
546-
for (index, builder) in builders.drain(..).enumerate() {
547-
549+
for (index, mut chain) in chains.drain(..).enumerate() {
550+
let mut builder = T2::Builder::new();
551+
// TODO(antiguru): Form actual chains.
552+
builder.push(&mut chain);
548553
// Form the upper limit of the next batch, which includes all times greater
549554
// than the input batch, or the capabilities from i + 1 onward.
550555
output_upper.clear();
@@ -648,7 +653,7 @@ where
648653
where
649654
F: Fn(C2::Val<'_>) -> V,
650655
L: FnMut(
651-
C1::Key<'a>,
656+
C1::Key<'a>,
652657
&[(C1::Val<'a>, C1::Diff)],
653658
&mut Vec<(V, C2::Diff)>,
654659
&mut Vec<(V, C2::Diff)>,
@@ -728,7 +733,7 @@ mod history_replay {
728733
where
729734
F: Fn(C2::Val<'_>) -> V,
730735
L: FnMut(
731-
C1::Key<'a>,
736+
C1::Key<'a>,
732737
&[(C1::Val<'a>, C1::Diff)],
733738
&mut Vec<(V, C2::Diff)>,
734739
&mut Vec<(V, C2::Diff)>,
@@ -1020,7 +1025,7 @@ mod history_replay {
10201025
new_interesting.push(next_time.clone());
10211026
debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
10221027
}
1023-
1028+
10241029

10251030
// Update `meet` to track the meet of each source of times.
10261031
meet = None;//T::maximum();

src/trace/implementations/merge_batcher.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ where
282282
type Time = T;
283283
type Input = Vec<((K, V), T, R)>;
284284
type Chunk = Vec<((K, V), T, R)>;
285-
type Output = ((K, V), T, R);
285+
type Output = Vec<((K, V), T, R)>;
286286

287287
fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
288288
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
@@ -497,8 +497,8 @@ where
497497
}
498498
let mut builder = B::with_capacity(keys, vals, upds);
499499

500-
for datum in chain.drain(..).flatten() {
501-
builder.push(datum);
500+
for mut chunk in chain.drain(..) {
501+
builder.push(&mut chunk);
502502
}
503503

504504
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())

src/trace/implementations/merge_batcher_col.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ where
6767
type Time = T;
6868
type Input = Vec<((K, V), T, R)>;
6969
type Chunk = TimelyStack<((K, V), T, R)>;
70-
type Output = ((K, V), T, R);
70+
type Output = TimelyStack<((K, V), T, R)>;
7171

7272
fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
7373
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
@@ -290,11 +290,8 @@ where
290290
}
291291
}
292292
let mut builder = B::with_capacity(keys, vals, upds);
293-
294-
for chunk in chain.drain(..) {
295-
for datum in chunk.iter() {
296-
builder.copy(datum);
297-
}
293+
for mut chunk in chain.drain(..) {
294+
builder.push(&mut chunk);
298295
}
299296

300297
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())

0 commit comments

Comments
 (0)