From 529432d4b37fbcec5870c994db5bdae86ecd29da Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 16 Jul 2025 09:59:41 -0400 Subject: [PATCH 1/4] BatchContainer::push_own takes a reference to owned data --- differential-dataflow/examples/columnar.rs | 22 ++++++------------- .../implementations/huffman_container.rs | 8 +++---- .../src/trace/implementations/mod.rs | 19 +++++----------- .../src/trace/implementations/ord_neu.rs | 4 ++-- .../src/trace/implementations/rhh.rs | 14 ++++++------ 5 files changed, 25 insertions(+), 42 deletions(-) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 20c33a41b..40fdd7544 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -579,8 +579,6 @@ pub mod dd_builder { use columnar::Columnar; - use timely::container::PushInto; - use differential_dataflow::trace::Builder; use differential_dataflow::trace::Description; use differential_dataflow::trace::implementations::Layout; @@ -614,9 +612,6 @@ pub mod dd_builder { layout::Val: Columnar, layout::Time: Columnar, layout::Diff: Columnar, - // These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`. - for<'a> L::KeyContainer: PushInto<&'a layout::Key>, - for<'a> L::ValContainer: PushInto<&'a layout::Val>, { type Input = Column<((layout::Key,layout::Val),layout::Time,layout::Diff)>; type Time = layout::Time; @@ -653,8 +648,8 @@ pub mod dd_builder { // Pre-load the first update. if self.result.keys.is_empty() { - self.result.vals.vals.push_into(&val); - self.result.keys.push_into(&key); + self.result.vals.vals.push_own(&val); + self.result.keys.push_own(&key); self.staging.push(time, diff); } // Perhaps this is a continuation of an already received key. @@ -666,15 +661,15 @@ pub mod dd_builder { // New value; complete representation of prior value. self.staging.seal(&mut self.result.upds); self.staging.push(time, diff); - self.result.vals.vals.push_into(&val); + self.result.vals.vals.push_own(&val); } } else { // New key; complete representation of prior key. self.staging.seal(&mut self.result.upds); self.staging.push(time, diff); self.result.vals.offs.push_ref(self.result.vals.len()); - self.result.vals.vals.push_into(&val); - self.result.keys.push_into(&key); + self.result.vals.vals.push_own(&val); + self.result.keys.push_own(&key); } } } @@ -719,9 +714,6 @@ pub mod dd_builder { layout::Val: Columnar, layout::Time: Columnar, layout::Diff: Columnar, - // These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`. - for<'a> L::KeyContainer: PushInto<&'a layout::Key>, - for<'a> L::ValContainer: PushInto<&'a layout::Val>, { type Input = Column<((layout::Key,layout::Val),layout::Time,layout::Diff)>; type Time = layout::Time; @@ -756,7 +748,7 @@ pub mod dd_builder { // Pre-load the first update. if self.result.keys.is_empty() { - self.result.keys.push_into(&key); + self.result.keys.push_own(&key); self.staging.push(time, diff); } // Perhaps this is a continuation of an already received key. @@ -766,7 +758,7 @@ pub mod dd_builder { // New key; complete representation of prior key. self.staging.seal(&mut self.result.upds); self.staging.push(time, diff); - self.result.keys.push_into(&key); + self.result.keys.push_own(&key); } } } diff --git a/differential-dataflow/src/trace/implementations/huffman_container.rs b/differential-dataflow/src/trace/implementations/huffman_container.rs index e09c7634c..72a5a6111 100644 --- a/differential-dataflow/src/trace/implementations/huffman_container.rs +++ b/differential-dataflow/src/trace/implementations/huffman_container.rs @@ -30,8 +30,8 @@ impl HuffmanContainer { } } -impl PushInto> for HuffmanContainer { - fn push_into(&mut self, item: Vec) { +impl<'a, B: Ord + Clone + 'static> PushInto<&'a Vec> for HuffmanContainer { + fn push_into(&mut self, item: &'a Vec) { for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; } match &mut self.inner { Ok((huffman, bytes)) => { @@ -39,7 +39,7 @@ impl PushInto> for HuffmanContainer { self.offsets.push(bytes.len()); }, Err(raw) => { - raw.extend(item); + raw.extend(item.iter().cloned()); self.offsets.push(raw.len()); } } @@ -100,7 +100,7 @@ impl BatchContainer for HuffmanContainer { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } - fn push_own(&mut self, item: Self::Owned) { self.push_into(item) } + fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) } fn with_capacity(size: usize) -> Self { let mut offsets = OffsetList::with_capacity(size + 1); diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index 69ca3a50f..087de2e46 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -358,7 +358,7 @@ impl BatchContainer for OffsetList { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } - fn push_own(&mut self, item: Self::Owned) { self.push_into(item) } + fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) } fn with_capacity(size: usize) -> Self { Self::with_capacity(size) @@ -544,7 +544,7 @@ pub mod containers { /// Push an item into this container fn push_ref(&mut self, item: Self::ReadItem<'_>); /// Push an item into this container - fn push_own(&mut self, item: Self::Owned); + fn push_own(&mut self, item: &Self::Owned); /// Creates a new container with sufficient capacity. fn with_capacity(size: usize) -> Self; @@ -637,7 +637,7 @@ pub mod containers { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } - fn push_own(&mut self, item: Self::Owned) { self.push_into(item) } + fn push_own(&mut self, item: &Self::Owned) { self.push_into(item.clone()) } fn with_capacity(size: usize) -> Self { Vec::with_capacity(size) @@ -669,7 +669,7 @@ pub mod containers { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } - fn push_own(&mut self, item: Self::Owned) { self.push_into(item) } + fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) } fn with_capacity(size: usize) -> Self { Self::with_capacity(size) @@ -713,15 +713,6 @@ pub mod containers { } } - impl PushInto> for SliceContainer { - fn push_into(&mut self, item: Vec) { - for x in item.into_iter() { - self.inner.push(x); - } - self.offsets.push(self.inner.len()); - } - } - impl BatchContainer for SliceContainer where B: Ord + Clone + Sized + 'static, @@ -736,7 +727,7 @@ pub mod containers { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } - fn push_own(&mut self, item: Self::Owned) { self.push_into(item) } + fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) } fn with_capacity(size: usize) -> Self { let mut offsets = Vec::with_capacity(size + 1); diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 8cde8383f..39bc6806b 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -249,8 +249,8 @@ pub mod layers { // Conventional; move `stash` into `updates`. self.total += self.stash.len(); for (time, diff) in self.stash.drain(..) { - upds.times.push_own(time); - upds.diffs.push_own(diff); + upds.times.push_own(&time); + upds.diffs.push_own(&diff); } upds.offs.push_ref(upds.times.len()); } diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index 9dcf68ede..4e16fd80b 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -191,7 +191,7 @@ mod val_batch { while self.keys.len() < desired { // We insert a default (dummy) key and repeat the offset to indicate this. let current_offset = self.keys_offs.index(self.keys.len()); - self.keys.push_own( as Default>::default()); + self.keys.push_own(& as Default>::default()); self.keys_offs.push_ref(current_offset); } @@ -620,8 +620,8 @@ mod val_batch { else { // Conventional; move `update_stash` into `updates`. for (time, diff) in self.update_stash.drain(..) { - self.result.times.push_own(time); - self.result.diffs.push_own(diff); + self.result.times.push_own(&time); + self.result.diffs.push_own(&diff); } } Some(self.result.times.len()) @@ -776,11 +776,11 @@ mod val_batch { else { // If we have pushed a single element, we need to copy it out to meet this one. if let Some((time, diff)) = self.singleton.take() { - self.result.times.push_own(time); - self.result.diffs.push_own(diff); + self.result.times.push_own(&time); + self.result.diffs.push_own(&diff); } - self.result.times.push_own(time); - self.result.diffs.push_own(diff); + self.result.times.push_own(&time); + self.result.diffs.push_own(&diff); } } } From 7b05e019c811f597c8a8ffe65690b5042e4c5545 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 16 Jul 2025 10:15:19 -0400 Subject: [PATCH 2/4] Add BatchContainer::clear() --- .../trace/implementations/huffman_container.rs | 2 ++ .../src/trace/implementations/mod.rs | 15 +++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/differential-dataflow/src/trace/implementations/huffman_container.rs b/differential-dataflow/src/trace/implementations/huffman_container.rs index 72a5a6111..9da01c2a5 100644 --- a/differential-dataflow/src/trace/implementations/huffman_container.rs +++ b/differential-dataflow/src/trace/implementations/huffman_container.rs @@ -102,6 +102,8 @@ impl BatchContainer for HuffmanContainer { fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) } + fn clear(&mut self) { *self = Self::default(); } + fn with_capacity(size: usize) -> Self { let mut offsets = OffsetList::with_capacity(size + 1); offsets.push(0); diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index 087de2e46..96e2c2967 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -360,6 +360,8 @@ impl BatchContainer for OffsetList { fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) } + fn clear(&mut self) { self.zero_prefix = 0; self.smol.clear(); self.chonk.clear(); } + fn with_capacity(size: usize) -> Self { Self::with_capacity(size) } @@ -546,6 +548,9 @@ pub mod containers { /// Push an item into this container fn push_own(&mut self, item: &Self::Owned); + /// Clears the container. May not release resources. + fn clear(&mut self); + /// Creates a new container with sufficient capacity. fn with_capacity(size: usize) -> Self; /// Creates a new container with sufficient capacity. @@ -639,6 +644,8 @@ pub mod containers { fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } fn push_own(&mut self, item: &Self::Owned) { self.push_into(item.clone()) } + fn clear(&mut self) { self.clear() } + fn with_capacity(size: usize) -> Self { Vec::with_capacity(size) } @@ -671,6 +678,8 @@ pub mod containers { fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) } + fn clear(&mut self) { self.clear() } + fn with_capacity(size: usize) -> Self { Self::with_capacity(size) } @@ -729,6 +738,12 @@ pub mod containers { fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) } + fn clear(&mut self) { + self.offsets.clear(); + self.offsets.push(0); + self.inner.clear(); + } + fn with_capacity(size: usize) -> Self { let mut offsets = Vec::with_capacity(size + 1); offsets.push(0); From fade6337a4c728888371dfbdb8f77efee8e4f707 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 16 Jul 2025 13:26:17 -0400 Subject: [PATCH 3/4] Remove use of borrow_as() --- differential-dataflow/examples/columnar.rs | 16 +++++-- .../src/operators/arrange/upsert.rs | 7 ++- differential-dataflow/src/operators/reduce.rs | 25 +++++----- .../src/trace/implementations/ord_neu.rs | 47 +++++++++---------- .../src/trace/implementations/rhh.rs | 19 +++++--- dogsdogsdogs/src/operators/half_join.rs | 14 ++++-- dogsdogsdogs/src/operators/lookup_map.rs | 8 ++-- 7 files changed, 83 insertions(+), 53 deletions(-) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 40fdd7544..cbe4d62c3 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -638,6 +638,9 @@ pub mod dd_builder { // Owned key and val would need to be members of `self`, as this method can be called multiple times, // and we need to correctly cache last for reasons of correctness, not just performance. + let mut key_con = L::KeyContainer::with_capacity(1); + let mut val_con = L::ValContainer::with_capacity(1); + for ((key,val),time,diff) in chunk.drain() { // It would be great to avoid. let key = as Columnar>::into_owned(key); @@ -646,6 +649,9 @@ pub mod dd_builder { let time = as Columnar>::into_owned(time); let diff = as Columnar>::into_owned(diff); + key_con.clear(); key_con.push_own(&key); + val_con.clear(); val_con.push_own(&val); + // Pre-load the first update. if self.result.keys.is_empty() { self.result.vals.vals.push_own(&val); @@ -653,9 +659,9 @@ pub mod dd_builder { self.staging.push(time, diff); } // Perhaps this is a continuation of an already received key. - else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) { + else if self.result.keys.last() == key_con.get(0) { // Perhaps this is a continuation of an already received value. - if self.result.vals.vals.last().map(|v| L::ValContainer::borrow_as(&val).eq(&v)).unwrap_or(false) { + if self.result.vals.vals.last() == val_con.get(0) { self.staging.push(time, diff); } else { // New value; complete representation of prior value. @@ -739,6 +745,8 @@ pub mod dd_builder { // Owned key and val would need to be members of `self`, as this method can be called multiple times, // and we need to correctly cache last for reasons of correctness, not just performance. + let mut key_con = L::KeyContainer::with_capacity(1); + for ((key,_val),time,diff) in chunk.drain() { // It would be great to avoid. let key = as Columnar>::into_owned(key); @@ -746,13 +754,15 @@ pub mod dd_builder { let time = as Columnar>::into_owned(time); let diff = as Columnar>::into_owned(diff); + key_con.clear(); key_con.push_own(&key); + // Pre-load the first update. if self.result.keys.is_empty() { self.result.keys.push_own(&key); self.staging.push(time, diff); } // Perhaps this is a continuation of an already received key. - else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) { + else if self.result.keys.last() == key_con.get(0) { self.staging.push(time, diff); } else { // New key; complete representation of prior key. diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index fcf9dfe46..55e65d232 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -233,14 +233,17 @@ where // new stuff that we add. let (mut trace_cursor, trace_storage) = reader_local.cursor(); let mut builder = Bu::new(); + let mut key_con = Tr::KeyContainer::with_capacity(1); for (key, mut list) in to_process { + key_con.clear(); key_con.push_own(&key); + // The prior value associated with the key. let mut prev_value: Option = None; // Attempt to find the key in the trace. - trace_cursor.seek_key(&trace_storage, Tr::KeyContainer::borrow_as(&key)); - if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&Tr::KeyContainer::borrow_as(&key))).unwrap_or(false) { + trace_cursor.seek_key(&trace_storage, key_con.index(0)); + if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&key_con.index(0))).unwrap_or(false) { // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index fb0042766..317b908f6 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -431,11 +431,16 @@ where // We first extract those times from this list that lie in the interval we will process. sort_dedup(&mut interesting); // `exposed` contains interesting (key, time)s now below `upper_limit` - let exposed = { - let (exposed, new_interesting) = interesting.drain(..).partition(|(_, time)| !upper_limit.less_equal(time)); - interesting = new_interesting; - exposed - }; + let mut exposed_keys = T1::KeyContainer::with_capacity(0); + let mut exposed_time = T1::TimeContainer::with_capacity(0); + // Keep pairs greater or equal to `upper_limit`, and "expose" other pairs. + interesting.retain(|(key, time)| { + if upper_limit.less_equal(time) { true } else { + exposed_keys.push_own(key); + exposed_time.push_own(time); + false + } + }); // Prepare an output buffer and builder for each capability. // @@ -471,12 +476,10 @@ where // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`. // There could perhaps be a less provocative variable name. let mut exposed_position = 0; - while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() { - - use std::borrow::Borrow; + while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() { // Determine the next key we will work on; could be synthetic, could be from a batch. - let key1 = exposed.get(exposed_position).map(|x| T1::KeyContainer::borrow_as(&x.0)); + let key1 = exposed_keys.get(exposed_position); let key2 = batch_cursor.get_key(batch_storage); let key = match (key1, key2) { (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), @@ -492,8 +495,8 @@ where interesting_times.clear(); // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. - while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(&T1::KeyContainer::borrow_as(&k))).unwrap_or(false) { - interesting_times.push(exposed[exposed_position].1.clone()); + while exposed_keys.get(exposed_position) == Some(key) { + interesting_times.push(T1::owned_time(exposed_time.index(exposed_position))); exposed_position += 1; } diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 39bc6806b..0c01c3378 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -207,10 +207,15 @@ pub mod layers { /// /// Tracked independently to account for duplicate compression. total: usize, + + /// Time container to stage singleton times for evaluation. + time_con: T, + /// Diff container to stage singleton times for evaluation. + diff_con: D, } impl Default for UpdsBuilder { - fn default() -> Self { Self { stash: Vec::default(), total: 0, } } + fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } } } @@ -230,34 +235,28 @@ pub mod layers { pub fn seal BatchContainer = usize>>(&mut self, upds: &mut Upds) -> bool { use crate::consolidation; consolidation::consolidate(&mut self.stash); - if !self.stash.is_empty() { - // If there is a single element, equal to a just-prior recorded update, - // we push nothing and report an unincremented offset to encode this case. - let time_diff = upds.times.last().zip(upds.diffs.last()); - let last_eq = self.stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| { - let t1 = T::borrow_as(t1); - let d1 = D::borrow_as(d1); - t1.eq(&t2) && d1.eq(&d2) - }); - if self.stash.len() == 1 && last_eq.unwrap_or(false) { - // Just clear out the stash, as we won't drain it here. + // If everything consolidates away, return false. + if self.stash.is_empty() { return false; } + // If there is a singleton, we may be able to optimize. + if self.stash.len() == 1 { + let (time, diff) = self.stash.last().unwrap(); + self.time_con.clear(); self.time_con.push_own(time); + self.diff_con.clear(); self.diff_con.push_own(diff); + if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) { self.total += 1; self.stash.clear(); upds.offs.push_ref(upds.times.len()); + return true; } - else { - // Conventional; move `stash` into `updates`. - self.total += self.stash.len(); - for (time, diff) in self.stash.drain(..) { - upds.times.push_own(&time); - upds.diffs.push_own(&diff); - } - upds.offs.push_ref(upds.times.len()); - } - true - } else { - false } + // Conventional; move `stash` into `updates`. + self.total += self.stash.len(); + for (time, diff) in self.stash.drain(..) { + upds.times.push_own(&time); + upds.diffs.push_own(&diff); + } + upds.offs.push_ref(upds.times.len()); + true } /// Completes the building and returns the total updates sealed. diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index 4e16fd80b..71d20b69d 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -204,6 +204,13 @@ mod val_batch { self.key_count += 1; } + /// Inserts a reference to an owned key, inefficiently. Should be removed. + fn insert_key_own(&mut self, key: &layout::Key, offset: Option) { + let mut key_con = L::KeyContainer::with_capacity(1); + key_con.push_own(&key); + self.insert_key(key_con.index(0), offset) + } + /// Indicates both the desired location and the hash signature of the key. fn desired_location(&self, key: &K) -> usize { if self.divisor == 0 { 0 } @@ -608,9 +615,8 @@ mod val_batch { // we push nothing and report an unincremented offset to encode this case. let time_diff = self.result.times.last().zip(self.result.diffs.last()); let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| { - let t1 = L::TimeContainer::borrow_as(t1); - let d1 = L::DiffContainer::borrow_as(d1); - t1.eq(&t2) && d1.eq(&d2) + // TODO: The use of `into_owned` is a work-around for not having reference types. + *t1 == L::TimeContainer::into_owned(t2) && *d1 == L::DiffContainer::into_owned(d2) }); if self.update_stash.len() == 1 && last_eq.unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. @@ -767,9 +773,8 @@ mod val_batch { /// to recover the singleton to push it into `updates` to join the second update. fn push_update(&mut self, time: layout::Time, diff: layout::Diff) { // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it. - let t1 = L::TimeContainer::borrow_as(&time); - let d1 = L::DiffContainer::borrow_as(&diff); - if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) { + // TODO: The use of `into_owned` is a bandage for not having references we can compare. + if self.result.times.last().map(|t| L::TimeContainer::into_owned(t) == time).unwrap_or(false) && self.result.diffs.last().map(|d| L::DiffContainer::into_owned(d) == diff).unwrap_or(false) { assert!(self.singleton.is_none()); self.singleton = Some((time, diff)); } @@ -849,7 +854,7 @@ mod val_batch { self.push_update(time, diff); self.result.vals.push_into(val); // Insert the key, but with no specified offset. - self.result.insert_key(L::KeyContainer::borrow_as(&key), None); + self.result.insert_key_own(&key, None); } } } diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index bf9b3b589..d7d9d68d6 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -331,13 +331,21 @@ where let (mut cursor, storage) = trace.cursor(); let mut yielded = false; + let mut key_con = Tr::KeyContainer::with_capacity(1); + let mut time_con = Tr::TimeContainer::with_capacity(1); + for time in frontier.iter() { + time_con.push_own(time); + } + // Process proposals one at a time, stopping if we should yield. for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() { // Use TOTAL ORDER to allow the release of `time`. yielded = yielded || yield_function(timer, *work); - if !yielded && !frontier.iter().any(|t| comparison(Tr::TimeContainer::borrow_as(t), initial)) { - cursor.seek_key(&storage, Tr::KeyContainer::borrow_as(key)); - if cursor.get_key(&storage) == Some(Tr::KeyContainer::borrow_as(key)) { + + if !yielded && !(0 .. time_con.len()).any(|i| comparison(time_con.index(i), initial)) { + key_con.clear(); key_con.push_own(&key); + cursor.seek_key(&storage, key_con.index(0)); + if cursor.get_key(&storage) == key_con.get(0) { while let Some(val2) = cursor.get_val(&storage) { cursor.map_times(&storage, |t, d| { if comparison(t, initial) { diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 0b0f6df5a..6eb7582eb 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -89,12 +89,14 @@ where }); let (mut cursor, storage) = trace.cursor(); - + // Key container to stage keys for comparison. + let mut key_con = Tr::KeyContainer::with_capacity(1); for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() { if !input2.frontier.less_equal(time) { logic2(prefix, &mut key1); - cursor.seek_key(&storage, Tr::KeyContainer::borrow_as(&key1)); - if cursor.get_key(&storage) == Some(Tr::KeyContainer::borrow_as(&key1)) { + key_con.clear(); key_con.push_own(&key1); + cursor.seek_key(&storage, key_con.index(1)); + if cursor.get_key(&storage) == Some(key_con.index(1)) { while let Some(value) = cursor.get_val(&storage) { let mut count = Tr::Diff::zero(); cursor.map_times(&storage, |t, d| { From 8fe4b6433c9ab74e28096b9048dc2c06f9eaf4fc Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 16 Jul 2025 13:34:47 -0400 Subject: [PATCH 4/4] Remove BatchContainer::borrow_as() --- .../src/trace/implementations/huffman_container.rs | 2 -- differential-dataflow/src/trace/implementations/mod.rs | 9 --------- 2 files changed, 11 deletions(-) diff --git a/differential-dataflow/src/trace/implementations/huffman_container.rs b/differential-dataflow/src/trace/implementations/huffman_container.rs index 9da01c2a5..26ac1ffee 100644 --- a/differential-dataflow/src/trace/implementations/huffman_container.rs +++ b/differential-dataflow/src/trace/implementations/huffman_container.rs @@ -95,8 +95,6 @@ impl BatchContainer for HuffmanContainer { Err(bytes) => other.extend_from_slice(bytes), } } - fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { Self::ReadItem { inner: Err(&owned[..]) } } - fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index 96e2c2967..c9c15206a 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -353,8 +353,6 @@ impl BatchContainer for OffsetList { #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item } #[inline(always)] - fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { *owned } - #[inline(always)] fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } @@ -538,10 +536,6 @@ pub mod containers { fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { *other = Self::into_owned(item); } - /// Borrows an owned instance as oneself. - #[must_use] - fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a>; - /// Push an item into this container fn push_ref(&mut self, item: Self::ReadItem<'_>); @@ -637,7 +631,6 @@ pub mod containers { #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() } #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); } - #[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { owned } fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } @@ -671,7 +664,6 @@ pub mod containers { #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() } #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); } - #[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { owned } fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } @@ -731,7 +723,6 @@ pub mod containers { #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.to_vec() } #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from_slice(item); } - #[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { &owned[..] } fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }