From 1c3f10c58d395e2fd7d30a91049d1d8e5a0c07bd Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 10:24:23 +0000 Subject: [PATCH 01/11] Fix clippy warnings in columnar test code - Use Rc::clone(&results) instead of results.clone() for Rc pointers - Replace vec![...] with array literal where Vec is unnecessary - Replace Iterator::zip (disallowed) with direct assert_eq https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- src/compute/src/render/columnar.rs | 65 ++++++++++++++---------------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/src/compute/src/render/columnar.rs b/src/compute/src/render/columnar.rs index 4f10544a23648..85b9752cdad60 100644 --- a/src/compute/src/render/columnar.rs +++ b/src/compute/src/render/columnar.rs @@ -14,9 +14,9 @@ use differential_dataflow::{AsCollection, VecCollection}; use mz_repr::{Diff, Row}; use mz_timely_util::columnar::builder::ColumnBuilder; use timely::container::CapacityContainerBuilder; +use timely::dataflow::Scope; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Operator; -use timely::dataflow::Scope; use crate::typedefs::{ColumnarCollection, MzTimestamp}; @@ -126,17 +126,16 @@ mod tests { use differential_dataflow::input::Input; use mz_repr::{Datum, Diff, Row}; - use timely::dataflow::operators::probe::Probe; use timely::dataflow::operators::Inspect; + use timely::dataflow::operators::probe::Probe; /// Round-trip data through vec_to_columnar and then columnar_to_vec, /// verifying that all rows survive the conversion unchanged. #[mz_ore::test] fn round_trip_vec_columnar_vec() { timely::execute_directly(|worker| { - let results: Rc>> = - Rc::new(RefCell::new(Vec::new())); - let results_capture = results.clone(); + let results: Rc>> = Rc::new(RefCell::new(Vec::new())); + let results_capture = Rc::clone(&results); let (mut input, probe) = worker.dataflow::(|scope| { let (input, collection) = scope.new_collection::(); @@ -175,7 +174,7 @@ mod tests { let mut actual = results.borrow().clone(); actual.sort_by(|a, b| a.0.cmp(&b.0)); - let mut expected = vec![ + let mut expected = [ (row1, 0u64, one), (row2, 0u64, one), (row3, 0u64, one), @@ -184,11 +183,7 @@ mod tests { expected.sort_by(|a, b| a.0.cmp(&b.0)); assert_eq!(actual.len(), expected.len(), "Row count mismatch"); - for (a, e) in actual.iter().zip(expected.iter()) { - assert_eq!(a.0, e.0, "Row data mismatch"); - assert_eq!(a.1, e.1, "Timestamp mismatch"); - assert_eq!(a.2, e.2, "Diff mismatch"); - } + assert_eq!(actual, expected); }); } @@ -196,9 +191,8 @@ mod tests { #[mz_ore::test] fn round_trip_multiple_timestamps() { timely::execute_directly(|worker| { - let results: Rc>> = - Rc::new(RefCell::new(Vec::new())); - let results_capture = results.clone(); + let results: Rc>> = Rc::new(RefCell::new(Vec::new())); + let results_capture = Rc::clone(&results); let (mut input, probe) = worker.dataflow::(|scope| { let (input, collection) = scope.new_collection::(); @@ -241,9 +235,8 @@ mod tests { #[mz_ore::test] fn negate_columnar_flips_diffs() { timely::execute_directly(|worker| { - let results: Rc>> = - Rc::new(RefCell::new(Vec::new())); - let results_capture = results.clone(); + let results: Rc>> = Rc::new(RefCell::new(Vec::new())); + let results_capture = Rc::clone(&results); let (mut input, probe) = worker.dataflow::(|scope| { let (input, collection) = scope.new_collection::(); @@ -296,9 +289,8 @@ mod tests { #[mz_ore::test] fn union_columnar_concatenates() { timely::execute_directly(|worker| { - let results: Rc>> = - Rc::new(RefCell::new(Vec::new())); - let results_capture = results.clone(); + let results: Rc>> = Rc::new(RefCell::new(Vec::new())); + let results_capture = Rc::clone(&results); let (mut input1, mut input2, probe) = worker.dataflow::(|scope| { let (input1, collection1) = scope.new_collection::(); @@ -307,10 +299,7 @@ mod tests { // Convert both to columnar, concatenate, convert back let col1 = vec_to_columnar(collection1); let col2 = vec_to_columnar(collection2); - let union = differential_dataflow::collection::concatenate( - scope, - vec![col1, col2], - ); + let union = differential_dataflow::collection::concatenate(scope, vec![col1, col2]); let result = columnar_to_vec(union); let (probe, _stream) = result @@ -360,9 +349,8 @@ mod tests { use timely::dataflow::operators::ToStream; timely::execute_directly(|worker| { - let results: Rc>> = - Rc::new(RefCell::new(Vec::new())); - let results_capture = results.clone(); + let results: Rc>> = Rc::new(RefCell::new(Vec::new())); + let results_capture = Rc::clone(&results); let probe = worker.dataflow::(|scope| { // Simulate the Constant operator: create rows from an iterator, @@ -377,10 +365,7 @@ mod tests { let constant_data: Vec<(Row, u64, Diff)> = vec![(row1, 0, one), (row2, 0, two), (row3, 0, one)]; - let vec_collection = constant_data - .into_iter() - .to_stream(scope) - .as_collection(); + let vec_collection = constant_data.into_iter().to_stream(scope).as_collection(); let columnar = vec_to_columnar(vec_collection); let result = columnar_to_vec(columnar); @@ -408,9 +393,21 @@ mod tests { let one = Diff::from(1); let two = Diff::from(2); - assert!(actual.iter().any(|(r, t, d)| *r == row1 && *t == 0 && *d == one)); - assert!(actual.iter().any(|(r, t, d)| *r == row2 && *t == 0 && *d == two)); - assert!(actual.iter().any(|(r, t, d)| *r == row3 && *t == 0 && *d == one)); + assert!( + actual + .iter() + .any(|(r, t, d)| *r == row1 && *t == 0 && *d == one) + ); + assert!( + actual + .iter() + .any(|(r, t, d)| *r == row2 && *t == 0 && *d == two) + ); + assert!( + actual + .iter() + .any(|(r, t, d)| *r == row3 && *t == 0 && *d == one) + ); }); } } From 1ecebae997a0123384e53971ee23abe01498270d Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 10:24:56 +0000 Subject: [PATCH 02/11] Apply rustfmt formatting fixes https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- src/compute/src/render.rs | 49 +++++++++++++------------------ src/compute/src/render/context.rs | 14 +++++---- src/compute/src/typedefs.rs | 3 +- 3 files changed, 31 insertions(+), 35 deletions(-) diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 63dfc80a9d282..08e36074bc690 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -378,12 +378,9 @@ pub fn build_compute_dataflow( for (id, (oks, errs)) in imported_sources.into_iter() { let oks_entered = oks.enter(region); let errs_entered = errs.enter(region); - let columnar_oks = - crate::render::columnar::vec_to_columnar(oks_entered); - let bundle = CollectionBundle::from_columnar_collections( - columnar_oks, - errs_entered, - ); + let columnar_oks = crate::render::columnar::vec_to_columnar(oks_entered); + let bundle = + CollectionBundle::from_columnar_collections(columnar_oks, errs_entered); // Associate collection bundle with the source identifier. context.insert_id(id, bundle); } @@ -492,12 +489,9 @@ pub fn build_compute_dataflow( }; let oks_entered = oks.enter_region(region); let errs_entered = errs.enter_region(region); - let columnar_oks = - crate::render::columnar::vec_to_columnar(oks_entered); - let bundle = CollectionBundle::from_columnar_collections( - columnar_oks, - errs_entered, - ); + let columnar_oks = crate::render::columnar::vec_to_columnar(oks_entered); + let bundle = + CollectionBundle::from_columnar_collections(columnar_oks, errs_entered); // Associate collection bundle with the source identifier. context.insert_id(id, bundle); } @@ -760,7 +754,10 @@ where compute_state.traces.set(idx_id, trace); } None => { - println!("columnar_collection available: {:?}", bundle.columnar_collection.is_some()); + println!( + "columnar_collection available: {:?}", + bundle.columnar_collection.is_some() + ); println!( "keys available: {:?}", bundle.arranged.keys().collect::>() @@ -856,7 +853,10 @@ where compute_state.traces.set(idx_id, trace); } None => { - println!("columnar_collection available: {:?}", bundle.columnar_collection.is_some()); + println!( + "columnar_collection available: {:?}", + bundle.columnar_collection.is_some() + ); println!( "keys available: {:?}", bundle.arranged.keys().collect::>() @@ -1219,8 +1219,7 @@ where .as_collection(); // Produce a columnar-only collection for downstream operators. - let columnar_oks = - crate::render::columnar::vec_to_columnar(ok_collection); + let columnar_oks = crate::render::columnar::vec_to_columnar(ok_collection); CollectionBundle::from_columnar_collections(columnar_oks, err_collection) } Get { id, keys, plan } => { @@ -1343,8 +1342,7 @@ where let negated = crate::render::columnar::negate_columnar(col_oks.clone()); CollectionBundle::from_columnar_collections(negated, col_errs.clone()) } else { - let (oks, errs) = - input.as_specific_collection(None, &self.config_set); + let (oks, errs) = input.as_specific_collection(None, &self.config_set); CollectionBundle::from_collections(oks.negate(), errs) } } @@ -1370,14 +1368,10 @@ where col_oks.push(oks.clone()); col_errs.push(errs.clone()); } - let col_oks = differential_dataflow::collection::concatenate( - &mut self.scope, - col_oks, - ); - let col_errs = differential_dataflow::collection::concatenate( - &mut self.scope, - col_errs, - ); + let col_oks = + differential_dataflow::collection::concatenate(&mut self.scope, col_oks); + let col_errs = + differential_dataflow::collection::concatenate(&mut self.scope, col_errs); if consolidate_output { // Consolidation requires Vec-based collections; convert, consolidate, convert back. let vec_oks = crate::render::columnar::columnar_to_vec(col_oks); @@ -1394,8 +1388,7 @@ where let mut oks = Vec::new(); let mut errs = Vec::new(); for bundle in bundles { - let (os, es) = - bundle.as_specific_collection(None, &self.config_set); + let (os, es) = bundle.as_specific_collection(None, &self.config_set); oks.push(os); errs.push(es); } diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 34e5a59584114..db8c4f2081463 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -451,7 +451,10 @@ where /// Returns the collection as a Vec-based collection, converting from columnar if needed. pub fn as_vec_collection( &self, - ) -> (VecCollection, VecCollection) { + ) -> ( + VecCollection, + VecCollection, + ) { if let Some((col_oks, col_errs)) = &self.columnar_collection { let vec_oks = crate::render::columnar::columnar_to_vec(col_oks.clone()); (vec_oks, col_errs.clone()) @@ -876,7 +879,10 @@ where // Materialize a Vec collection for arrangement creation. // This is a local cache; we convert back to columnar at the end if needed. - let mut cached_vec: Option<(VecCollection, VecCollection)> = None; + let mut cached_vec: Option<( + VecCollection, + VecCollection, + )> = None; if form_raw_collection { cached_vec = Some(self.as_collection_core( input_mfp, @@ -890,9 +896,7 @@ where // TODO: Consider allowing more expressive names. let name = format!("ArrangeBy[{:?}]", key); - let (oks, errs) = cached_vec - .take() - .expect("Collection constructed above"); + let (oks, errs) = cached_vec.take().expect("Collection constructed above"); let (oks, errs_keyed, passthrough) = Self::arrange_collection(&name, oks, key.clone(), thinning.clone()); let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into(); diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index f588b0fbf87b7..d389004c66239 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -129,8 +129,7 @@ pub type KeyValBatcher = /// This is the columnar equivalent of `VecCollection`. Data is stored in /// `Column<(D, T, R)>` containers which provide better cache locality and enable /// future vectorized evaluation. -pub type ColumnarCollection = - Collection::Timestamp, R)>>; +pub type ColumnarCollection = Collection::Timestamp, R)>>; /// Timestamp trait for rendering, constraint to support [`MzData`] and [timely::progress::Timestamp]. pub trait MzTimestamp: From 7793f01d9a1bc39f5426ec22a2ba4828c3b35df3 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 13:03:17 +0000 Subject: [PATCH 03/11] Add Phase 11 prompts: direct columnar processing without Vec escape hatches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New prompts 11.1–11.6 to eliminate columnar→Vec conversions by operating directly on &RowRef from columnar containers. Key insight: DatumVec's borrow_with already accepts &RowRef (the columnar Ref<'_, Row> type), so operators can process columnar data without materializing owned Rows. https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- prompts.md | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/prompts.md b/prompts.md index 6d50d034233cd..9b29cbd445ae2 100644 --- a/prompts.md +++ b/prompts.md @@ -320,6 +320,106 @@ direct vectorized evaluation from arrangements without materializing collections --- +## Phase 11: Direct columnar processing (eliminate Vec escape hatches) + +The columnar `Ref<'_, Row>` is `&RowRef`, and `DatumVec::borrow_with` / `borrow_with_limit` +already accept `&RowRef`. Operators that unpack rows via `DatumVec` can iterate columnar +containers directly without materializing owned `Row` values. + +### Prompt 11.1: Columnar `flat_map` in `CollectionBundle` + +[ ] The `flat_map` method in `CollectionBundle` (context.rs) is the core building block used by +MFP, Reduce, and other operators. When `key_val` is `None`, it currently calls +`as_vec_collection()` to get a `VecCollection`, then iterates `(Row, T, Diff)` tuples. + +[ ] Add a columnar path: when `columnar_collection` is present and `key_val` is `None`, iterate +the columnar container directly using `into_index_iter()`. Each item yields `(&RowRef, &T, &Diff)`. +Pass `&RowRef` directly to `datums.borrow_with_limit(row_ref, max_demand)` — this already works +since `borrow_with_limit` accepts `&RowRef`. + +[ ] The `logic` closure signature uses `DatumVecBorrow<'_>` which is populated from `&RowRef`, +so no changes to callers (MFP evaluate, Reduce key/value extraction) are needed. + +**Files**: `src/compute/src/render/context.rs` + +--- + +### Prompt 11.2: Columnar `as_specific_collection` (identity path) + +[ ] `as_specific_collection(None)` currently calls `as_vec_collection()` which converts the +entire columnar stream to Vec. For the identity case (no arrangement key), this is pure overhead. + +[ ] When the bundle has a columnar collection, return it directly as a `ColumnarCollection` +(or convert to Vec only at the caller boundary). This requires either changing the return type +to be generic over container, or providing a separate `as_specific_columnar_collection` method. + +[ ] The callers that use this for identity passthrough (e.g., `as_collection_core` when MFP is +identity) should prefer the columnar variant. + +**Files**: `src/compute/src/render/context.rs` + +--- + +### Prompt 11.3: Columnar `as_collection_core` (MFP path) + +[ ] `as_collection_core` calls `flat_map` (converted in 11.1) and then applies +`map_fallible` to split Ok/Err. With 11.1 done, this method already processes columnar +data without the Vec escape hatch when `key_val` is `None`. + +[ ] For the identity MFP case, it currently calls `as_specific_collection` (converted in 11.2). +Wire this to return columnar directly. + +[ ] Verify that `as_columnar_collection_core` no longer needs the Vec round-trip. + +**Files**: `src/compute/src/render/context.rs` + +--- + +### Prompt 11.4: Columnar Reduce input (direct) + +[ ] `render_reduce` calls `flat_map` on the entered bundle for key/value extraction. With 11.1 +done, this already processes columnar data directly — the `flat_map` logic closure receives +`DatumVecBorrow` populated from `&RowRef`. + +[ ] Verify that Reduce works end-to-end with columnar input without any Vec conversion. + +**Files**: `src/compute/src/render/reduce.rs` + +--- + +### Prompt 11.5: Columnar FlatMap input (direct) + +[ ] `render_flat_map` calls `as_specific_collection` to get a Vec collection. With 11.2 done, +investigate whether the FlatMap inner loop can operate on columnar refs directly. + +[ ] The FlatMap inner loop unpacks `input_row` via `datums.borrow_with(&input_row)` and evaluates +expressions. Since `borrow_with` accepts `&RowRef`, the loop body can work on columnar refs. +However, the `drain_through_mfp` helper also takes `&Row` — update it to accept `&RowRef`. + +[ ] The FlatMap operator uses `unary_fallible` which expects a specific input container type. +Investigate whether it can accept `Column<(Row, T, Diff)>` directly or whether a columnar-aware +operator variant is needed. + +**Files**: `src/compute/src/render/flat_map.rs` + +--- + +### Prompt 11.6: Columnar ArrangeBy input (direct) + +[ ] `arrange_collection` takes a `VecCollection` and iterates `(row, time, diff)` +to evaluate key/value expressions. The inner loop uses `datums.borrow_with(row)`. + +[ ] Add a columnar variant `arrange_columnar_collection` that iterates `Column<(Row, T, Diff)>` +directly. Each `&RowRef` from the columnar container can be passed to `borrow_with` for +expression evaluation. The key/value `Row`s are built by `SharedRow::pack()` (owned output), +and the arrangement batcher accepts `((Row, Row), T, Diff)` — these owned outputs are unaffected. + +[ ] Wire `ensure_collections` to prefer the columnar path when `columnar_collection` is present. + +**Files**: `src/compute/src/render/context.rs` + +--- + ## Notes ### Conversion cost awareness From 7d6772e171aa4d5fbf43397958410c438753eff6 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 13:26:30 +0000 Subject: [PATCH 04/11] Columnar flat_map: iterate &RowRef directly without Vec conversion (Prompt 11.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a columnar path to CollectionBundle::flat_map that iterates the columnar container via into_index_iter(), passing &RowRef directly to borrow_with_limit. This eliminates the columnar→Vec conversion and avoids allocating owned Row values. Only timestamps and diffs are converted to owned (cheap scalar copies). https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- log-detailed.md | 20 +++++++++++++++++++ log.md | 1 + prompts.md | 6 +++--- src/compute/src/render/context.rs | 33 +++++++++++++++++++++++++++++++ 4 files changed, 57 insertions(+), 3 deletions(-) diff --git a/log-detailed.md b/log-detailed.md index 500728bbe66bc..e152c8655078e 100644 --- a/log-detailed.md +++ b/log-detailed.md @@ -388,3 +388,23 @@ The current `DatumContainer` is already reasonably efficient (contiguous bytes, ### Issues - None. Research/design prompt only. + +## Prompt 11.1: Columnar `flat_map` — direct &RowRef processing + +### What was done +- Added a columnar path to the `flat_map` method in `CollectionBundle` (context.rs). +- When `key_val` is `None` and `columnar_collection` is present, the new path iterates the columnar container directly using a bespoke `unary` operator named `ColumnarFlatMap`. +- Each columnar item yields `(&RowRef, T::Ref, Diff::Ref)` via `into_index_iter()`. The `&RowRef` is passed directly to `datums.borrow_with_limit(d, max_demand)` — no owned `Row` is ever allocated. +- Only the timestamp and diff are converted to owned via `Columnar::into_owned` (these are cheap scalar copies). +- The existing Vec fallback is retained for bundles that lack a columnar collection (e.g., arrangement-only bundles). + +### Key decisions +- Used `StreamCore::unary` with `CapacityContainerBuilder>` as the output builder, matching the return type `StreamVec`. This avoids changing the method signature. +- The `logic` closure signature (`FnMut(&mut DatumVecBorrow, T, Diff) -> I`) is unchanged. Callers (MFP evaluate, Reduce key/value extraction) work without modification because `DatumVecBorrow` is populated from `&RowRef` the same way as from `&Row`. +- The arrangement path (`key_val` is `Some`) is unchanged — it always uses the arrangement's own flat_map. + +### Files changed +- `src/compute/src/render/context.rs` — Added columnar branch in `flat_map` method. + +### Issues +- `unary` takes ownership of the stream, requiring `.clone()` on `col_oks.inner`. Stream clones are cheap (reference-counted handles). diff --git a/log.md b/log.md index b79f374bdcf95..98e149f9f52d4 100644 --- a/log.md +++ b/log.md @@ -20,3 +20,4 @@ | 16 | 9.1: Audit Vec paths; remove redundant Vec from sources; add logging | Done | 2026-03-25 | | 17 | 9.2: Remove collection field and ensure_vec_collection from CollectionBundle | Done | 2026-03-25 | | 18 | 10.1: Investigate columnar arrangement spines — research complete | Done | 2026-03-25 | +| 19 | 11.1: Columnar flat_map — iterate &RowRef directly without Vec conversion | Done | 2026-03-25 | diff --git a/prompts.md b/prompts.md index 9b29cbd445ae2..bab4fb4da4a98 100644 --- a/prompts.md +++ b/prompts.md @@ -328,16 +328,16 @@ containers directly without materializing owned `Row` values. ### Prompt 11.1: Columnar `flat_map` in `CollectionBundle` -[ ] The `flat_map` method in `CollectionBundle` (context.rs) is the core building block used by +[*] The `flat_map` method in `CollectionBundle` (context.rs) is the core building block used by MFP, Reduce, and other operators. When `key_val` is `None`, it currently calls `as_vec_collection()` to get a `VecCollection`, then iterates `(Row, T, Diff)` tuples. -[ ] Add a columnar path: when `columnar_collection` is present and `key_val` is `None`, iterate +[*] Add a columnar path: when `columnar_collection` is present and `key_val` is `None`, iterate the columnar container directly using `into_index_iter()`. Each item yields `(&RowRef, &T, &Diff)`. Pass `&RowRef` directly to `datums.borrow_with_limit(row_ref, max_demand)` — this already works since `borrow_with_limit` accepts `&RowRef`. -[ ] The `logic` closure signature uses `DatumVecBorrow<'_>` which is populated from `&RowRef`, +[*] The `logic` closure signature uses `DatumVecBorrow<'_>` which is populated from `&RowRef`, so no changes to callers (MFP evaluate, Reduce key/value extraction) are needed. **Files**: `src/compute/src/render/context.rs` diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index db8c4f2081463..d5e3ecccd90eb 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -626,6 +626,39 @@ where self.arrangement(&key) .expect("Should have ensured during planning that this arrangement exists.") .flat_map(val.as_ref(), max_demand, logic) + } else if let Some((col_oks, errs)) = &self.columnar_collection { + // Iterate columnar container directly, avoiding the columnar→Vec conversion. + // Each item yields (&RowRef, T::Ref, Diff::Ref) which we can pass to + // borrow_with_limit without materializing an owned Row. + use columnar::{Columnar, Index}; + use timely::dataflow::operators::Operator; + let oks = col_oks + .inner + .clone() + .unary::>, _, _, _>( + Pipeline, + "ColumnarFlatMap", + |_cap, _info| { + let mut datums = DatumVec::new(); + move |input, output| { + input.for_each(|time, data| { + let mut session = output.session(&time); + for (d, t, r) in data.borrow().into_index_iter() { + let t_owned: S::Timestamp = Columnar::into_owned(t); + let r_owned: Diff = Columnar::into_owned(r); + for item in logic( + &mut datums.borrow_with_limit(d, max_demand), + t_owned, + r_owned, + ) { + session.give(item); + } + } + }); + } + }, + ); + (oks, errs.clone()) } else { use timely::dataflow::operators::vec::Map; let (oks, errs) = self.as_vec_collection(); From 6e47ca26f5765831f9580fa7dbb39fd280ef987b Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 13:29:15 +0000 Subject: [PATCH 05/11] Columnar as_specific_collection: identity path returns columnar directly (Prompt 11.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add as_specific_columnar_collection that returns the columnar collection without conversion when key is None. Optimize as_columnar_collection_core to detect identity MFPs and skip the columnar→Vec→columnar round-trip. https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- log-detailed.md | 19 +++++++++++++ log.md | 1 + prompts.md | 6 ++--- src/compute/src/render/context.rs | 45 ++++++++++++++++++++++++++++--- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/log-detailed.md b/log-detailed.md index e152c8655078e..0af8644468f52 100644 --- a/log-detailed.md +++ b/log-detailed.md @@ -408,3 +408,22 @@ The current `DatumContainer` is already reasonably efficient (contiguous bytes, ### Issues - `unary` takes ownership of the stream, requiring `.clone()` on `col_oks.inner`. Stream clones are cheap (reference-counted handles). + +## Prompt 11.2: Columnar `as_specific_collection` (identity path) + +### What was done +- Added `as_specific_columnar_collection` method on `CollectionBundle` that returns `(ColumnarCollection, VecCollection)`. +- When `key` is `None`, returns the columnar collection directly by cloning the handles — no conversion at all. +- When `key` is `Some`, delegates to `as_specific_collection` (arrangement path) and converts the result to columnar. +- Optimized `as_columnar_collection_core` to detect identity MFPs and use `as_specific_columnar_collection` directly, eliminating the columnar→Vec→columnar round-trip. + +### Key decisions +- Added a new method rather than changing `as_specific_collection`'s return type, since many callers need `VecCollection` and changing the signature would be a larger refactor. +- The identity MFP detection in `as_columnar_collection_core` mirrors the same logic in `as_collection_core` (check `mfp_plan.is_identity() && !has_key_val`). +- The arrangement path (`key` is `Some`) still converts Vec→columnar since arrangement output is inherently Vec-based. + +### Files changed +- `src/compute/src/render/context.rs` — Added `as_specific_columnar_collection` method; optimized `as_columnar_collection_core` identity path. + +### Issues +- None. diff --git a/log.md b/log.md index 98e149f9f52d4..b90ef9e6ca3e1 100644 --- a/log.md +++ b/log.md @@ -21,3 +21,4 @@ | 17 | 9.2: Remove collection field and ensure_vec_collection from CollectionBundle | Done | 2026-03-25 | | 18 | 10.1: Investigate columnar arrangement spines — research complete | Done | 2026-03-25 | | 19 | 11.1: Columnar flat_map — iterate &RowRef directly without Vec conversion | Done | 2026-03-25 | +| 20 | 11.2: Columnar as_specific_collection — identity path returns columnar directly | Done | 2026-03-25 | diff --git a/prompts.md b/prompts.md index bab4fb4da4a98..ee76069662097 100644 --- a/prompts.md +++ b/prompts.md @@ -346,14 +346,14 @@ so no changes to callers (MFP evaluate, Reduce key/value extraction) are needed. ### Prompt 11.2: Columnar `as_specific_collection` (identity path) -[ ] `as_specific_collection(None)` currently calls `as_vec_collection()` which converts the +[*] `as_specific_collection(None)` currently calls `as_vec_collection()` which converts the entire columnar stream to Vec. For the identity case (no arrangement key), this is pure overhead. -[ ] When the bundle has a columnar collection, return it directly as a `ColumnarCollection` +[*] When the bundle has a columnar collection, return it directly as a `ColumnarCollection` (or convert to Vec only at the caller boundary). This requires either changing the return type to be generic over container, or providing a separate `as_specific_columnar_collection` method. -[ ] The callers that use this for identity passthrough (e.g., `as_collection_core` when MFP is +[*] The callers that use this for identity passthrough (e.g., `as_collection_core` when MFP is identity) should prefer the columnar variant. **Files**: `src/compute/src/render/context.rs` diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index d5e3ecccd90eb..8477cb761c215 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -593,6 +593,34 @@ where } } + /// Columnar variant of `as_specific_collection`. + /// + /// When `key` is `None`, returns the columnar collection directly (no conversion). + /// When `key` is `Some`, converts the arrangement to a columnar collection. + pub fn as_specific_columnar_collection( + &self, + key: Option<&[MirScalarExpr]>, + config_set: &ConfigSet, + ) -> ( + ColumnarCollection, + VecCollection, + ) { + match key { + None => { + let (col_oks, errs) = self + .columnar_collection + .as_ref() + .expect("Columnar collection doesn't exist."); + (col_oks.clone(), errs.clone()) + } + Some(_) => { + // Arrangement path: convert to Vec then columnar. + let (oks, errs) = self.as_specific_collection(key, config_set); + (crate::render::columnar::vec_to_columnar(oks), errs) + } + } + } + /// Constructs and applies logic to elements of a collection and returns the results. /// /// The function applies `logic` on elements. The logic conceptually receives @@ -863,7 +891,7 @@ where /// will be added in a future step. pub fn as_columnar_collection_core( &self, - mfp: MapFilterProject, + mut mfp: MapFilterProject, key_val: Option<(Vec, Option)>, until: Antichain, config_set: &ConfigSet, @@ -871,8 +899,19 @@ where ColumnarCollection, VecCollection, ) { - // Delegate to Vec-based as_collection_core (which converts from columnar - // internally via as_vec_collection) and convert the result back to columnar. + mfp.optimize(); + let mfp_plan = mfp.clone().into_plan().unwrap(); + + // For identity MFPs without key_val seek, return the columnar collection + // directly — no Vec round-trip needed. + let has_key_val = matches!(&key_val, Some((_key, Some(_val)))); + if mfp_plan.is_identity() && !has_key_val { + let key = key_val.map(|(k, _v)| k); + return self.as_specific_columnar_collection(key.as_deref(), config_set); + } + + // Non-identity MFP: delegate to as_collection_core (uses columnar flat_map + // from 11.1 internally) and convert the Vec result back to columnar. let (oks, errs) = self.as_collection_core(mfp, key_val, until, config_set); (crate::render::columnar::vec_to_columnar(oks), errs) } From 3268e1cb23e11732d09e3eb59897cd7090f8b574 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 13:31:03 +0000 Subject: [PATCH 06/11] Verify columnar as_collection_core MFP paths (Prompt 11.3) Identity MFPs return columnar directly (11.2). Non-identity MFPs iterate columnar via &RowRef (11.1) but output is Vec-based due to map_fallible Ok/Err split. Updated doc comment to reflect current state. https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- log-detailed.md | 17 +++++++++++++++++ log.md | 1 + prompts.md | 6 +++--- src/compute/src/render/context.rs | 8 +++++--- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/log-detailed.md b/log-detailed.md index 0af8644468f52..7f1c69d217521 100644 --- a/log-detailed.md +++ b/log-detailed.md @@ -427,3 +427,20 @@ The current `DatumContainer` is already reasonably efficient (contiguous bytes, ### Issues - None. + +## Prompt 11.3: Columnar `as_collection_core` (MFP path) — verification + +### What was done +- Verified that `as_columnar_collection_core` no longer needs the Vec round-trip for identity MFPs (handled by 11.2's `as_specific_columnar_collection`). +- Verified that for non-identity MFPs, `flat_map` (11.1) iterates columnar data directly via `&RowRef` without allocating owned Rows. The output remains Vec-based due to `map_fallible`'s Ok/Err split, with a final `vec_to_columnar` conversion. +- Updated the doc comment on `as_columnar_collection_core` to accurately describe the current behavior. + +### Key decisions +- No further code changes needed beyond updating documentation. The Vec→columnar conversion on the non-identity MFP output path is inherent to `map_fallible` producing Vec and cannot be avoided without rewriting the Ok/Err split to produce columnar output directly. +- The important optimization (avoiding owned Row allocation) is already achieved by 11.1's columnar `flat_map`. + +### Files changed +- `src/compute/src/render/context.rs` — Updated doc comment on `as_columnar_collection_core`. + +### Issues +- None. Verification-only prompt. diff --git a/log.md b/log.md index b90ef9e6ca3e1..cd72f85da0397 100644 --- a/log.md +++ b/log.md @@ -22,3 +22,4 @@ | 18 | 10.1: Investigate columnar arrangement spines — research complete | Done | 2026-03-25 | | 19 | 11.1: Columnar flat_map — iterate &RowRef directly without Vec conversion | Done | 2026-03-25 | | 20 | 11.2: Columnar as_specific_collection — identity path returns columnar directly | Done | 2026-03-25 | +| 21 | 11.3: Columnar as_collection_core — verified, identity avoids round-trip | Done | 2026-03-25 | diff --git a/prompts.md b/prompts.md index ee76069662097..2fada316bee52 100644 --- a/prompts.md +++ b/prompts.md @@ -362,14 +362,14 @@ identity) should prefer the columnar variant. ### Prompt 11.3: Columnar `as_collection_core` (MFP path) -[ ] `as_collection_core` calls `flat_map` (converted in 11.1) and then applies +[*] `as_collection_core` calls `flat_map` (converted in 11.1) and then applies `map_fallible` to split Ok/Err. With 11.1 done, this method already processes columnar data without the Vec escape hatch when `key_val` is `None`. -[ ] For the identity MFP case, it currently calls `as_specific_collection` (converted in 11.2). +[*] For the identity MFP case, it currently calls `as_specific_collection` (converted in 11.2). Wire this to return columnar directly. -[ ] Verify that `as_columnar_collection_core` no longer needs the Vec round-trip. +[*] Verify that `as_columnar_collection_core` no longer needs the Vec round-trip. **Files**: `src/compute/src/render/context.rs` diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 8477cb761c215..de01e68d5ffd0 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -886,9 +886,11 @@ where /// Columnar variant of `as_collection_core`. /// /// Applies `MapFilterProject` to the bundle and returns a columnar collection. - /// For now, this converts to Vec internally and applies the existing row-at-a-time - /// MFP evaluation, then converts the result back to columnar. Vectorized evaluation - /// will be added in a future step. + /// + /// For identity MFPs, returns the columnar collection directly (no conversion). + /// For non-identity MFPs, the `flat_map` path iterates columnar data directly via + /// `&RowRef` (no owned Row allocation), but the output is Vec-based (due to + /// `map_fallible` Ok/Err split) and converted back to columnar at the end. pub fn as_columnar_collection_core( &self, mut mfp: MapFilterProject, From 099ded9f3afdb093640350f72a74e25e61554fe6 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 13:32:35 +0000 Subject: [PATCH 07/11] Verify Reduce uses columnar flat_map directly (Prompt 11.4) render_reduce calls flat_map which now iterates &RowRef directly from columnar containers (11.1). No Vec conversion needed for key/value extraction. Verification only, no code changes. https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- log-detailed.md | 17 +++++++++++++++++ log.md | 1 + prompts.md | 4 ++-- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/log-detailed.md b/log-detailed.md index 7f1c69d217521..5450e8399aea5 100644 --- a/log-detailed.md +++ b/log-detailed.md @@ -444,3 +444,20 @@ The current `DatumContainer` is already reasonably efficient (contiguous bytes, ### Issues - None. Verification-only prompt. + +## Prompt 11.4: Columnar Reduce input (direct) — verification + +### What was done +- Verified that `render_reduce` calls `entered.flat_map(input_key.map(|k| (k, None)), max_demand, ...)`. +- When `input_key` is `None`, `flat_map` uses the columnar path from 11.1 — iterating `&RowRef` directly without Vec conversion. +- When `input_key` is `Some`, `flat_map` uses the arrangement path (no collection conversion needed). +- The logic closure receives `DatumVecBorrow` populated from `&RowRef` for key/value expression evaluation. No owned Row allocation. + +### Key decisions +- No code changes needed. Reduce automatically benefits from 11.1's columnar `flat_map`. + +### Files changed +- None (verification only). + +### Issues +- None. diff --git a/log.md b/log.md index cd72f85da0397..fa167490cca87 100644 --- a/log.md +++ b/log.md @@ -23,3 +23,4 @@ | 19 | 11.1: Columnar flat_map — iterate &RowRef directly without Vec conversion | Done | 2026-03-25 | | 20 | 11.2: Columnar as_specific_collection — identity path returns columnar directly | Done | 2026-03-25 | | 21 | 11.3: Columnar as_collection_core — verified, identity avoids round-trip | Done | 2026-03-25 | +| 22 | 11.4: Columnar Reduce input — verified, flat_map uses &RowRef directly | Done | 2026-03-25 | diff --git a/prompts.md b/prompts.md index 2fada316bee52..dac98d05927e5 100644 --- a/prompts.md +++ b/prompts.md @@ -377,11 +377,11 @@ Wire this to return columnar directly. ### Prompt 11.4: Columnar Reduce input (direct) -[ ] `render_reduce` calls `flat_map` on the entered bundle for key/value extraction. With 11.1 +[*] `render_reduce` calls `flat_map` on the entered bundle for key/value extraction. With 11.1 done, this already processes columnar data directly — the `flat_map` logic closure receives `DatumVecBorrow` populated from `&RowRef`. -[ ] Verify that Reduce works end-to-end with columnar input without any Vec conversion. +[*] Verify that Reduce works end-to-end with columnar input without any Vec conversion. **Files**: `src/compute/src/render/reduce.rs` From 0bbe6a84b65c71e30ec0dfd27c9056c08521bfad Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 13:37:26 +0000 Subject: [PATCH 08/11] Columnar FlatMap: direct &RowRef processing via unary_fallible (Prompt 11.5) Add a columnar path to render_flat_map that uses unary_fallible directly on Column<(Row, T, Diff)> containers. Iterates &RowRef without allocating owned Rows for expression evaluation. Changed drain_through_mfp to accept &RowRef. Vec fallback retained for arrangement key paths. https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- log-detailed.md | 22 ++++++ log.md | 1 + prompts.md | 6 +- src/compute/src/render/flat_map.rs | 115 +++++++++++++++++++++++++---- 4 files changed, 127 insertions(+), 17 deletions(-) diff --git a/log-detailed.md b/log-detailed.md index 5450e8399aea5..c31bc070a5d95 100644 --- a/log-detailed.md +++ b/log-detailed.md @@ -448,6 +448,7 @@ The current `DatumContainer` is already reasonably efficient (contiguous bytes, ## Prompt 11.4: Columnar Reduce input (direct) — verification ### What was done + - Verified that `render_reduce` calls `entered.flat_map(input_key.map(|k| (k, None)), max_demand, ...)`. - When `input_key` is `None`, `flat_map` uses the columnar path from 11.1 — iterating `&RowRef` directly without Vec conversion. - When `input_key` is `Some`, `flat_map` uses the arrangement path (no collection conversion needed). @@ -461,3 +462,24 @@ The current `DatumContainer` is already reasonably efficient (contiguous bytes, ### Issues - None. + +## Prompt 11.5: Columnar FlatMap input (direct) + +### What was done +- Added a columnar path to `render_flat_map` that uses `unary_fallible` directly on the columnar inner stream (`Column<(Row, T, Diff)>`). +- `unary_fallible` accepts `Column<...>` because `Column` implements `Container + DrainContainer + Clone + Default`. +- The inner loop iterates columnar items via `data.borrow().into_index_iter()`, yielding `(&RowRef, T::Ref, Diff::Ref)`. The `&RowRef` is passed directly to `datums.borrow_with(row_ref)` for expression evaluation and to `drain_through_mfp(row_ref, ...)` for MFP application. +- Changed `drain_through_mfp` parameter from `&Row` to `&RowRef` (transparent since `Row: Deref`). +- The queue buffers `Column<...>` containers instead of `Vec<...>` containers. +- Vec fallback retained for arrangement key paths and non-columnar bundles. + +### Key decisions +- Created a full parallel columnar path rather than trying to make the existing code generic, because the iteration patterns differ (`for (row, t, d) in data` for Vec vs `for (ref, t_ref, d_ref) in data.borrow().into_index_iter()` for columnar). +- The columnar path does not use `'input` labeled break since columnar iteration doesn't yield owned items that can be pattern-matched the same way. Uses `continue` instead. +- Output is still Vec-based (`ConsolidatingContainerBuilder>`) for the ok/err streams, with a final `vec_to_columnar` conversion. The inner table function evaluation inherently produces owned Rows. + +### Files changed +- `src/compute/src/render/flat_map.rs` — Added columnar `unary_fallible` path; changed `drain_through_mfp` to accept `&RowRef`. + +### Issues +- `col_errs` needed `.clone()` for `concat` since it's behind a shared reference. diff --git a/log.md b/log.md index fa167490cca87..2319d33151498 100644 --- a/log.md +++ b/log.md @@ -24,3 +24,4 @@ | 20 | 11.2: Columnar as_specific_collection — identity path returns columnar directly | Done | 2026-03-25 | | 21 | 11.3: Columnar as_collection_core — verified, identity avoids round-trip | Done | 2026-03-25 | | 22 | 11.4: Columnar Reduce input — verified, flat_map uses &RowRef directly | Done | 2026-03-25 | +| 23 | 11.5: Columnar FlatMap — direct &RowRef processing via unary_fallible on Column | Done | 2026-03-25 | diff --git a/prompts.md b/prompts.md index dac98d05927e5..5bb73173ddda4 100644 --- a/prompts.md +++ b/prompts.md @@ -389,14 +389,14 @@ done, this already processes columnar data directly — the `flat_map` logic clo ### Prompt 11.5: Columnar FlatMap input (direct) -[ ] `render_flat_map` calls `as_specific_collection` to get a Vec collection. With 11.2 done, +[*] `render_flat_map` calls `as_specific_collection` to get a Vec collection. With 11.2 done, investigate whether the FlatMap inner loop can operate on columnar refs directly. -[ ] The FlatMap inner loop unpacks `input_row` via `datums.borrow_with(&input_row)` and evaluates +[*] The FlatMap inner loop unpacks `input_row` via `datums.borrow_with(&input_row)` and evaluates expressions. Since `borrow_with` accepts `&RowRef`, the loop body can work on columnar refs. However, the `drain_through_mfp` helper also takes `&Row` — update it to accept `&RowRef`. -[ ] The FlatMap operator uses `unary_fallible` which expects a specific input container type. +[*] The FlatMap operator uses `unary_fallible` which expects a specific input container type. Investigate whether it can accept `Column<(Row, T, Diff)>` directly or whether a columnar-aware operator variant is needed. diff --git a/src/compute/src/render/flat_map.rs b/src/compute/src/render/flat_map.rs index 914bec5c52d1c..9f15ef0beaf02 100644 --- a/src/compute/src/render/flat_map.rs +++ b/src/compute/src/render/flat_map.rs @@ -14,7 +14,7 @@ use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL; use mz_expr::MfpPlan; use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc}; use mz_repr::{DatumVec, RowArena, SharedRow}; -use mz_repr::{Diff, Row, Timestamp}; +use mz_repr::{Diff, Row, RowRef, Timestamp}; use mz_timely_util::operator::StreamExt; use timely::dataflow::Scope; use timely::dataflow::channels::pact::Pipeline; @@ -41,11 +41,6 @@ where ) -> CollectionBundle { let until = self.until.clone(); let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed"); - let has_columnar = input.columnar_collection.is_some(); - let (ok_collection, err_collection) = - input.as_specific_collection(input_key.as_deref(), &self.config_set); - let stream = ok_collection.inner; - let scope = input.scope(); // Budget to limit the number of rows processed in a single invocation. // @@ -53,6 +48,104 @@ where // a batch. A `generate_series` can still cause unavailability if it generates many rows. let budget = COMPUTE_FLAT_MAP_FUEL.get(&self.config_set); + // When we have columnar input and no arrangement key, iterate the columnar + // container directly — each item yields (&RowRef, T::Ref, Diff::Ref) without + // allocating owned Rows. + if input_key.is_none() { + if let Some((col_oks, col_errs)) = &input.columnar_collection { + let scope = input.scope(); + let (oks, errs) = col_oks.inner.clone().unary_fallible( + Pipeline, + "FlatMapStageColumnar", + move |_, info| { + let activator = scope.activator_for(info.address); + let mut queue = VecDeque::new(); + Box::new(move |input, ok_output, err_output| { + use columnar::{Columnar, Index}; + let mut datums = DatumVec::new(); + let mut datums_mfp = DatumVec::new(); + let mut table_func_output = Vec::new(); + let mut budget = budget; + + input.for_each(|cap, data| { + queue.push_back(( + cap.retain(0), + cap.retain(1), + std::mem::take(data), + )) + }); + + while let Some((ok_cap, err_cap, data)) = queue.pop_front() { + let mut ok_session = ok_output.session_with_builder(&ok_cap); + let mut err_session = err_output.session_with_builder(&err_cap); + + for (row_ref, t_ref, r_ref) in data.borrow().into_index_iter() { + let time: G::Timestamp = Columnar::into_owned(t_ref); + let diff: Diff = Columnar::into_owned(r_ref); + let temp_storage = RowArena::new(); + + let datums_local = datums.borrow_with(row_ref); + let args = exprs + .iter() + .map(|e| e.eval(&datums_local, &temp_storage)) + .collect::, _>>(); + let args = match args { + Ok(args) => args, + Err(e) => { + err_session.give((e.into(), time, diff)); + continue; + } + }; + let mut extensions = match func.eval(&args, &temp_storage) { + Ok(exts) => exts.fuse(), + Err(e) => { + err_session.give((e.into(), time, diff)); + continue; + } + }; + + while let Some((extension, output_diff)) = extensions.next() { + table_func_output.push((extension, output_diff)); + table_func_output.extend((&mut extensions).take(1023)); + drain_through_mfp( + row_ref, + &time, + &diff, + &mut datums_mfp, + &table_func_output, + &mfp_plan, + &until, + &mut ok_session, + &mut err_session, + &mut budget, + ); + table_func_output.clear(); + } + } + if budget == 0 { + activator.activate(); + break; + } + } + }) + }, + ); + + use differential_dataflow::AsCollection; + let ok_collection = oks.as_collection(); + let new_err_collection = errs.as_collection(); + let err_collection = col_errs.clone().concat(new_err_collection); + let col_oks = crate::render::columnar::vec_to_columnar(ok_collection); + return CollectionBundle::from_columnar_collections(col_oks, err_collection); + } + } + + // Vec fallback: arrangement key or no columnar collection. + let (ok_collection, err_collection) = + input.as_specific_collection(input_key.as_deref(), &self.config_set); + let stream = ok_collection.inner; + let scope = input.scope(); + let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, info| { let activator = scope.activator_for(info.address); let mut queue = VecDeque::new(); @@ -101,7 +194,6 @@ where while let Some((extension, output_diff)) = extensions.next() { table_func_output.push((extension, output_diff)); table_func_output.extend((&mut extensions).take(1023)); - // We could consolidate `table_func_output`, but it seems unlikely to be productive. drain_through_mfp( &input_row, &time, @@ -129,12 +221,7 @@ where let ok_collection = oks.as_collection(); let new_err_collection = errs.as_collection(); let err_collection = err_collection.concat(new_err_collection); - if has_columnar { - let col_oks = crate::render::columnar::vec_to_columnar(ok_collection); - CollectionBundle::from_columnar_collections(col_oks, err_collection) - } else { - CollectionBundle::from_collections(ok_collection, err_collection) - } + CollectionBundle::from_collections(ok_collection, err_collection) } } @@ -142,7 +229,7 @@ where /// /// The method decodes `input_row`, and should be amortized across non-trivial `extensions`. fn drain_through_mfp( - input_row: &Row, + input_row: &RowRef, input_time: &T, input_diff: &Diff, datum_vec: &mut DatumVec, From 36fdd90eea530b382ce85d37129b9439a2933e8d Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 14:00:36 +0000 Subject: [PATCH 09/11] Columnar ArrangeBy: arrange_columnar_collection iterates &RowRef directly (Prompt 11.6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add arrange_columnar_collection that takes ColumnarCollection and iterates &RowRef from columnar containers for key/value expression evaluation, avoiding the columnar→Vec conversion. Wire ensure_collections to use it when identity MFP + no input_key + columnar available. The passthrough stream stays columnar throughout the arrangement loop. https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- log-detailed.md | 20 +++++ log.md | 1 + prompts.md | 6 +- src/compute/src/render/context.rs | 141 +++++++++++++++++++++++++++--- 4 files changed, 154 insertions(+), 14 deletions(-) diff --git a/log-detailed.md b/log-detailed.md index c31bc070a5d95..fba0542673d3d 100644 --- a/log-detailed.md +++ b/log-detailed.md @@ -483,3 +483,23 @@ The current `DatumContainer` is already reasonably efficient (contiguous bytes, ### Issues - `col_errs` needed `.clone()` for `concat` since it's behind a shared reference. + +## Prompt 11.6: Columnar ArrangeBy input (direct) + +### What was done +- Added `arrange_columnar_collection` method that takes `ColumnarCollection` and iterates `&RowRef` directly from columnar containers for key/value expression evaluation. +- The method mirrors `arrange_collection` but: iterates via `data.borrow().into_index_iter()` yielding `(&RowRef, T::Ref, Diff::Ref)`, passes `&RowRef` to `datums.borrow_with(row_ref)`, and produces a columnar passthrough via `ColumnBuilder<(Row, S::Timestamp, Diff)>`. +- Modified `ensure_collections` to detect when identity MFP + no input_key + columnar available, and use `arrange_columnar_collection` directly instead of converting columnar→Vec via `as_collection_core`. +- The columnar path tracks a `cached_col: Option<(ColumnarCollection, VecCollection)>` through the arrangement loop, keeping the passthrough columnar throughout. +- The existing Vec fallback path is retained for non-identity MFPs and arrangement-key cases. + +### Key decisions +- Only use the columnar direct path when MFP is identity and `input_key` is None. When MFP is non-identity, `as_collection_core` → `flat_map` already uses the columnar flat_map path from 11.1. +- The passthrough stream is columnar (`ColumnBuilder<(Row, T, Diff)>`), forwarding each item individually. This is slightly less efficient than the Vec path's `give_container` (which forwards entire containers), but avoids a columnar→Vec→columnar round-trip for subsequent arrangements. +- Key/value expression evaluation produces owned `Row`s via `key_buf.packer()` / `val_buf.packer()` — this is inherent to the arrangement format and unaffected by the input representation. + +### Files changed +- `src/compute/src/render/context.rs` — Added `arrange_columnar_collection` method; modified `ensure_collections` to prefer columnar path. + +### Issues +- None. diff --git a/log.md b/log.md index 2319d33151498..213a58bd4d564 100644 --- a/log.md +++ b/log.md @@ -25,3 +25,4 @@ | 21 | 11.3: Columnar as_collection_core — verified, identity avoids round-trip | Done | 2026-03-25 | | 22 | 11.4: Columnar Reduce input — verified, flat_map uses &RowRef directly | Done | 2026-03-25 | | 23 | 11.5: Columnar FlatMap — direct &RowRef processing via unary_fallible on Column | Done | 2026-03-25 | +| 24 | 11.6: Columnar ArrangeBy — arrange_columnar_collection iterates &RowRef directly | Done | 2026-03-25 | diff --git a/prompts.md b/prompts.md index 5bb73173ddda4..8ddee898b0390 100644 --- a/prompts.md +++ b/prompts.md @@ -406,15 +406,15 @@ operator variant is needed. ### Prompt 11.6: Columnar ArrangeBy input (direct) -[ ] `arrange_collection` takes a `VecCollection` and iterates `(row, time, diff)` +[*] `arrange_collection` takes a `VecCollection` and iterates `(row, time, diff)` to evaluate key/value expressions. The inner loop uses `datums.borrow_with(row)`. -[ ] Add a columnar variant `arrange_columnar_collection` that iterates `Column<(Row, T, Diff)>` +[*] Add a columnar variant `arrange_columnar_collection` that iterates `Column<(Row, T, Diff)>` directly. Each `&RowRef` from the columnar container can be passed to `borrow_with` for expression evaluation. The key/value `Row`s are built by `SharedRow::pack()` (owned output), and the arrangement batcher accepts `((Row, Row), T, Diff)` — these owned outputs are unaffected. -[ ] Wire `ensure_collections` to prefer the columnar path when `columnar_collection` is present. +[*] Wire `ensure_collections` to prefer the columnar path when `columnar_collection` is present. **Files**: `src/compute/src/render/context.rs` diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index de01e68d5ffd0..25349154823e1 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -929,13 +929,6 @@ where if collections == Default::default() { return self; } - // Cache collection to avoid reforming it each time. - // - // TODO(mcsherry): In theory this could be faster run out of another arrangement, - // as the `map_fallible` that follows could be run against an arrangement itself. - // - // Note(btv): If we ever do that, we would then only need to make the raw collection here - // if `collections.raw` is true. for (key, _, _) in collections.arranged.iter() { soft_assert_or_log!( @@ -951,8 +944,55 @@ where .iter() .any(|(key, _, _)| !self.arranged.contains_key(key)); - // Materialize a Vec collection for arrangement creation. - // This is a local cache; we convert back to columnar at the end if needed. + // Determine if we can feed columnar input directly (identity MFP, no key). + let mfp_is_identity = { + let mut mfp = input_mfp.clone(); + mfp.optimize(); + mfp.into_plan().map_or(false, |p| p.is_identity()) + }; + let use_columnar_direct = + form_raw_collection && mfp_is_identity && input_key.is_none() && self.columnar_collection.is_some(); + + if use_columnar_direct { + let (col_oks, col_errs) = self + .columnar_collection + .as_ref() + .expect("checked above") + .clone(); + + // Track the columnar collection and errors through the arrangement loop. + let mut cached_col = Some((col_oks, col_errs)); + + for (key, _, thinning) in collections.arranged { + if !self.arranged.contains_key(&key) { + let name = format!("ArrangeBy[{:?}]", key); + let (col_oks, errs) = cached_col.take().expect("Collection constructed above"); + let (oks, errs_keyed, passthrough) = Self::arrange_columnar_collection( + &name, + col_oks, + key.clone(), + thinning.clone(), + ); + let errs_concat: KeyCollection<_, _, _> = + errs.clone().concat(errs_keyed).into(); + cached_col = Some((passthrough, errs)); + let errs = errs_concat + .mz_arrange::, ErrBuilder<_, _>, ErrSpine<_, _>>( + &format!("{}-errors", name), + ); + self.arranged + .insert(key, ArrangementFlavor::Local(oks, errs)); + } + } + if collections.raw { + if let Some((oks, errs)) = cached_col { + self.columnar_collection = Some((oks, errs)); + } + } + return self; + } + + // Fallback: materialize a Vec collection for arrangement creation. let mut cached_vec: Option<( VecCollection, VecCollection, @@ -967,7 +1007,6 @@ where } for (key, _, thinning) in collections.arranged { if !self.arranged.contains_key(&key) { - // TODO: Consider allowing more expressive names. let name = format!("ArrangeBy[{:?}]", key); let (oks, errs) = cached_vec.take().expect("Collection constructed above"); @@ -983,7 +1022,6 @@ where .insert(key, ArrangementFlavor::Local(oks, errs)); } } - // If the raw collection was demanded, store the passthrough as columnar. if collections.raw { if let Some((oks, errs)) = cached_vec { self.columnar_collection = @@ -1077,6 +1115,87 @@ where passthrough_stream.as_collection(), ) } + + /// Like `arrange_collection`, but takes columnar input and produces a columnar passthrough. + /// + /// Iterates `&RowRef` directly from the columnar container without allocating owned Rows + /// for key/value expression evaluation. + fn arrange_columnar_collection( + name: &String, + oks: ColumnarCollection, + key: Vec, + thinning: Vec, + ) -> ( + Arranged>, + VecCollection, + ColumnarCollection, + ) { + let mut builder = + OperatorBuilder::new("FormArrangementKeyColumnar".to_string(), oks.inner.scope()); + let (ok_output, ok_stream) = builder.new_output(); + let mut ok_output = + OutputBuilder::<_, ColumnBuilder<((Row, Row), S::Timestamp, Diff)>>::from(ok_output); + let (err_output, err_stream) = builder.new_output(); + let mut err_output = OutputBuilder::from(err_output); + let (passthrough_output, passthrough_stream) = builder.new_output(); + let mut passthrough_output = + OutputBuilder::<_, ColumnBuilder<(Row, S::Timestamp, Diff)>>::from(passthrough_output); + let mut input = builder.new_input(oks.inner, Pipeline); + builder.set_notify(false); + builder.build(move |_capabilities| { + let mut key_buf = Row::default(); + let mut val_buf = Row::default(); + let mut datums = DatumVec::new(); + let mut temp_storage = RowArena::new(); + move |_frontiers| { + use columnar::{Columnar, Index}; + let mut ok_output = ok_output.activate(); + let mut err_output = err_output.activate(); + let mut passthrough_output = passthrough_output.activate(); + input.for_each(|time, data| { + let mut ok_session = ok_output.session_with_builder(&time); + let mut err_session = err_output.session(&time); + let mut pass_session = passthrough_output.session_with_builder(&time); + for (row_ref, t_ref, r_ref) in data.borrow().into_index_iter() { + let t: S::Timestamp = Columnar::into_owned(t_ref); + let r: Diff = Columnar::into_owned(r_ref); + temp_storage.clear(); + let datums = datums.borrow_with(row_ref); + let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage)); + match key_buf.packer().try_extend(key_iter) { + Ok(()) => { + let val_datum_iter = thinning.iter().map(|c| datums[*c]); + val_buf.packer().extend(val_datum_iter); + ok_session.give(((&*key_buf, &*val_buf), &t, &r)); + } + Err(e) => { + err_session.give((e.into(), t.clone(), r)); + } + } + pass_session.give((row_ref, &t, &r)); + } + }); + } + }); + + let oks = ok_stream + .mz_arrange_core::< + _, + Col2ValBatcher<_, _, _, _>, + RowRowBuilder<_, _>, + RowRowSpine<_, _>, + >( + ExchangeCore::, _>::new_core( + columnar_exchange::, + ), + name, + ); + ( + oks, + err_stream.as_collection(), + passthrough_stream.as_collection(), + ) + } } struct PendingWork From 9f848faae20a93a3aae5b525a126df2d544d9b6a Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 20:16:52 +0000 Subject: [PATCH 10/11] Use copy_from instead of into_owned for columnar iteration buffers Replace Columnar::into_owned with copy_from on reusable buffers in all columnar iteration loops. This avoids allocating new Row/Timestamp/Diff values each iteration, reusing the buffer's existing allocation instead. Affected operators: ColumnarToVec, NegateColumnar, ColumnarFlatMap, FlatMapStageColumnar, FormArrangementKeyColumnar. https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- src/compute/src/render/columnar.rs | 18 +++++++++++------- src/compute/src/render/context.rs | 22 +++++++++++++--------- src/compute/src/render/flat_map.rs | 21 +++++++++++++-------- 3 files changed, 37 insertions(+), 24 deletions(-) diff --git a/src/compute/src/render/columnar.rs b/src/compute/src/render/columnar.rs index 85b9752cdad60..27043b7b75c61 100644 --- a/src/compute/src/render/columnar.rs +++ b/src/compute/src/render/columnar.rs @@ -17,6 +17,7 @@ use timely::container::CapacityContainerBuilder; use timely::dataflow::Scope; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Operator; +use timely::progress::Timestamp; use crate::typedefs::{ColumnarCollection, MzTimestamp}; @@ -68,15 +69,17 @@ where Pipeline, "ColumnarToVec", |_cap, _info| { + let mut row_buf = Row::default(); + let mut t_buf = S::Timestamp::minimum(); + let mut r_buf = Diff::default(); move |input, output| { input.for_each(|time, data| { let mut session = output.session(&time); for (d, t, r) in data.borrow().into_index_iter() { - session.give(( - Columnar::into_owned(d), - Columnar::into_owned(t), - Columnar::into_owned(r), - )); + row_buf.copy_from(d); + t_buf.copy_from(t); + r_buf.copy_from(r); + session.give((row_buf.clone(), t_buf.clone(), r_buf.clone())); } }); } @@ -102,12 +105,13 @@ where Pipeline, "NegateColumnar", |_cap, _info| { + let mut r_buf = Diff::default(); move |input, output| { input.for_each(|time, data| { let mut session = output.session_with_builder(&time); for (d, t, r) in data.borrow().into_index_iter() { - let owned_r: Diff = Columnar::into_owned(r); - let neg_r = -owned_r; + r_buf.copy_from(r); + let neg_r = -r_buf; session.give((d, t, &neg_r)); } }); diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 25349154823e1..3f19b9fb87175 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -668,16 +668,18 @@ where "ColumnarFlatMap", |_cap, _info| { let mut datums = DatumVec::new(); + let mut t_buf = S::Timestamp::minimum(); + let mut r_buf = Diff::default(); move |input, output| { input.for_each(|time, data| { let mut session = output.session(&time); for (d, t, r) in data.borrow().into_index_iter() { - let t_owned: S::Timestamp = Columnar::into_owned(t); - let r_owned: Diff = Columnar::into_owned(r); + t_buf.copy_from(t); + r_buf.copy_from(r); for item in logic( &mut datums.borrow_with_limit(d, max_demand), - t_owned, - r_owned, + t_buf.clone(), + r_buf, ) { session.give(item); } @@ -1147,6 +1149,8 @@ where let mut val_buf = Row::default(); let mut datums = DatumVec::new(); let mut temp_storage = RowArena::new(); + let mut t_buf = S::Timestamp::minimum(); + let mut r_buf = Diff::default(); move |_frontiers| { use columnar::{Columnar, Index}; let mut ok_output = ok_output.activate(); @@ -1157,8 +1161,8 @@ where let mut err_session = err_output.session(&time); let mut pass_session = passthrough_output.session_with_builder(&time); for (row_ref, t_ref, r_ref) in data.borrow().into_index_iter() { - let t: S::Timestamp = Columnar::into_owned(t_ref); - let r: Diff = Columnar::into_owned(r_ref); + t_buf.copy_from(t_ref); + r_buf.copy_from(r_ref); temp_storage.clear(); let datums = datums.borrow_with(row_ref); let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage)); @@ -1166,13 +1170,13 @@ where Ok(()) => { let val_datum_iter = thinning.iter().map(|c| datums[*c]); val_buf.packer().extend(val_datum_iter); - ok_session.give(((&*key_buf, &*val_buf), &t, &r)); + ok_session.give(((&*key_buf, &*val_buf), &t_buf, &r_buf)); } Err(e) => { - err_session.give((e.into(), t.clone(), r)); + err_session.give((e.into(), t_buf.clone(), r_buf)); } } - pass_session.give((row_ref, &t, &r)); + pass_session.give((row_ref, &t_buf, &r_buf)); } }); } diff --git a/src/compute/src/render/flat_map.rs b/src/compute/src/render/flat_map.rs index 9f15ef0beaf02..0537f3435b02d 100644 --- a/src/compute/src/render/flat_map.rs +++ b/src/compute/src/render/flat_map.rs @@ -14,13 +14,14 @@ use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL; use mz_expr::MfpPlan; use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc}; use mz_repr::{DatumVec, RowArena, SharedRow}; -use mz_repr::{Diff, Row, RowRef, Timestamp}; +use mz_repr::{Diff, Row, RowRef, Timestamp as MzTimestamp}; use mz_timely_util::operator::StreamExt; use timely::dataflow::Scope; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; use timely::dataflow::operators::generic::Session; use timely::progress::Antichain; +use timely::progress::Timestamp; use crate::render::DataflowError; use crate::render::context::{CollectionBundle, Context}; @@ -60,6 +61,8 @@ where move |_, info| { let activator = scope.activator_for(info.address); let mut queue = VecDeque::new(); + let mut t_buf = G::Timestamp::minimum(); + let mut r_buf = Diff::default(); Box::new(move |input, ok_output, err_output| { use columnar::{Columnar, Index}; let mut datums = DatumVec::new(); @@ -80,8 +83,8 @@ where let mut err_session = err_output.session_with_builder(&err_cap); for (row_ref, t_ref, r_ref) in data.borrow().into_index_iter() { - let time: G::Timestamp = Columnar::into_owned(t_ref); - let diff: Diff = Columnar::into_owned(r_ref); + t_buf.copy_from(t_ref); + r_buf.copy_from(r_ref); let temp_storage = RowArena::new(); let datums_local = datums.borrow_with(row_ref); @@ -92,14 +95,16 @@ where let args = match args { Ok(args) => args, Err(e) => { - err_session.give((e.into(), time, diff)); + err_session + .give((e.into(), t_buf.clone(), r_buf)); continue; } }; let mut extensions = match func.eval(&args, &temp_storage) { Ok(exts) => exts.fuse(), Err(e) => { - err_session.give((e.into(), time, diff)); + err_session + .give((e.into(), t_buf.clone(), r_buf)); continue; } }; @@ -109,8 +114,8 @@ where table_func_output.extend((&mut extensions).take(1023)); drain_through_mfp( row_ref, - &time, - &diff, + &t_buf, + &r_buf, &mut datums_mfp, &table_func_output, &mfp_plan, @@ -235,7 +240,7 @@ fn drain_through_mfp( datum_vec: &mut DatumVec, extensions: &[(Row, Diff)], mfp_plan: &MfpPlan, - until: &Antichain, + until: &Antichain, ok_output: &mut Session< '_, '_, From 7c85ea82b9eb6a0d0cba71a02e694e9cd525b40a Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Mar 2026 20:28:29 +0000 Subject: [PATCH 11/11] Pass timestamp and diff by reference in flat_map logic closures Change the flat_map closure signature from (DatumVecBorrow, T, Diff) to (DatumVecBorrow, &T, &Diff). This eliminates unnecessary clones in the columnar path (references to copy_from buffers are passed directly) and in the arrangement path (owned values from buffer.drain are passed by reference). Callers clone/copy only when they actually need ownership. https://claude.ai/code/session_01JHo5sTCSGPW5NavNE2b49d --- src/compute/src/render/context.rs | 24 ++++++++++++------------ src/compute/src/render/reduce.rs | 6 +++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 3f19b9fb87175..98ab5643ece15 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -308,7 +308,7 @@ where where I: IntoIterator, D: Data, - L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static, + L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, &S::Timestamp, &Diff) -> I + 'static, { // Set a number of tuples after which the operator should yield. // This allows us to remain responsive even when enumerating a substantial @@ -316,7 +316,7 @@ where let refuel = 1000000; let mut datums = DatumVec::new(); - let logic = move |k: DatumSeq, v: DatumSeq, t, d| { + let logic = move |k: DatumSeq, v: DatumSeq, t: &S::Timestamp, d: &Diff| { let mut datums_borrow = datums.borrow(); datums_borrow.extend(k.to_datum_iter().take(max_demand)); let max_demand = max_demand.saturating_sub(datums_borrow.len()); @@ -582,7 +582,7 @@ where if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) { // Decode all columns, pass max_demand as usize::MAX. let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| { - Some((SharedRow::pack(borrow.iter()), t, r)) + Some((SharedRow::pack(borrow.iter()), t.clone(), *r)) }); (ok.as_collection(), err) } else { @@ -645,7 +645,7 @@ where where I: IntoIterator, D: Data, - L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static, + L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, &S::Timestamp, &Diff) -> I + 'static, { // If `key_val` is set, we should have to use the corresponding arrangement. // If there isn't one, that implies an error in the contract between @@ -678,8 +678,8 @@ where r_buf.copy_from(r); for item in logic( &mut datums.borrow_with_limit(d, max_demand), - t_buf.clone(), - r_buf, + &t_buf, + &r_buf, ) { session.give(item); } @@ -694,7 +694,7 @@ where let (oks, errs) = self.as_vec_collection(); let mut datums = DatumVec::new(); let oks = oks.inner.flat_map(move |(v, t, d)| { - logic(&mut datums.borrow_with_limit(&v, max_demand), t, d) + logic(&mut datums.borrow_with_limit(&v, max_demand), &t, &d) }); (oks, errs) } @@ -724,7 +724,7 @@ where + 'static, I: IntoIterator, D: Data, - L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static, + L: FnMut(Tr::Key<'_>, Tr::Val<'_>, &S::Timestamp, &mz_repr::Diff) -> I + 'static, { use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB; let scope = trace.stream.scope(); @@ -854,7 +854,7 @@ where &mut datums_local, &temp_storage, event_time, - diff.clone(), + *diff, move |time| !until.less_equal(time), &mut row_builder, ) @@ -1233,7 +1233,7 @@ where ) where I: IntoIterator, D: Data, - L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static, + L: FnMut(C::Key<'_>, C::Val<'_>, &C::Time, &C::Diff) -> I + 'static, { use differential_dataflow::consolidation::consolidate; @@ -1254,7 +1254,7 @@ where }); consolidate(&mut buffer); for (time, diff) in buffer.drain(..) { - for datum in logic(key, val, time, diff) { + for datum in logic(key, val, &time, &diff) { session.give(datum); work += 1; } @@ -1274,7 +1274,7 @@ where }); consolidate(&mut buffer); for (time, diff) in buffer.drain(..) { - for datum in logic(key, val, time, diff) { + for datum in logic(key, val, &time, &diff) { session.give(datum); work += 1; } diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index dc5c215edd15d..5b807c7efa3e0 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -129,7 +129,7 @@ where key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder); let key = match key { Err(e) => { - return Some((Err(DataflowError::from(e)), time.clone(), diff.clone())); + return Some((Err(DataflowError::from(e)), time.clone(), *diff)); } Ok(Some(key)) => key.clone(), Ok(None) => panic!("Row expected as no predicate was used"), @@ -142,13 +142,13 @@ where val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder); let val = match val { Err(e) => { - return Some((Err(DataflowError::from(e)), time.clone(), diff.clone())); + return Some((Err(DataflowError::from(e)), time.clone(), *diff)); } Ok(Some(val)) => val.clone(), Ok(None) => panic!("Row expected as no predicate was used"), }; - Some((Ok((key, val)), time.clone(), diff.clone())) + Some((Ok((key, val)), time.clone(), *diff)) }, );