diff --git a/src/operator/distinct.rs b/src/operator/distinct.rs index e4640825..72d5ac3a 100644 --- a/src/operator/distinct.rs +++ b/src/operator/distinct.rs @@ -9,17 +9,16 @@ use crate::{ }, circuit_cache_key, time::NestedTimestamp32, - trace::{ord::OrdKeySpine, BatchReader, Builder, Cursor as TraceCursor, Trace, TraceReader}, + trace::{ord::OrdKeySpine, BatchReader, Builder, Cursor as TraceCursor, Trace}, NumEntries, Timestamp, }; use size_of::SizeOf; use std::{ borrow::Cow, - cmp::{max, Ordering}, - collections::BTreeSet, + cmp::Ordering, + collections::{BTreeSet, HashMap}, hash::Hash, marker::PhantomData, - mem::take, ops::{Add, Neg}, }; @@ -299,12 +298,12 @@ where pub struct DistinctTrace where Z: ZSet, - T: TraceReader + 'static, + T: BatchReader + 'static, { // Keeps track of keys that need to be considered at future times. // Specifically, `future_updates[i]` accumulates all keys observed during // the current epoch whose weight can change at time `i`. - future_updates: Vec>, + future_updates: HashMap>, // TODO: not needed once timekeeping is handled by the circuit. time: u32, empty_input: bool, @@ -315,12 +314,12 @@ where impl DistinctTrace where Z: ZSet, - T: TraceReader + 'static, + T: BatchReader + 'static, T::Time: Timestamp, { fn new() -> Self { Self { - future_updates: Vec::new(), + future_updates: HashMap::new(), time: HasZero::zero(), empty_input: false, empty_output: false, @@ -334,7 +333,7 @@ where Z: ZSet, Z::Key: Clone + Ord + PartialEq, Z::R: ZRingValue, - T: TraceReader + 'static, + T: BatchReader + 'static, { // Evaluate nested incremental distinct for a single value. // @@ -446,7 +445,10 @@ where // Record next_ts in `self.future_updates`. if let Some(next_ts) = next_ts { let idx: usize = next_ts.inner() as usize; - self.future_updates[idx].insert(value.clone()); + self.future_updates + .entry(idx as u32) + .or_insert_with(BTreeSet::new) + .insert(value.clone()); } } else if weight.ge0() && !weight.is_zero() { builder.push((Z::item_from(value.clone(), ()), HasOne::one())); @@ -458,14 +460,14 @@ impl Operator for DistinctTrace where Z: ZSet, Z::Key: SizeOf + Clone + Ord + PartialEq, - T: TraceReader + 'static, + T: BatchReader + 'static, { fn name(&self) -> Cow<'static, str> { Cow::Borrowed("DistinctTrace") } fn metadata(&self, meta: &mut OperatorMeta) { - let size: usize = self.future_updates.iter().map(BTreeSet::len).sum(); + let size: usize = self.future_updates.values().map(BTreeSet::len).sum(); let bytes = self.future_updates.size_of(); meta.extend(metadata! { @@ -495,11 +497,7 @@ where assert_eq!(scope, 0); self.empty_input && self.empty_output - && self - .future_updates - .iter() - .skip(self.time as usize) - .all(|vals| vals.is_empty()) + && self.future_updates.values().all(|vals| vals.is_empty()) } } @@ -527,7 +525,7 @@ where // appeared in one of the previous epochs at time `t`. // // To efficiently compute keys that satisfy the second condition, we use the - // `future_updates` vector, where for each key observed in the current epoch + // `future_updates` map, where for each key observed in the current epoch // at time `t1` we lookup the smallest time `t2 > t1` (if any) at which we saw // the key during any previous epochs and record this key in // `future_updates[t2]`. Then when evaluating an operatpr at time `t` we @@ -541,26 +539,13 @@ where self.empty_input = delta.is_zero(); - // Make sure we have enough room in `future_updates` to - // accommodate the largest timestamp in the trace, so we don't - // need to worry about growing `future_updates` later on. - let mut new_len: u32 = self.time + 1; - trace.map_batches(|batch| { - for ts in batch.upper() { - new_len = max(new_len, ts.inner() + 1); - } - }); - - self.future_updates - .resize(new_len as usize, BTreeSet::new()); - let mut builder = Z::Builder::with_capacity((), delta.len()); let mut trace_cursor = trace.cursor(); // For all keys in delta, for all keys in future_updates[time]. let mut delta_cursor = delta.cursor(); - let candidates = take(&mut self.future_updates[self.time as usize]); + let candidates = self.future_updates.remove(&self.time).unwrap_or_default(); let mut cand_iterator = candidates.iter(); let mut candidate = cand_iterator.next(); diff --git a/src/operator/join.rs b/src/operator/join.rs index 4f63a870..1f4986e3 100644 --- a/src/operator/join.rs +++ b/src/operator/join.rs @@ -10,7 +10,7 @@ use crate::{ time::Timestamp, trace::{ cursor::Cursor as TraceCursor, spine_fueled::Spine, Batch, BatchReader, Batcher, Builder, - Trace, TraceReader, + Trace, }, OrdIndexedZSet, OrdZSet, }; @@ -549,7 +549,7 @@ impl JoinStats { pub struct JoinTrace where - T: TraceReader, + T: BatchReader, Z: IndexedZSet, { join_func: F, @@ -569,7 +569,7 @@ where impl JoinTrace where - T: TraceReader, + T: BatchReader, Z: IndexedZSet, { pub fn new(join_func: F, location: &'static Location<'static>) -> Self { @@ -590,7 +590,7 @@ impl Operator for JoinTrace where F: 'static, I: 'static, - T: TraceReader + 'static, + T: BatchReader + 'static, Z: IndexedZSet, Z::Batcher: SizeOf, It: 'static, diff --git a/src/operator/trace.rs b/src/operator/trace.rs index 42131cc6..b9aaa599 100644 --- a/src/operator/trace.rs +++ b/src/operator/trace.rs @@ -5,7 +5,7 @@ use crate::{ Circuit, ExportId, ExportStream, GlobalNodeId, OwnershipPreference, Scope, Stream, }, circuit_cache_key, - trace::{cursor::Cursor, spine_fueled::Spine, Batch, BatchReader, Builder, Trace, TraceReader}, + trace::{cursor::Cursor, spine_fueled::Spine, Batch, BatchReader, Builder, Trace}, NumEntries, Timestamp, }; use size_of::SizeOf; @@ -154,7 +154,7 @@ where impl Stream, T> where P: Clone + 'static, - T: TraceReader + 'static, + T: Trace + 'static, { pub fn delay_trace(&self) -> Stream, T> { self.circuit() @@ -168,14 +168,14 @@ where pub struct UntimedTraceAppend where - T: TraceReader, + T: Trace, { _phantom: PhantomData, } impl UntimedTraceAppend where - T: TraceReader, + T: Trace, { pub fn new() -> Self { Self { @@ -186,7 +186,7 @@ where impl Default for UntimedTraceAppend where - T: TraceReader, + T: Trace, { fn default() -> Self { Self::new() @@ -195,7 +195,7 @@ where impl Operator for UntimedTraceAppend where - T: TraceReader + 'static, + T: Trace + 'static, { fn name(&self) -> Cow<'static, str> { Cow::from("UntimedTraceAppend") @@ -241,7 +241,7 @@ where pub struct TraceAppend where - T: TraceReader, + T: Trace, { time: T::Time, _phantom: PhantomData, @@ -249,7 +249,7 @@ where impl TraceAppend where - T: TraceReader, + T: Trace, { pub fn new() -> Self { Self { @@ -261,7 +261,7 @@ where impl Default for TraceAppend where - T: TraceReader, + T: Trace, { fn default() -> Self { Self::new() @@ -270,7 +270,7 @@ where impl Operator for TraceAppend where - T: TraceReader + 'static, + T: Trace + 'static, B: 'static, { fn name(&self) -> Cow<'static, str> { @@ -325,7 +325,7 @@ where } } -pub struct Z1Trace { +pub struct Z1Trace { time: T::Time, trace: Option, // `dirty[scope]` is `true` iff at least one non-empty update was added to the trace diff --git a/src/operator/upsert.rs b/src/operator/upsert.rs index 6480462d..643728aa 100644 --- a/src/operator/upsert.rs +++ b/src/operator/upsert.rs @@ -6,8 +6,8 @@ use crate::{ }, operator::trace::{DelayedTraceId, TraceAppend, TraceId, Z1Trace}, trace::{ - consolidation::consolidate, cursor::Cursor, spine_fueled::Spine, Batch, Builder, Trace, - TraceReader, + consolidation::consolidate, cursor::Cursor, spine_fueled::Spine, Batch, BatchReader, + Builder, Trace, }, utils::VecExt, Circuit, Stream, Timestamp, @@ -98,7 +98,7 @@ where pub struct Upsert where - T: TraceReader, + T: BatchReader, { time: T::Time, phantom: PhantomData, @@ -106,7 +106,7 @@ where impl Upsert where - T: TraceReader, + T: BatchReader, { pub fn new() -> Self { Self { @@ -118,7 +118,7 @@ where impl Default for Upsert where - T: TraceReader, + T: BatchReader, { fn default() -> Self { Self::new() @@ -127,7 +127,7 @@ where impl Operator for Upsert where - T: TraceReader + 'static, + T: BatchReader + 'static, B: 'static, { fn name(&self) -> Cow<'static, str> { diff --git a/src/trace/mod.rs b/src/trace/mod.rs index e99c7e4c..1af4702e 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -25,40 +25,14 @@ use crate::{ }; use size_of::SizeOf; -/// A trace whose contents may be read. -/// -/// This is a restricted interface to the more general `Trace` trait, which -/// extends this trait with further methods to update the contents of the trace. -/// These methods are used to examine the contents, and to update the reader's -/// capabilities (which may release restrictions on the mutations to the -/// underlying trace and cause work to happen). -pub trait TraceReader: BatchReader { - /// The type of an immutable collection of updates. - type Batch: Batch + 'static; - - // TODO: Do we want a version of `cursor` with an upper bound on time? E.g., it - // could help in `distinct` to avoid iterating into the future (and then - // drop future timestamps anyway). - /* - /// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or - /// equal to an element of `upper`. - /// - /// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from - /// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should - /// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This - /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses. - fn cursor_through(&self, upper: AntichainRef) -> Option<(Self::Cursor, >::Storage)>; - */ - - /// Maps logic across the non-empty sequence of batches in the trace. - fn map_batches(&self, f: F); -} - /// An append-only collection of `(key, val, time, diff)` tuples. /// /// The trace must be constructable from, and navigable by the `Key`, `Val`, /// `Time` types, but does not need to return them. -pub trait Trace: TraceReader { +pub trait Trace: BatchReader { + /// The type of an immutable collection of updates. + type Batch: Batch + 'static; + /// Allocates a new empty trace. fn new(activator: Option) -> Self; diff --git a/src/trace/spine_fueled.rs b/src/trace/spine_fueled.rs index f5bf806c..80144e57 100644 --- a/src/trace/spine_fueled.rs +++ b/src/trace/spine_fueled.rs @@ -85,7 +85,7 @@ use crate::{ trace::{ cursor::{Cursor, CursorList}, rc_blanket_impls::RcBatchCursor, - Batch, BatchReader, Consumer, Merger, Trace, TraceReader, ValueConsumer, + Batch, BatchReader, Consumer, Merger, Trace, ValueConsumer, }, NumEntries, }; @@ -238,15 +238,11 @@ where } } -impl TraceReader for Spine +impl Spine where B: Batch + 'static, - B::Key: Ord, - B::Val: Ord, { - type Batch = B; - - fn map_batches(&self, mut f: F) { + fn map_batches(&self, mut f: F) { for batch in self.merging.iter().rev() { match batch { MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => { @@ -438,6 +434,8 @@ where B::Key: Ord, B::Val: Ord, { + type Batch = B; + fn new(activator: Option) -> Self { Self::with_effort(1, activator) } @@ -476,7 +474,7 @@ where } } - fn consolidate(mut self) -> Option { + fn consolidate(mut self) -> Option { // Merge batches until there is nothing left to merge. let mut fuel = isize::max_value(); while !self.reduced() { @@ -865,7 +863,7 @@ where /// Mutate all batches. Can only be invoked when there are no in-progress /// matches in the trait. - fn map_batches_mut::Batch)>(&mut self, mut f: F) { + fn map_batches_mut::Batch)>(&mut self, mut f: F) { for batch in self.merging.iter_mut().rev() { match batch { MergeState::Double(MergeVariant::InProgress(_batch1, _batch2, _)) => {