Skip to content

Commit 63212f9

Browse files
authored
Pass description itself to builder (#551)
Pass the description itself to the builder instead of supplying the parameters required to build a description. No functional changes. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent a10d3ca commit 63212f9

File tree

6 files changed

+29
-50
lines changed

6 files changed

+29
-50
lines changed

src/operators/arrange/upsert.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ use timely::progress::Antichain;
109109
use timely::dataflow::operators::Capability;
110110

111111
use crate::operators::arrange::arrangement::Arranged;
112-
use crate::trace::Builder;
112+
use crate::trace::{Builder, Description};
113113
use crate::trace::{self, Trace, TraceReader, Batch, Cursor};
114114
use crate::trace::cursor::IntoOwned;
115115
use crate::{ExchangeData, Hashable};
@@ -281,7 +281,8 @@ where
281281
updates.sort();
282282
builder.push(&mut updates);
283283
}
284-
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
284+
let description = Description::new(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
285+
let batch = builder.done(description);
285286
prev_frontier.clone_from(&upper);
286287

287288
// Communicate `batch` to the arrangement and the stream.

src/operators/reduce.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::trace::cursor::IntoOwned;
2323

2424
use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent};
2525
use crate::lattice::Lattice;
26-
use crate::trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic};
26+
use crate::trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
2727
use crate::trace::cursor::CursorList;
2828
use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder};
2929

@@ -565,7 +565,8 @@ where
565565

566566
if output_upper.borrow() != output_lower.borrow() {
567567

568-
let batch = builder.done(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
568+
let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
569+
let batch = builder.done(description);
569570

570571
// ship batch to the output, and commit to the output trace.
571572
output.session(&capabilities[index]).give(batch.clone());

src/trace/implementations/merge_batcher.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use timely::container::{ContainerBuilder, PushInto};
1212

1313
use crate::difference::Semigroup;
1414
use crate::logging::{BatcherEvent, DifferentialEvent};
15-
use crate::trace::{Batcher, Builder};
15+
use crate::trace::{Batcher, Builder, Description};
1616
use crate::Data;
1717

1818
/// Creates batches from unordered tuples.
@@ -109,7 +109,8 @@ where
109109

110110
self.stash.clear();
111111

112-
let seal = B::seal(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(M::Time::minimum()).borrow());
112+
let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
113+
let seal = B::seal(&mut readied, description);
113114
self.lower = upper;
114115
seal
115116
}

src/trace/implementations/ord_neu.rs

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,7 @@ mod val_batch {
682682
}
683683

684684
#[inline(never)]
685-
fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdValBatch<L> {
685+
fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
686686
// Record the final offsets
687687
self.result.vals_offs.push(self.result.times.len());
688688
// Remove any pending singleton, and if it was set increment our count.
@@ -691,23 +691,18 @@ mod val_batch {
691691
OrdValBatch {
692692
updates: self.result.times.len() + self.singletons,
693693
storage: self.result,
694-
description: Description::new(lower, upper, since),
694+
description,
695695
}
696696
}
697697

698-
fn seal(
699-
chain: &mut Vec<Self::Input>,
700-
lower: AntichainRef<Self::Time>,
701-
upper: AntichainRef<Self::Time>,
702-
since: AntichainRef<Self::Time>,
703-
) -> Self::Output {
698+
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
704699
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
705700
let mut builder = Self::with_capacity(keys, vals, upds);
706701
for mut chunk in chain.drain(..) {
707702
builder.push(&mut chunk);
708703
}
709704

710-
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
705+
builder.done(description)
711706
}
712707
}
713708
}
@@ -1170,31 +1165,26 @@ mod key_batch {
11701165
}
11711166

11721167
#[inline(never)]
1173-
fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdKeyBatch<L> {
1168+
fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
11741169
// Record the final offsets
11751170
self.result.keys_offs.push(self.result.times.len());
11761171
// Remove any pending singleton, and if it was set increment our count.
11771172
if self.singleton.take().is_some() { self.singletons += 1; }
11781173
OrdKeyBatch {
11791174
updates: self.result.times.len() + self.singletons,
11801175
storage: self.result,
1181-
description: Description::new(lower, upper, since),
1176+
description,
11821177
}
11831178
}
11841179

1185-
fn seal(
1186-
chain: &mut Vec<Self::Input>,
1187-
lower: AntichainRef<Self::Time>,
1188-
upper: AntichainRef<Self::Time>,
1189-
since: AntichainRef<Self::Time>,
1190-
) -> Self::Output {
1180+
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
11911181
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
11921182
let mut builder = Self::with_capacity(keys, vals, upds);
11931183
for mut chunk in chain.drain(..) {
11941184
builder.push(&mut chunk);
11951185
}
11961186

1197-
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
1187+
builder.done(description)
11981188
}
11991189
}
12001190

src/trace/implementations/rhh.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,7 @@ mod val_batch {
855855
}
856856

857857
#[inline(never)]
858-
fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> RhhValBatch<L> {
858+
fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
859859
// Record the final offsets
860860
self.result.vals_offs.push(self.result.times.len());
861861
// Remove any pending singleton, and if it was set increment our count.
@@ -864,23 +864,18 @@ mod val_batch {
864864
RhhValBatch {
865865
updates: self.result.times.len() + self.singletons,
866866
storage: self.result,
867-
description: Description::new(lower, upper, since),
867+
description,
868868
}
869869
}
870870

871-
fn seal(
872-
chain: &mut Vec<Self::Input>,
873-
lower: AntichainRef<Self::Time>,
874-
upper: AntichainRef<Self::Time>,
875-
since: AntichainRef<Self::Time>,
876-
) -> Self::Output {
871+
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
877872
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
878873
let mut builder = Self::with_capacity(keys, vals, upds);
879874
for mut chunk in chain.drain(..) {
880875
builder.push(&mut chunk);
881876
}
882-
883-
builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
877+
878+
builder.done(description)
884879
}
885880
}
886881

src/trace/mod.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -342,18 +342,14 @@ pub trait Builder: Sized {
342342
/// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state.
343343
fn push(&mut self, chunk: &mut Self::Input);
344344
/// Completes building and returns the batch.
345-
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output;
345+
fn done(self, description: Description<Self::Time>) -> Self::Output;
346346

347347
/// Builds a batch from a chain of updates corresponding to the indicated lower and upper bounds.
348348
///
349349
/// This method relies on the chain only containing updates greater or equal to the lower frontier,
350-
/// and not greater or equal to the upper frontier. Chains must also be sorted and consolidated.
351-
fn seal(
352-
chain: &mut Vec<Self::Input>,
353-
lower: AntichainRef<Self::Time>,
354-
upper: AntichainRef<Self::Time>,
355-
since: AntichainRef<Self::Time>,
356-
) -> Self::Output;
350+
/// and not greater or equal to the upper frontier, as encoded in the description. Chains must also
351+
/// be sorted and consolidated.
352+
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output;
357353
}
358354

359355
/// Represents a merge in progress.
@@ -467,14 +463,9 @@ pub mod rc_blanket_impls {
467463
type Output = Rc<B::Output>;
468464
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } }
469465
fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) }
470-
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(lower, upper, since)) }
471-
fn seal(
472-
chain: &mut Vec<Self::Input>,
473-
lower: AntichainRef<Self::Time>,
474-
upper: AntichainRef<Self::Time>,
475-
since: AntichainRef<Self::Time>,
476-
) -> Self::Output {
477-
Rc::new(B::seal(chain, lower, upper, since))
466+
fn done(self, description: Description<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(description)) }
467+
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
468+
Rc::new(B::seal(chain, description))
478469
}
479470
}
480471

0 commit comments

Comments
 (0)