diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 8e0ee89a3..cfabfd472 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -109,7 +109,7 @@ use timely::progress::Antichain; use timely::dataflow::operators::Capability; use crate::operators::arrange::arrangement::Arranged; -use crate::trace::Builder; +use crate::trace::{Builder, Description}; use crate::trace::{self, Trace, TraceReader, Batch, Cursor}; use crate::trace::cursor::IntoOwned; use crate::{ExchangeData, Hashable}; @@ -281,7 +281,8 @@ where updates.sort(); builder.push(&mut updates); } - let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); + let description = Description::new(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); + let batch = builder.done(description); prev_frontier.clone_from(&upper); // Communicate `batch` to the arrangement and the stream. diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 0b014c645..886c13d67 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -23,7 +23,7 @@ use crate::trace::cursor::IntoOwned; use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent}; use crate::lattice::Lattice; -use crate::trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic}; +use crate::trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder}; @@ -565,7 +565,8 @@ where if output_upper.borrow() != output_lower.borrow() { - let batch = builder.done(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); + let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); + let batch = builder.done(description); // ship batch to the output, and commit to the output trace. output.session(&capabilities[index]).give(batch.clone()); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 63d724979..90e13508e 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -12,7 +12,7 @@ use timely::container::{ContainerBuilder, PushInto}; use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; -use crate::trace::{Batcher, Builder}; +use crate::trace::{Batcher, Builder, Description}; use crate::Data; /// Creates batches from unordered tuples. @@ -109,7 +109,8 @@ where self.stash.clear(); - let seal = B::seal(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(M::Time::minimum()).borrow()); + let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum())); + let seal = B::seal(&mut readied, description); self.lower = upper; seal } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 26270c004..f0c18a3c4 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -682,7 +682,7 @@ mod val_batch { } #[inline(never)] - fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { + fn done(mut self, description: Description) -> OrdValBatch { // Record the final offsets self.result.vals_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. @@ -691,23 +691,18 @@ mod val_batch { OrdValBatch { updates: self.result.times.len() + self.singletons, storage: self.result, - description: Description::new(lower, upper, since), + description, } } - fn seal( - chain: &mut Vec, - lower: AntichainRef, - upper: AntichainRef, - since: AntichainRef, - ) -> Self::Output { + fn seal(chain: &mut Vec, description: Description) -> Self::Output { let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]); let mut builder = Self::with_capacity(keys, vals, upds); for mut chunk in chain.drain(..) { builder.push(&mut chunk); } - builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) + builder.done(description) } } } @@ -1170,7 +1165,7 @@ mod key_batch { } #[inline(never)] - fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { + fn done(mut self, description: Description) -> OrdKeyBatch { // Record the final offsets self.result.keys_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. @@ -1178,23 +1173,18 @@ mod key_batch { OrdKeyBatch { updates: self.result.times.len() + self.singletons, storage: self.result, - description: Description::new(lower, upper, since), + description, } } - fn seal( - chain: &mut Vec, - lower: AntichainRef, - upper: AntichainRef, - since: AntichainRef, - ) -> Self::Output { + fn seal(chain: &mut Vec, description: Description) -> Self::Output { let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]); let mut builder = Self::with_capacity(keys, vals, upds); for mut chunk in chain.drain(..) { builder.push(&mut chunk); } - builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) + builder.done(description) } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 5a57d37a5..4c1ac9a11 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -855,7 +855,7 @@ mod val_batch { } #[inline(never)] - fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> RhhValBatch { + fn done(mut self, description: Description) -> RhhValBatch { // Record the final offsets self.result.vals_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. @@ -864,23 +864,18 @@ mod val_batch { RhhValBatch { updates: self.result.times.len() + self.singletons, storage: self.result, - description: Description::new(lower, upper, since), + description, } } - fn seal( - chain: &mut Vec, - lower: AntichainRef, - upper: AntichainRef, - since: AntichainRef, - ) -> Self::Output { + fn seal(chain: &mut Vec, description: Description) -> Self::Output { let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]); let mut builder = Self::with_capacity(keys, vals, upds); for mut chunk in chain.drain(..) { builder.push(&mut chunk); } - - builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) + + builder.done(description) } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 4a85747c2..7ea9064fa 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -342,18 +342,14 @@ pub trait Builder: Sized { /// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state. fn push(&mut self, chunk: &mut Self::Input); /// Completes building and returns the batch. - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output; + fn done(self, description: Description) -> Self::Output; /// Builds a batch from a chain of updates corresponding to the indicated lower and upper bounds. /// /// This method relies on the chain only containing updates greater or equal to the lower frontier, - /// and not greater or equal to the upper frontier. Chains must also be sorted and consolidated. - fn seal( - chain: &mut Vec, - lower: AntichainRef, - upper: AntichainRef, - since: AntichainRef, - ) -> Self::Output; + /// and not greater or equal to the upper frontier, as encoded in the description. Chains must also + /// be sorted and consolidated. + fn seal(chain: &mut Vec, description: Description) -> Self::Output; } /// Represents a merge in progress. @@ -467,14 +463,9 @@ pub mod rc_blanket_impls { type Output = Rc; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } } fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } - fn seal( - chain: &mut Vec, - lower: AntichainRef, - upper: AntichainRef, - since: AntichainRef, - ) -> Self::Output { - Rc::new(B::seal(chain, lower, upper, since)) + fn done(self, description: Description) -> Rc { Rc::new(self.builder.done(description)) } + fn seal(chain: &mut Vec, description: Description) -> Self::Output { + Rc::new(B::seal(chain, description)) } }