Skip to content

Commit dafe288

Browse files
Move Batcher::seal to Builder (#546)
* Reorganize the boundary between batchers and builders
* Graspan tidy
1 parent cf97c1a commit dafe288

File tree

8 files changed

+146 -140 lines changed

8 files changed

+146
-140
lines changed

experiments/src/bin/graspan2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ fn unoptimized() {
8686

8787
let value_flow_next =
8888
value_flow_next
89-
.arrange::<ValBatcher<_,_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
89+
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
9090
// .distinct_total_core::<Diff>()
9191
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
9292
;

src/trace/implementations/merge_batcher.rs

Lines changed: 2 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ where
4949
T: Timestamp,
5050
{
5151
type Input = Input;
52-
type Output = M::Output;
5352
type Time = T;
53+
type Output = M::Chunk;
5454

5555
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
5656
Self {
@@ -109,7 +109,7 @@ where
109109

110110
self.stash.clear();
111111

112-
let seal = M::seal::<B>(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
112+
let seal = B::seal(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
113113
self.lower = upper;
114114
seal
115115
}
@@ -204,10 +204,6 @@ where
204204
pub trait Merger: Default {
205205
/// The internal representation of chunks of data.
206206
type Chunk: Container;
207-
/// The output type
208-
/// TODO: This should be replaced by `Chunk` or another container once the builder understands
209-
/// building from a complete chain.
210-
type Output;
211207
/// The type of time in frontiers to extract updates.
212208
type Time;
213209
/// Merge chains into an output chain.
@@ -223,15 +219,6 @@ pub trait Merger: Default {
223219
stash: &mut Vec<Self::Chunk>,
224220
);
225221

226-
/// Build from a chain
227-
/// TODO: We can move this entirely to `MergeBatcher` once builders can accepts chains.
228-
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
229-
chain: &mut Vec<Self::Chunk>,
230-
lower: AntichainRef<Self::Time>,
231-
upper: AntichainRef<Self::Time>,
232-
since: AntichainRef<Self::Time>,
233-
) -> B::Output;
234-
235222
/// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
236223
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
237224
}
@@ -286,7 +273,6 @@ where
286273
{
287274
type Time = T;
288275
type Chunk = Vec<((K, V), T, R)>;
289-
type Output = Vec<((K, V), T, R)>;
290276

291277
fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
292278
let mut list1 = list1.into_iter();
@@ -401,45 +387,6 @@ where
401387
readied.push(ready);
402388
}
403389
}
404-
405-
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
406-
chain: &mut Vec<Self::Chunk>,
407-
lower: AntichainRef<Self::Time>,
408-
upper: AntichainRef<Self::Time>,
409-
since: AntichainRef<Self::Time>,
410-
) -> B::Output {
411-
let mut keys = 0;
412-
let mut vals = 0;
413-
let mut upds = 0;
414-
let mut prev_keyval = None;
415-
for buffer in chain.iter() {
416-
for ((key, val), time, _) in buffer.iter() {
417-
if !upper.less_equal(time) {
418-
if let Some((p_key, p_val)) = prev_keyval {
419-
if p_key != key {
420-
keys += 1;
421-
vals += 1;
422-
} else if p_val != val {
423-
vals += 1;
424-
}
425-
} else {
426-
keys += 1;
427-
vals += 1;
428-
}
429-
upds += 1;
430-
prev_keyval = Some((key, val));
431-
}
432-
}
433-
}
434-
let mut builder = B::with_capacity(keys, vals, upds);
435-
436-
for mut chunk in chain.drain(..) {
437-
builder.push(&mut chunk);
438-
}
439-
440-
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
441-
}
442-
443390
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
444391
(chunk.len(), 0, 0, 0)
445392
}

src/trace/implementations/merge_batcher_col.rs

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use timely::{Container, Data, PartialOrder};
88

99
use crate::difference::Semigroup;
1010
use crate::trace::implementations::merge_batcher::Merger;
11-
use crate::trace::Builder;
1211

1312
/// A merger for timely stacks
1413
pub struct ColumnationMerger<T> {
@@ -62,7 +61,6 @@ where
6261
{
6362
type Time = T;
6463
type Chunk = TimelyStack<((K, V), T, R)>;
65-
type Output = TimelyStack<((K, V), T, R)>;
6664

6765
fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
6866
let mut list1 = list1.into_iter();
@@ -183,43 +181,6 @@ where
183181
}
184182
}
185183

186-
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
187-
chain: &mut Vec<Self::Chunk>,
188-
lower: AntichainRef<Self::Time>,
189-
upper: AntichainRef<Self::Time>,
190-
since: AntichainRef<Self::Time>,
191-
) -> B::Output {
192-
let mut keys = 0;
193-
let mut vals = 0;
194-
let mut upds = 0;
195-
let mut prev_keyval = None;
196-
for buffer in chain.iter() {
197-
for ((key, val), time, _) in buffer.iter() {
198-
if !upper.less_equal(time) {
199-
if let Some((p_key, p_val)) = prev_keyval {
200-
if p_key != key {
201-
keys += 1;
202-
vals += 1;
203-
} else if p_val != val {
204-
vals += 1;
205-
}
206-
} else {
207-
keys += 1;
208-
vals += 1;
209-
}
210-
upds += 1;
211-
prev_keyval = Some((key, val));
212-
}
213-
}
214-
}
215-
let mut builder = B::with_capacity(keys, vals, upds);
216-
for mut chunk in chain.drain(..) {
217-
builder.push(&mut chunk);
218-
}
219-
220-
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
221-
}
222-
223184
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
224185
let (mut size, mut capacity, mut allocations) = (0, 0, 0);
225186
let cb = |siz, cap| {

src/trace/implementations/merge_batcher_flat.rs

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
use std::cmp::Ordering;
44
use std::marker::PhantomData;
55
use timely::progress::frontier::{Antichain, AntichainRef};
6-
use timely::{Container, Data, PartialOrder};
6+
use timely::{Data, PartialOrder};
77
use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems};
88
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
99

1010
use crate::difference::{IsZero, Semigroup};
1111
use crate::trace::implementations::merge_batcher::Merger;
12-
use crate::trace::Builder;
1312
use crate::trace::cursor::IntoOwned;
1413

1514
/// A merger for flat stacks.
@@ -110,7 +109,6 @@ where
110109
{
111110
type Time = MC::TimeOwned;
112111
type Chunk = FlatStack<MC>;
113-
type Output = FlatStack<MC>;
114112

115113
fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
116114
let mut list1 = list1.into_iter();
@@ -242,47 +240,6 @@ where
242240
}
243241
}
244242

245-
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
246-
chain: &mut Vec<Self::Chunk>,
247-
lower: AntichainRef<Self::Time>,
248-
upper: AntichainRef<Self::Time>,
249-
since: AntichainRef<Self::Time>,
250-
) -> B::Output {
251-
let mut keys = 0;
252-
let mut vals = 0;
253-
let mut upds = 0;
254-
{
255-
let mut prev_keyval = None;
256-
for buffer in chain.iter() {
257-
for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) {
258-
if !upper.less_equal(&time) {
259-
if let Some((p_key, p_val)) = prev_keyval {
260-
debug_assert!(p_key <= key);
261-
debug_assert!(p_key != key || p_val <= val);
262-
if p_key != key {
263-
keys += 1;
264-
vals += 1;
265-
} else if p_val != val {
266-
vals += 1;
267-
}
268-
} else {
269-
keys += 1;
270-
vals += 1;
271-
}
272-
upds += 1;
273-
prev_keyval = Some((key, val));
274-
}
275-
}
276-
}
277-
}
278-
let mut builder = B::with_capacity(keys, vals, upds);
279-
for mut chunk in chain.drain(..) {
280-
builder.push(&mut chunk);
281-
}
282-
283-
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
284-
}
285-
286243
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
287244
let (mut size, mut capacity, mut allocations) = (0, 0, 0);
288245
let cb = |siz, cap| {

src/trace/implementations/mod.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,9 @@ pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: Container {
343343

344344
/// Test that the value equals a key in the layout's value container.
345345
fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
346+
347+
/// Count the number of distinct keys, (key, val) pairs, and total updates.
348+
fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
346349
}
347350

348351
impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
@@ -372,6 +375,31 @@ where
372375
fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
373376
VBC::reborrow(other) == this
374377
}
378+
379+
fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
380+
let mut keys = 0;
381+
let mut vals = 0;
382+
let mut upds = 0;
383+
let mut prev_keyval = None;
384+
for link in chain.iter() {
385+
for ((key, val), _, _) in link.iter() {
386+
if let Some((p_key, p_val)) = prev_keyval {
387+
if p_key != key {
388+
keys += 1;
389+
vals += 1;
390+
} else if p_val != val {
391+
vals += 1;
392+
}
393+
} else {
394+
keys += 1;
395+
vals += 1;
396+
}
397+
upds += 1;
398+
prev_keyval = Some((key, val));
399+
}
400+
}
401+
(keys, vals, upds)
402+
}
375403
}
376404

377405
impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
@@ -401,6 +429,31 @@ where
401429
fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
402430
V::reborrow(other) == *this
403431
}
432+
433+
fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
434+
let mut keys = 0;
435+
let mut vals = 0;
436+
let mut upds = 0;
437+
let mut prev_keyval = None;
438+
for link in chain.iter() {
439+
for ((key, val), _, _) in link.iter() {
440+
if let Some((p_key, p_val)) = prev_keyval {
441+
if p_key != key {
442+
keys += 1;
443+
vals += 1;
444+
} else if p_val != val {
445+
vals += 1;
446+
}
447+
} else {
448+
keys += 1;
449+
vals += 1;
450+
}
451+
upds += 1;
452+
prev_keyval = Some((key, val));
453+
}
454+
}
455+
(keys, vals, upds)
456+
}
404457
}
405458

406459
mod flatcontainer {
@@ -483,6 +536,31 @@ mod flatcontainer {
483536
fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool {
484537
VBC::reborrow(other) == V::reborrow(*this)
485538
}
539+
540+
fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
541+
let mut keys = 0;
542+
let mut vals = 0;
543+
let mut upds = 0;
544+
let mut prev_keyval = None;
545+
for link in chain.iter() {
546+
for ((key, val), _, _) in link.iter() {
547+
if let Some((p_key, p_val)) = prev_keyval {
548+
if p_key != key {
549+
keys += 1;
550+
vals += 1;
551+
} else if p_val != val {
552+
vals += 1;
553+
}
554+
} else {
555+
keys += 1;
556+
vals += 1;
557+
}
558+
upds += 1;
559+
prev_keyval = Some((key, val));
560+
}
561+
}
562+
(keys, vals, upds)
563+
}
486564
}
487565
}
488566

src/trace/implementations/ord_neu.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -694,8 +694,22 @@ mod val_batch {
694694
description: Description::new(lower, upper, since),
695695
}
696696
}
697-
}
698697

698+
fn seal(
699+
chain: &mut Vec<Self::Input>,
700+
lower: AntichainRef<Self::Time>,
701+
upper: AntichainRef<Self::Time>,
702+
since: AntichainRef<Self::Time>,
703+
) -> Self::Output {
704+
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
705+
let mut builder = Self::with_capacity(keys, vals, upds);
706+
for mut chunk in chain.drain(..) {
707+
builder.push(&mut chunk);
708+
}
709+
710+
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
711+
}
712+
}
699713
}
700714

701715
mod key_batch {
@@ -1167,6 +1181,21 @@ mod key_batch {
11671181
description: Description::new(lower, upper, since),
11681182
}
11691183
}
1184+
1185+
fn seal(
1186+
chain: &mut Vec<Self::Input>,
1187+
lower: AntichainRef<Self::Time>,
1188+
upper: AntichainRef<Self::Time>,
1189+
since: AntichainRef<Self::Time>,
1190+
) -> Self::Output {
1191+
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
1192+
let mut builder = Self::with_capacity(keys, vals, upds);
1193+
for mut chunk in chain.drain(..) {
1194+
builder.push(&mut chunk);
1195+
}
1196+
1197+
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
1198+
}
11701199
}
11711200

11721201
}

0 commit comments

Comments
 (0)