diff --git a/log-detailed.md b/log-detailed.md index 500728bbe66bc..fba0542673d3d 100644 --- a/log-detailed.md +++ b/log-detailed.md @@ -388,3 +388,118 @@ The current `DatumContainer` is already reasonably efficient (contiguous bytes, ### Issues - None. Research/design prompt only. + +## Prompt 11.1: Columnar `flat_map` — direct &RowRef processing + +### What was done +- Added a columnar path to the `flat_map` method in `CollectionBundle` (context.rs). +- When `key_val` is `None` and `columnar_collection` is present, the new path iterates the columnar container directly using a bespoke `unary` operator named `ColumnarFlatMap`. +- Each columnar item yields `(&RowRef, T::Ref, Diff::Ref)` via `into_index_iter()`. The `&RowRef` is passed directly to `datums.borrow_with_limit(d, max_demand)` — no owned `Row` is ever allocated. +- Only the timestamp and diff are converted to owned via `Columnar::into_owned` (these are cheap scalar copies). +- The existing Vec fallback is retained for bundles that lack a columnar collection (e.g., arrangement-only bundles). + +### Key decisions +- Used `StreamCore::unary` with `CapacityContainerBuilder>` as the output builder, matching the return type `StreamVec`. This avoids changing the method signature. +- The `logic` closure signature (`FnMut(&mut DatumVecBorrow, T, Diff) -> I`) is unchanged. Callers (MFP evaluate, Reduce key/value extraction) work without modification because `DatumVecBorrow` is populated from `&RowRef` the same way as from `&Row`. +- The arrangement path (`key_val` is `Some`) is unchanged — it always uses the arrangement's own flat_map. + +### Files changed +- `src/compute/src/render/context.rs` — Added columnar branch in `flat_map` method. + +### Issues +- `unary` takes ownership of the stream, requiring `.clone()` on `col_oks.inner`. Stream clones are cheap (reference-counted handles). + +## Prompt 11.2: Columnar `as_specific_collection` (identity path) + +### What was done +- Added `as_specific_columnar_collection` method on `CollectionBundle` that returns `(ColumnarCollection, VecCollection)`. +- When `key` is `None`, returns the columnar collection directly by cloning the handles — no conversion at all. +- When `key` is `Some`, delegates to `as_specific_collection` (arrangement path) and converts the result to columnar. +- Optimized `as_columnar_collection_core` to detect identity MFPs and use `as_specific_columnar_collection` directly, eliminating the columnar→Vec→columnar round-trip. + +### Key decisions +- Added a new method rather than changing `as_specific_collection`'s return type, since many callers need `VecCollection` and changing the signature would be a larger refactor. +- The identity MFP detection in `as_columnar_collection_core` mirrors the same logic in `as_collection_core` (check `mfp_plan.is_identity() && !has_key_val`). +- The arrangement path (`key` is `Some`) still converts Vec→columnar since arrangement output is inherently Vec-based. + +### Files changed +- `src/compute/src/render/context.rs` — Added `as_specific_columnar_collection` method; optimized `as_columnar_collection_core` identity path. + +### Issues +- None. + +## Prompt 11.3: Columnar `as_collection_core` (MFP path) — verification + +### What was done +- Verified that `as_columnar_collection_core` no longer needs the Vec round-trip for identity MFPs (handled by 11.2's `as_specific_columnar_collection`). +- Verified that for non-identity MFPs, `flat_map` (11.1) iterates columnar data directly via `&RowRef` without allocating owned Rows. The output remains Vec-based due to `map_fallible`'s Ok/Err split, with a final `vec_to_columnar` conversion. +- Updated the doc comment on `as_columnar_collection_core` to accurately describe the current behavior. + +### Key decisions +- No further code changes needed beyond updating documentation. The Vec→columnar conversion on the non-identity MFP output path is inherent to `map_fallible` producing Vec and cannot be avoided without rewriting the Ok/Err split to produce columnar output directly. +- The important optimization (avoiding owned Row allocation) is already achieved by 11.1's columnar `flat_map`. + +### Files changed +- `src/compute/src/render/context.rs` — Updated doc comment on `as_columnar_collection_core`. + +### Issues +- None. Verification-only prompt. + +## Prompt 11.4: Columnar Reduce input (direct) — verification + +### What was done + +- Verified that `render_reduce` calls `entered.flat_map(input_key.map(|k| (k, None)), max_demand, ...)`. +- When `input_key` is `None`, `flat_map` uses the columnar path from 11.1 — iterating `&RowRef` directly without Vec conversion. +- When `input_key` is `Some`, `flat_map` uses the arrangement path (no collection conversion needed). +- The logic closure receives `DatumVecBorrow` populated from `&RowRef` for key/value expression evaluation. No owned Row allocation. + +### Key decisions +- No code changes needed. Reduce automatically benefits from 11.1's columnar `flat_map`. + +### Files changed +- None (verification only). + +### Issues +- None. + +## Prompt 11.5: Columnar FlatMap input (direct) + +### What was done +- Added a columnar path to `render_flat_map` that uses `unary_fallible` directly on the columnar inner stream (`Column<(Row, T, Diff)>`). +- `unary_fallible` accepts `Column<...>` because `Column` implements `Container + DrainContainer + Clone + Default`. +- The inner loop iterates columnar items via `data.borrow().into_index_iter()`, yielding `(&RowRef, T::Ref, Diff::Ref)`. The `&RowRef` is passed directly to `datums.borrow_with(row_ref)` for expression evaluation and to `drain_through_mfp(row_ref, ...)` for MFP application. +- Changed `drain_through_mfp` parameter from `&Row` to `&RowRef` (transparent since `Row: Deref`). +- The queue buffers `Column<...>` containers instead of `Vec<...>` containers. +- Vec fallback retained for arrangement key paths and non-columnar bundles. + +### Key decisions +- Created a full parallel columnar path rather than trying to make the existing code generic, because the iteration patterns differ (`for (row, t, d) in data` for Vec vs `for (ref, t_ref, d_ref) in data.borrow().into_index_iter()` for columnar). +- The columnar path does not use `'input` labeled break since columnar iteration doesn't yield owned items that can be pattern-matched the same way. Uses `continue` instead. +- Output is still Vec-based (`ConsolidatingContainerBuilder>`) for the ok/err streams, with a final `vec_to_columnar` conversion. The inner table function evaluation inherently produces owned Rows. + +### Files changed +- `src/compute/src/render/flat_map.rs` — Added columnar `unary_fallible` path; changed `drain_through_mfp` to accept `&RowRef`. + +### Issues +- `col_errs` needed `.clone()` for `concat` since it's behind a shared reference. + +## Prompt 11.6: Columnar ArrangeBy input (direct) + +### What was done +- Added `arrange_columnar_collection` method that takes `ColumnarCollection` and iterates `&RowRef` directly from columnar containers for key/value expression evaluation. +- The method mirrors `arrange_collection` but: iterates via `data.borrow().into_index_iter()` yielding `(&RowRef, T::Ref, Diff::Ref)`, passes `&RowRef` to `datums.borrow_with(row_ref)`, and produces a columnar passthrough via `ColumnBuilder<(Row, S::Timestamp, Diff)>`. +- Modified `ensure_collections` to detect when identity MFP + no input_key + columnar available, and use `arrange_columnar_collection` directly instead of converting columnar→Vec via `as_collection_core`. +- The columnar path tracks a `cached_col: Option<(ColumnarCollection, VecCollection)>` through the arrangement loop, keeping the passthrough columnar throughout. +- The existing Vec fallback path is retained for non-identity MFPs and arrangement-key cases. + +### Key decisions +- Only use the columnar direct path when MFP is identity and `input_key` is None. When MFP is non-identity, `as_collection_core` → `flat_map` already uses the columnar flat_map path from 11.1. +- The passthrough stream is columnar (`ColumnBuilder<(Row, T, Diff)>`), forwarding each item individually. This is slightly less efficient than the Vec path's `give_container` (which forwards entire containers), but avoids a columnar→Vec→columnar round-trip for subsequent arrangements. +- Key/value expression evaluation produces owned `Row`s via `key_buf.packer()` / `val_buf.packer()` — this is inherent to the arrangement format and unaffected by the input representation. + +### Files changed +- `src/compute/src/render/context.rs` — Added `arrange_columnar_collection` method; modified `ensure_collections` to prefer columnar path. + +### Issues +- None. diff --git a/log.md b/log.md index b79f374bdcf95..213a58bd4d564 100644 --- a/log.md +++ b/log.md @@ -20,3 +20,9 @@ | 16 | 9.1: Audit Vec paths; remove redundant Vec from sources; add logging | Done | 2026-03-25 | | 17 | 9.2: Remove collection field and ensure_vec_collection from CollectionBundle | Done | 2026-03-25 | | 18 | 10.1: Investigate columnar arrangement spines — research complete | Done | 2026-03-25 | +| 19 | 11.1: Columnar flat_map — iterate &RowRef directly without Vec conversion | Done | 2026-03-25 | +| 20 | 11.2: Columnar as_specific_collection — identity path returns columnar directly | Done | 2026-03-25 | +| 21 | 11.3: Columnar as_collection_core — verified, identity avoids round-trip | Done | 2026-03-25 | +| 22 | 11.4: Columnar Reduce input — verified, flat_map uses &RowRef directly | Done | 2026-03-25 | +| 23 | 11.5: Columnar FlatMap — direct &RowRef processing via unary_fallible on Column | Done | 2026-03-25 | +| 24 | 11.6: Columnar ArrangeBy — arrange_columnar_collection iterates &RowRef directly | Done | 2026-03-25 | diff --git a/prompts.md b/prompts.md index 6d50d034233cd..8ddee898b0390 100644 --- a/prompts.md +++ b/prompts.md @@ -320,6 +320,106 @@ direct vectorized evaluation from arrangements without materializing collections --- +## Phase 11: Direct columnar processing (eliminate Vec escape hatches) + +The columnar `Ref<'_, Row>` is `&RowRef`, and `DatumVec::borrow_with` / `borrow_with_limit` +already accept `&RowRef`. Operators that unpack rows via `DatumVec` can iterate columnar +containers directly without materializing owned `Row` values. + +### Prompt 11.1: Columnar `flat_map` in `CollectionBundle` + +[*] The `flat_map` method in `CollectionBundle` (context.rs) is the core building block used by +MFP, Reduce, and other operators. When `key_val` is `None`, it currently calls +`as_vec_collection()` to get a `VecCollection`, then iterates `(Row, T, Diff)` tuples. + +[*] Add a columnar path: when `columnar_collection` is present and `key_val` is `None`, iterate +the columnar container directly using `into_index_iter()`. Each item yields `(&RowRef, &T, &Diff)`. +Pass `&RowRef` directly to `datums.borrow_with_limit(row_ref, max_demand)` — this already works +since `borrow_with_limit` accepts `&RowRef`. + +[*] The `logic` closure signature uses `DatumVecBorrow<'_>` which is populated from `&RowRef`, +so no changes to callers (MFP evaluate, Reduce key/value extraction) are needed. + +**Files**: `src/compute/src/render/context.rs` + +--- + +### Prompt 11.2: Columnar `as_specific_collection` (identity path) + +[*] `as_specific_collection(None)` currently calls `as_vec_collection()` which converts the +entire columnar stream to Vec. For the identity case (no arrangement key), this is pure overhead. + +[*] When the bundle has a columnar collection, return it directly as a `ColumnarCollection` +(or convert to Vec only at the caller boundary). This requires either changing the return type +to be generic over container, or providing a separate `as_specific_columnar_collection` method. + +[*] The callers that use this for identity passthrough (e.g., `as_collection_core` when MFP is +identity) should prefer the columnar variant. + +**Files**: `src/compute/src/render/context.rs` + +--- + +### Prompt 11.3: Columnar `as_collection_core` (MFP path) + +[*] `as_collection_core` calls `flat_map` (converted in 11.1) and then applies +`map_fallible` to split Ok/Err. With 11.1 done, this method already processes columnar +data without the Vec escape hatch when `key_val` is `None`. + +[*] For the identity MFP case, it currently calls `as_specific_collection` (converted in 11.2). +Wire this to return columnar directly. + +[*] Verify that `as_columnar_collection_core` no longer needs the Vec round-trip. + +**Files**: `src/compute/src/render/context.rs` + +--- + +### Prompt 11.4: Columnar Reduce input (direct) + +[*] `render_reduce` calls `flat_map` on the entered bundle for key/value extraction. With 11.1 +done, this already processes columnar data directly — the `flat_map` logic closure receives +`DatumVecBorrow` populated from `&RowRef`. + +[*] Verify that Reduce works end-to-end with columnar input without any Vec conversion. + +**Files**: `src/compute/src/render/reduce.rs` + +--- + +### Prompt 11.5: Columnar FlatMap input (direct) + +[*] `render_flat_map` calls `as_specific_collection` to get a Vec collection. With 11.2 done, +investigate whether the FlatMap inner loop can operate on columnar refs directly. + +[*] The FlatMap inner loop unpacks `input_row` via `datums.borrow_with(&input_row)` and evaluates +expressions. Since `borrow_with` accepts `&RowRef`, the loop body can work on columnar refs. +However, the `drain_through_mfp` helper also takes `&Row` — update it to accept `&RowRef`. + +[*] The FlatMap operator uses `unary_fallible` which expects a specific input container type. +Investigate whether it can accept `Column<(Row, T, Diff)>` directly or whether a columnar-aware +operator variant is needed. + +**Files**: `src/compute/src/render/flat_map.rs` + +--- + +### Prompt 11.6: Columnar ArrangeBy input (direct) + +[*] `arrange_collection` takes a `VecCollection` and iterates `(row, time, diff)` +to evaluate key/value expressions. The inner loop uses `datums.borrow_with(row)`. + +[*] Add a columnar variant `arrange_columnar_collection` that iterates `Column<(Row, T, Diff)>` +directly. Each `&RowRef` from the columnar container can be passed to `borrow_with` for +expression evaluation. The key/value `Row`s are built by `SharedRow::pack()` (owned output), +and the arrangement batcher accepts `((Row, Row), T, Diff)` — these owned outputs are unaffected. + +[*] Wire `ensure_collections` to prefer the columnar path when `columnar_collection` is present. + +**Files**: `src/compute/src/render/context.rs` + +--- + ## Notes ### Conversion cost awareness diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 63dfc80a9d282..08e36074bc690 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -378,12 +378,9 @@ pub fn build_compute_dataflow( for (id, (oks, errs)) in imported_sources.into_iter() { let oks_entered = oks.enter(region); let errs_entered = errs.enter(region); - let columnar_oks = - crate::render::columnar::vec_to_columnar(oks_entered); - let bundle = CollectionBundle::from_columnar_collections( - columnar_oks, - errs_entered, - ); + let columnar_oks = crate::render::columnar::vec_to_columnar(oks_entered); + let bundle = + CollectionBundle::from_columnar_collections(columnar_oks, errs_entered); // Associate collection bundle with the source identifier. context.insert_id(id, bundle); } @@ -492,12 +489,9 @@ pub fn build_compute_dataflow( }; let oks_entered = oks.enter_region(region); let errs_entered = errs.enter_region(region); - let columnar_oks = - crate::render::columnar::vec_to_columnar(oks_entered); - let bundle = CollectionBundle::from_columnar_collections( - columnar_oks, - errs_entered, - ); + let columnar_oks = crate::render::columnar::vec_to_columnar(oks_entered); + let bundle = + CollectionBundle::from_columnar_collections(columnar_oks, errs_entered); // Associate collection bundle with the source identifier. context.insert_id(id, bundle); } @@ -760,7 +754,10 @@ where compute_state.traces.set(idx_id, trace); } None => { - println!("columnar_collection available: {:?}", bundle.columnar_collection.is_some()); + println!( + "columnar_collection available: {:?}", + bundle.columnar_collection.is_some() + ); println!( "keys available: {:?}", bundle.arranged.keys().collect::>() @@ -856,7 +853,10 @@ where compute_state.traces.set(idx_id, trace); } None => { - println!("columnar_collection available: {:?}", bundle.columnar_collection.is_some()); + println!( + "columnar_collection available: {:?}", + bundle.columnar_collection.is_some() + ); println!( "keys available: {:?}", bundle.arranged.keys().collect::>() @@ -1219,8 +1219,7 @@ where .as_collection(); // Produce a columnar-only collection for downstream operators. - let columnar_oks = - crate::render::columnar::vec_to_columnar(ok_collection); + let columnar_oks = crate::render::columnar::vec_to_columnar(ok_collection); CollectionBundle::from_columnar_collections(columnar_oks, err_collection) } Get { id, keys, plan } => { @@ -1343,8 +1342,7 @@ where let negated = crate::render::columnar::negate_columnar(col_oks.clone()); CollectionBundle::from_columnar_collections(negated, col_errs.clone()) } else { - let (oks, errs) = - input.as_specific_collection(None, &self.config_set); + let (oks, errs) = input.as_specific_collection(None, &self.config_set); CollectionBundle::from_collections(oks.negate(), errs) } } @@ -1370,14 +1368,10 @@ where col_oks.push(oks.clone()); col_errs.push(errs.clone()); } - let col_oks = differential_dataflow::collection::concatenate( - &mut self.scope, - col_oks, - ); - let col_errs = differential_dataflow::collection::concatenate( - &mut self.scope, - col_errs, - ); + let col_oks = + differential_dataflow::collection::concatenate(&mut self.scope, col_oks); + let col_errs = + differential_dataflow::collection::concatenate(&mut self.scope, col_errs); if consolidate_output { // Consolidation requires Vec-based collections; convert, consolidate, convert back. let vec_oks = crate::render::columnar::columnar_to_vec(col_oks); @@ -1394,8 +1388,7 @@ where let mut oks = Vec::new(); let mut errs = Vec::new(); for bundle in bundles { - let (os, es) = - bundle.as_specific_collection(None, &self.config_set); + let (os, es) = bundle.as_specific_collection(None, &self.config_set); oks.push(os); errs.push(es); } diff --git a/src/compute/src/render/columnar.rs b/src/compute/src/render/columnar.rs index 4f10544a23648..27043b7b75c61 100644 --- a/src/compute/src/render/columnar.rs +++ b/src/compute/src/render/columnar.rs @@ -14,9 +14,10 @@ use differential_dataflow::{AsCollection, VecCollection}; use mz_repr::{Diff, Row}; use mz_timely_util::columnar::builder::ColumnBuilder; use timely::container::CapacityContainerBuilder; +use timely::dataflow::Scope; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Operator; -use timely::dataflow::Scope; +use timely::progress::Timestamp; use crate::typedefs::{ColumnarCollection, MzTimestamp}; @@ -68,15 +69,17 @@ where Pipeline, "ColumnarToVec", |_cap, _info| { + let mut row_buf = Row::default(); + let mut t_buf = S::Timestamp::minimum(); + let mut r_buf = Diff::default(); move |input, output| { input.for_each(|time, data| { let mut session = output.session(&time); for (d, t, r) in data.borrow().into_index_iter() { - session.give(( - Columnar::into_owned(d), - Columnar::into_owned(t), - Columnar::into_owned(r), - )); + row_buf.copy_from(d); + t_buf.copy_from(t); + r_buf.copy_from(r); + session.give((row_buf.clone(), t_buf.clone(), r_buf.clone())); } }); } @@ -102,12 +105,13 @@ where Pipeline, "NegateColumnar", |_cap, _info| { + let mut r_buf = Diff::default(); move |input, output| { input.for_each(|time, data| { let mut session = output.session_with_builder(&time); for (d, t, r) in data.borrow().into_index_iter() { - let owned_r: Diff = Columnar::into_owned(r); - let neg_r = -owned_r; + r_buf.copy_from(r); + let neg_r = -r_buf; session.give((d, t, &neg_r)); } }); @@ -126,17 +130,16 @@ mod tests { use differential_dataflow::input::Input; use mz_repr::{Datum, Diff, Row}; - use timely::dataflow::operators::probe::Probe; use timely::dataflow::operators::Inspect; + use timely::dataflow::operators::probe::Probe; /// Round-trip data through vec_to_columnar and then columnar_to_vec, /// verifying that all rows survive the conversion unchanged. #[mz_ore::test] fn round_trip_vec_columnar_vec() { timely::execute_directly(|worker| { - let results: Rc>> = - Rc::new(RefCell::new(Vec::new())); - let results_capture = results.clone(); + let results: Rc>> = Rc::new(RefCell::new(Vec::new())); + let results_capture = Rc::clone(&results); let (mut input, probe) = worker.dataflow::(|scope| { let (input, collection) = scope.new_collection::(); @@ -175,7 +178,7 @@ mod tests { let mut actual = results.borrow().clone(); actual.sort_by(|a, b| a.0.cmp(&b.0)); - let mut expected = vec![ + let mut expected = [ (row1, 0u64, one), (row2, 0u64, one), (row3, 0u64, one), @@ -184,11 +187,7 @@ mod tests { expected.sort_by(|a, b| a.0.cmp(&b.0)); assert_eq!(actual.len(), expected.len(), "Row count mismatch"); - for (a, e) in actual.iter().zip(expected.iter()) { - assert_eq!(a.0, e.0, "Row data mismatch"); - assert_eq!(a.1, e.1, "Timestamp mismatch"); - assert_eq!(a.2, e.2, "Diff mismatch"); - } + assert_eq!(actual, expected); }); } @@ -196,9 +195,8 @@ mod tests { #[mz_ore::test] fn round_trip_multiple_timestamps() { timely::execute_directly(|worker| { - let results: Rc>> = - Rc::new(RefCell::new(Vec::new())); - let results_capture = results.clone(); + let results: Rc>> = Rc::new(RefCell::new(Vec::new())); + let results_capture = Rc::clone(&results); let (mut input, probe) = worker.dataflow::(|scope| { let (input, collection) = scope.new_collection::(); @@ -241,9 +239,8 @@ mod tests { #[mz_ore::test] fn negate_columnar_flips_diffs() { timely::execute_directly(|worker| { - let results: Rc>> = - Rc::new(RefCell::new(Vec::new())); - let results_capture = results.clone(); + let results: Rc>> = Rc::new(RefCell::new(Vec::new())); + let results_capture = Rc::clone(&results); let (mut input, probe) = worker.dataflow::(|scope| { let (input, collection) = scope.new_collection::(); @@ -296,9 +293,8 @@ mod tests { #[mz_ore::test] fn union_columnar_concatenates() { timely::execute_directly(|worker| { - let results: Rc>> = - Rc::new(RefCell::new(Vec::new())); - let results_capture = results.clone(); + let results: Rc>> = Rc::new(RefCell::new(Vec::new())); + let results_capture = Rc::clone(&results); let (mut input1, mut input2, probe) = worker.dataflow::(|scope| { let (input1, collection1) = scope.new_collection::(); @@ -307,10 +303,7 @@ mod tests { // Convert both to columnar, concatenate, convert back let col1 = vec_to_columnar(collection1); let col2 = vec_to_columnar(collection2); - let union = differential_dataflow::collection::concatenate( - scope, - vec![col1, col2], - ); + let union = differential_dataflow::collection::concatenate(scope, vec![col1, col2]); let result = columnar_to_vec(union); let (probe, _stream) = result @@ -360,9 +353,8 @@ mod tests { use timely::dataflow::operators::ToStream; timely::execute_directly(|worker| { - let results: Rc>> = - Rc::new(RefCell::new(Vec::new())); - let results_capture = results.clone(); + let results: Rc>> = Rc::new(RefCell::new(Vec::new())); + let results_capture = Rc::clone(&results); let probe = worker.dataflow::(|scope| { // Simulate the Constant operator: create rows from an iterator, @@ -377,10 +369,7 @@ mod tests { let constant_data: Vec<(Row, u64, Diff)> = vec![(row1, 0, one), (row2, 0, two), (row3, 0, one)]; - let vec_collection = constant_data - .into_iter() - .to_stream(scope) - .as_collection(); + let vec_collection = constant_data.into_iter().to_stream(scope).as_collection(); let columnar = vec_to_columnar(vec_collection); let result = columnar_to_vec(columnar); @@ -408,9 +397,21 @@ mod tests { let one = Diff::from(1); let two = Diff::from(2); - assert!(actual.iter().any(|(r, t, d)| *r == row1 && *t == 0 && *d == one)); - assert!(actual.iter().any(|(r, t, d)| *r == row2 && *t == 0 && *d == two)); - assert!(actual.iter().any(|(r, t, d)| *r == row3 && *t == 0 && *d == one)); + assert!( + actual + .iter() + .any(|(r, t, d)| *r == row1 && *t == 0 && *d == one) + ); + assert!( + actual + .iter() + .any(|(r, t, d)| *r == row2 && *t == 0 && *d == two) + ); + assert!( + actual + .iter() + .any(|(r, t, d)| *r == row3 && *t == 0 && *d == one) + ); }); } } diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 34e5a59584114..98ab5643ece15 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -308,7 +308,7 @@ where where I: IntoIterator, D: Data, - L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static, + L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, &S::Timestamp, &Diff) -> I + 'static, { // Set a number of tuples after which the operator should yield. // This allows us to remain responsive even when enumerating a substantial @@ -316,7 +316,7 @@ where let refuel = 1000000; let mut datums = DatumVec::new(); - let logic = move |k: DatumSeq, v: DatumSeq, t, d| { + let logic = move |k: DatumSeq, v: DatumSeq, t: &S::Timestamp, d: &Diff| { let mut datums_borrow = datums.borrow(); datums_borrow.extend(k.to_datum_iter().take(max_demand)); let max_demand = max_demand.saturating_sub(datums_borrow.len()); @@ -451,7 +451,10 @@ where /// Returns the collection as a Vec-based collection, converting from columnar if needed. pub fn as_vec_collection( &self, - ) -> (VecCollection, VecCollection) { + ) -> ( + VecCollection, + VecCollection, + ) { if let Some((col_oks, col_errs)) = &self.columnar_collection { let vec_oks = crate::render::columnar::columnar_to_vec(col_oks.clone()); (vec_oks, col_errs.clone()) @@ -579,7 +582,7 @@ where if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) { // Decode all columns, pass max_demand as usize::MAX. let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| { - Some((SharedRow::pack(borrow.iter()), t, r)) + Some((SharedRow::pack(borrow.iter()), t.clone(), *r)) }); (ok.as_collection(), err) } else { @@ -590,6 +593,34 @@ where } } + /// Columnar variant of `as_specific_collection`. + /// + /// When `key` is `None`, returns the columnar collection directly (no conversion). + /// When `key` is `Some`, converts the arrangement to a columnar collection. + pub fn as_specific_columnar_collection( + &self, + key: Option<&[MirScalarExpr]>, + config_set: &ConfigSet, + ) -> ( + ColumnarCollection, + VecCollection, + ) { + match key { + None => { + let (col_oks, errs) = self + .columnar_collection + .as_ref() + .expect("Columnar collection doesn't exist."); + (col_oks.clone(), errs.clone()) + } + Some(_) => { + // Arrangement path: convert to Vec then columnar. + let (oks, errs) = self.as_specific_collection(key, config_set); + (crate::render::columnar::vec_to_columnar(oks), errs) + } + } + } + /// Constructs and applies logic to elements of a collection and returns the results. /// /// The function applies `logic` on elements. The logic conceptually receives @@ -614,7 +645,7 @@ where where I: IntoIterator, D: Data, - L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static, + L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, &S::Timestamp, &Diff) -> I + 'static, { // If `key_val` is set, we should have to use the corresponding arrangement. // If there isn't one, that implies an error in the contract between @@ -623,12 +654,47 @@ where self.arrangement(&key) .expect("Should have ensured during planning that this arrangement exists.") .flat_map(val.as_ref(), max_demand, logic) + } else if let Some((col_oks, errs)) = &self.columnar_collection { + // Iterate columnar container directly, avoiding the columnar→Vec conversion. + // Each item yields (&RowRef, T::Ref, Diff::Ref) which we can pass to + // borrow_with_limit without materializing an owned Row. + use columnar::{Columnar, Index}; + use timely::dataflow::operators::Operator; + let oks = col_oks + .inner + .clone() + .unary::>, _, _, _>( + Pipeline, + "ColumnarFlatMap", + |_cap, _info| { + let mut datums = DatumVec::new(); + let mut t_buf = S::Timestamp::minimum(); + let mut r_buf = Diff::default(); + move |input, output| { + input.for_each(|time, data| { + let mut session = output.session(&time); + for (d, t, r) in data.borrow().into_index_iter() { + t_buf.copy_from(t); + r_buf.copy_from(r); + for item in logic( + &mut datums.borrow_with_limit(d, max_demand), + &t_buf, + &r_buf, + ) { + session.give(item); + } + } + }); + } + }, + ); + (oks, errs.clone()) } else { use timely::dataflow::operators::vec::Map; let (oks, errs) = self.as_vec_collection(); let mut datums = DatumVec::new(); let oks = oks.inner.flat_map(move |(v, t, d)| { - logic(&mut datums.borrow_with_limit(&v, max_demand), t, d) + logic(&mut datums.borrow_with_limit(&v, max_demand), &t, &d) }); (oks, errs) } @@ -658,7 +724,7 @@ where + 'static, I: IntoIterator, D: Data, - L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static, + L: FnMut(Tr::Key<'_>, Tr::Val<'_>, &S::Timestamp, &mz_repr::Diff) -> I + 'static, { use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB; let scope = trace.stream.scope(); @@ -788,7 +854,7 @@ where &mut datums_local, &temp_storage, event_time, - diff.clone(), + *diff, move |time| !until.less_equal(time), &mut row_builder, ) @@ -822,12 +888,14 @@ where /// Columnar variant of `as_collection_core`. /// /// Applies `MapFilterProject` to the bundle and returns a columnar collection. - /// For now, this converts to Vec internally and applies the existing row-at-a-time - /// MFP evaluation, then converts the result back to columnar. Vectorized evaluation - /// will be added in a future step. + /// + /// For identity MFPs, returns the columnar collection directly (no conversion). + /// For non-identity MFPs, the `flat_map` path iterates columnar data directly via + /// `&RowRef` (no owned Row allocation), but the output is Vec-based (due to + /// `map_fallible` Ok/Err split) and converted back to columnar at the end. pub fn as_columnar_collection_core( &self, - mfp: MapFilterProject, + mut mfp: MapFilterProject, key_val: Option<(Vec, Option)>, until: Antichain, config_set: &ConfigSet, @@ -835,8 +903,19 @@ where ColumnarCollection, VecCollection, ) { - // Delegate to Vec-based as_collection_core (which converts from columnar - // internally via as_vec_collection) and convert the result back to columnar. + mfp.optimize(); + let mfp_plan = mfp.clone().into_plan().unwrap(); + + // For identity MFPs without key_val seek, return the columnar collection + // directly — no Vec round-trip needed. + let has_key_val = matches!(&key_val, Some((_key, Some(_val)))); + if mfp_plan.is_identity() && !has_key_val { + let key = key_val.map(|(k, _v)| k); + return self.as_specific_columnar_collection(key.as_deref(), config_set); + } + + // Non-identity MFP: delegate to as_collection_core (uses columnar flat_map + // from 11.1 internally) and convert the Vec result back to columnar. let (oks, errs) = self.as_collection_core(mfp, key_val, until, config_set); (crate::render::columnar::vec_to_columnar(oks), errs) } @@ -852,13 +931,6 @@ where if collections == Default::default() { return self; } - // Cache collection to avoid reforming it each time. - // - // TODO(mcsherry): In theory this could be faster run out of another arrangement, - // as the `map_fallible` that follows could be run against an arrangement itself. - // - // Note(btv): If we ever do that, we would then only need to make the raw collection here - // if `collections.raw` is true. for (key, _, _) in collections.arranged.iter() { soft_assert_or_log!( @@ -874,9 +946,59 @@ where .iter() .any(|(key, _, _)| !self.arranged.contains_key(key)); - // Materialize a Vec collection for arrangement creation. - // This is a local cache; we convert back to columnar at the end if needed. - let mut cached_vec: Option<(VecCollection, VecCollection)> = None; + // Determine if we can feed columnar input directly (identity MFP, no key). + let mfp_is_identity = { + let mut mfp = input_mfp.clone(); + mfp.optimize(); + mfp.into_plan().map_or(false, |p| p.is_identity()) + }; + let use_columnar_direct = + form_raw_collection && mfp_is_identity && input_key.is_none() && self.columnar_collection.is_some(); + + if use_columnar_direct { + let (col_oks, col_errs) = self + .columnar_collection + .as_ref() + .expect("checked above") + .clone(); + + // Track the columnar collection and errors through the arrangement loop. + let mut cached_col = Some((col_oks, col_errs)); + + for (key, _, thinning) in collections.arranged { + if !self.arranged.contains_key(&key) { + let name = format!("ArrangeBy[{:?}]", key); + let (col_oks, errs) = cached_col.take().expect("Collection constructed above"); + let (oks, errs_keyed, passthrough) = Self::arrange_columnar_collection( + &name, + col_oks, + key.clone(), + thinning.clone(), + ); + let errs_concat: KeyCollection<_, _, _> = + errs.clone().concat(errs_keyed).into(); + cached_col = Some((passthrough, errs)); + let errs = errs_concat + .mz_arrange::, ErrBuilder<_, _>, ErrSpine<_, _>>( + &format!("{}-errors", name), + ); + self.arranged + .insert(key, ArrangementFlavor::Local(oks, errs)); + } + } + if collections.raw { + if let Some((oks, errs)) = cached_col { + self.columnar_collection = Some((oks, errs)); + } + } + return self; + } + + // Fallback: materialize a Vec collection for arrangement creation. + let mut cached_vec: Option<( + VecCollection, + VecCollection, + )> = None; if form_raw_collection { cached_vec = Some(self.as_collection_core( input_mfp, @@ -887,12 +1009,9 @@ where } for (key, _, thinning) in collections.arranged { if !self.arranged.contains_key(&key) { - // TODO: Consider allowing more expressive names. let name = format!("ArrangeBy[{:?}]", key); - let (oks, errs) = cached_vec - .take() - .expect("Collection constructed above"); + let (oks, errs) = cached_vec.take().expect("Collection constructed above"); let (oks, errs_keyed, passthrough) = Self::arrange_collection(&name, oks, key.clone(), thinning.clone()); let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into(); @@ -905,7 +1024,6 @@ where .insert(key, ArrangementFlavor::Local(oks, errs)); } } - // If the raw collection was demanded, store the passthrough as columnar. if collections.raw { if let Some((oks, errs)) = cached_vec { self.columnar_collection = @@ -999,6 +1117,89 @@ where passthrough_stream.as_collection(), ) } + + /// Like `arrange_collection`, but takes columnar input and produces a columnar passthrough. + /// + /// Iterates `&RowRef` directly from the columnar container without allocating owned Rows + /// for key/value expression evaluation. + fn arrange_columnar_collection( + name: &String, + oks: ColumnarCollection, + key: Vec, + thinning: Vec, + ) -> ( + Arranged>, + VecCollection, + ColumnarCollection, + ) { + let mut builder = + OperatorBuilder::new("FormArrangementKeyColumnar".to_string(), oks.inner.scope()); + let (ok_output, ok_stream) = builder.new_output(); + let mut ok_output = + OutputBuilder::<_, ColumnBuilder<((Row, Row), S::Timestamp, Diff)>>::from(ok_output); + let (err_output, err_stream) = builder.new_output(); + let mut err_output = OutputBuilder::from(err_output); + let (passthrough_output, passthrough_stream) = builder.new_output(); + let mut passthrough_output = + OutputBuilder::<_, ColumnBuilder<(Row, S::Timestamp, Diff)>>::from(passthrough_output); + let mut input = builder.new_input(oks.inner, Pipeline); + builder.set_notify(false); + builder.build(move |_capabilities| { + let mut key_buf = Row::default(); + let mut val_buf = Row::default(); + let mut datums = DatumVec::new(); + let mut temp_storage = RowArena::new(); + let mut t_buf = S::Timestamp::minimum(); + let mut r_buf = Diff::default(); + move |_frontiers| { + use columnar::{Columnar, Index}; + let mut ok_output = ok_output.activate(); + let mut err_output = err_output.activate(); + let mut passthrough_output = passthrough_output.activate(); + input.for_each(|time, data| { + let mut ok_session = ok_output.session_with_builder(&time); + let mut err_session = err_output.session(&time); + let mut pass_session = passthrough_output.session_with_builder(&time); + for (row_ref, t_ref, r_ref) in data.borrow().into_index_iter() { + t_buf.copy_from(t_ref); + r_buf.copy_from(r_ref); + temp_storage.clear(); + let datums = datums.borrow_with(row_ref); + let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage)); + match key_buf.packer().try_extend(key_iter) { + Ok(()) => { + let val_datum_iter = thinning.iter().map(|c| datums[*c]); + val_buf.packer().extend(val_datum_iter); + ok_session.give(((&*key_buf, &*val_buf), &t_buf, &r_buf)); + } + Err(e) => { + err_session.give((e.into(), t_buf.clone(), r_buf)); + } + } + pass_session.give((row_ref, &t_buf, &r_buf)); + } + }); + } + }); + + let oks = ok_stream + .mz_arrange_core::< + _, + Col2ValBatcher<_, _, _, _>, + RowRowBuilder<_, _>, + RowRowSpine<_, _>, + >( + ExchangeCore::, _>::new_core( + columnar_exchange::, + ), + name, + ); + ( + oks, + err_stream.as_collection(), + passthrough_stream.as_collection(), + ) + } } struct PendingWork @@ -1032,7 +1233,7 @@ where ) where I: IntoIterator, D: Data, - L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static, + L: FnMut(C::Key<'_>, C::Val<'_>, &C::Time, &C::Diff) -> I + 'static, { use differential_dataflow::consolidation::consolidate; @@ -1053,7 +1254,7 @@ where }); consolidate(&mut buffer); for (time, diff) in buffer.drain(..) { - for datum in logic(key, val, time, diff) { + for datum in logic(key, val, &time, &diff) { session.give(datum); work += 1; } @@ -1073,7 +1274,7 @@ where }); consolidate(&mut buffer); for (time, diff) in buffer.drain(..) { - for datum in logic(key, val, time, diff) { + for datum in logic(key, val, &time, &diff) { session.give(datum); work += 1; } diff --git a/src/compute/src/render/flat_map.rs b/src/compute/src/render/flat_map.rs index 914bec5c52d1c..0537f3435b02d 100644 --- a/src/compute/src/render/flat_map.rs +++ b/src/compute/src/render/flat_map.rs @@ -14,13 +14,14 @@ use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL; use mz_expr::MfpPlan; use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc}; use mz_repr::{DatumVec, RowArena, SharedRow}; -use mz_repr::{Diff, Row, Timestamp}; +use mz_repr::{Diff, Row, RowRef, Timestamp as MzTimestamp}; use mz_timely_util::operator::StreamExt; use timely::dataflow::Scope; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; use timely::dataflow::operators::generic::Session; use timely::progress::Antichain; +use timely::progress::Timestamp; use crate::render::DataflowError; use crate::render::context::{CollectionBundle, Context}; @@ -41,11 +42,6 @@ where ) -> CollectionBundle { let until = self.until.clone(); let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed"); - let has_columnar = input.columnar_collection.is_some(); - let (ok_collection, err_collection) = - input.as_specific_collection(input_key.as_deref(), &self.config_set); - let stream = ok_collection.inner; - let scope = input.scope(); // Budget to limit the number of rows processed in a single invocation. // @@ -53,6 +49,108 @@ where // a batch. A `generate_series` can still cause unavailability if it generates many rows. let budget = COMPUTE_FLAT_MAP_FUEL.get(&self.config_set); + // When we have columnar input and no arrangement key, iterate the columnar + // container directly — each item yields (&RowRef, T::Ref, Diff::Ref) without + // allocating owned Rows. + if input_key.is_none() { + if let Some((col_oks, col_errs)) = &input.columnar_collection { + let scope = input.scope(); + let (oks, errs) = col_oks.inner.clone().unary_fallible( + Pipeline, + "FlatMapStageColumnar", + move |_, info| { + let activator = scope.activator_for(info.address); + let mut queue = VecDeque::new(); + let mut t_buf = G::Timestamp::minimum(); + let mut r_buf = Diff::default(); + Box::new(move |input, ok_output, err_output| { + use columnar::{Columnar, Index}; + let mut datums = DatumVec::new(); + let mut datums_mfp = DatumVec::new(); + let mut table_func_output = Vec::new(); + let mut budget = budget; + + input.for_each(|cap, data| { + queue.push_back(( + cap.retain(0), + cap.retain(1), + std::mem::take(data), + )) + }); + + while let Some((ok_cap, err_cap, data)) = queue.pop_front() { + let mut ok_session = ok_output.session_with_builder(&ok_cap); + let mut err_session = err_output.session_with_builder(&err_cap); + + for (row_ref, t_ref, r_ref) in data.borrow().into_index_iter() { + t_buf.copy_from(t_ref); + r_buf.copy_from(r_ref); + let temp_storage = RowArena::new(); + + let datums_local = datums.borrow_with(row_ref); + let args = exprs + .iter() + .map(|e| e.eval(&datums_local, &temp_storage)) + .collect::, _>>(); + let args = match args { + Ok(args) => args, + Err(e) => { + err_session + .give((e.into(), t_buf.clone(), r_buf)); + continue; + } + }; + let mut extensions = match func.eval(&args, &temp_storage) { + Ok(exts) => exts.fuse(), + Err(e) => { + err_session + .give((e.into(), t_buf.clone(), r_buf)); + continue; + } + }; + + while let Some((extension, output_diff)) = extensions.next() { + table_func_output.push((extension, output_diff)); + table_func_output.extend((&mut extensions).take(1023)); + drain_through_mfp( + row_ref, + &t_buf, + &r_buf, + &mut datums_mfp, + &table_func_output, + &mfp_plan, + &until, + &mut ok_session, + &mut err_session, + &mut budget, + ); + table_func_output.clear(); + } + } + if budget == 0 { + activator.activate(); + break; + } + } + }) + }, + ); + + use differential_dataflow::AsCollection; + let ok_collection = oks.as_collection(); + let new_err_collection = errs.as_collection(); + let err_collection = col_errs.clone().concat(new_err_collection); + let col_oks = crate::render::columnar::vec_to_columnar(ok_collection); + return CollectionBundle::from_columnar_collections(col_oks, err_collection); + } + } + + // Vec fallback: arrangement key or no columnar collection. + let (ok_collection, err_collection) = + input.as_specific_collection(input_key.as_deref(), &self.config_set); + let stream = ok_collection.inner; + let scope = input.scope(); + let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, info| { let activator = scope.activator_for(info.address); let mut queue = VecDeque::new(); @@ -101,7 +199,6 @@ where while let Some((extension, output_diff)) = extensions.next() { table_func_output.push((extension, output_diff)); table_func_output.extend((&mut extensions).take(1023)); - // We could consolidate `table_func_output`, but it seems unlikely to be productive. drain_through_mfp( &input_row, &time, @@ -129,12 +226,7 @@ where let ok_collection = oks.as_collection(); let new_err_collection = errs.as_collection(); let err_collection = err_collection.concat(new_err_collection); - if has_columnar { - let col_oks = crate::render::columnar::vec_to_columnar(ok_collection); - CollectionBundle::from_columnar_collections(col_oks, err_collection) - } else { - CollectionBundle::from_collections(ok_collection, err_collection) - } + CollectionBundle::from_collections(ok_collection, err_collection) } } @@ -142,13 +234,13 @@ where /// /// The method decodes `input_row`, and should be amortized across non-trivial `extensions`. fn drain_through_mfp( - input_row: &Row, + input_row: &RowRef, input_time: &T, input_diff: &Diff, datum_vec: &mut DatumVec, extensions: &[(Row, Diff)], mfp_plan: &MfpPlan, - until: &Antichain, + until: &Antichain, ok_output: &mut Session< '_, '_, diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index dc5c215edd15d..5b807c7efa3e0 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -129,7 +129,7 @@ where key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder); let key = match key { Err(e) => { - return Some((Err(DataflowError::from(e)), time.clone(), diff.clone())); + return Some((Err(DataflowError::from(e)), time.clone(), *diff)); } Ok(Some(key)) => key.clone(), Ok(None) => panic!("Row expected as no predicate was used"), @@ -142,13 +142,13 @@ where val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder); let val = match val { Err(e) => { - return Some((Err(DataflowError::from(e)), time.clone(), diff.clone())); + return Some((Err(DataflowError::from(e)), time.clone(), *diff)); } Ok(Some(val)) => val.clone(), Ok(None) => panic!("Row expected as no predicate was used"), }; - Some((Ok((key, val)), time.clone(), diff.clone())) + Some((Ok((key, val)), time.clone(), *diff)) }, ); diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index f588b0fbf87b7..d389004c66239 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -129,8 +129,7 @@ pub type KeyValBatcher = /// This is the columnar equivalent of `VecCollection`. Data is stored in /// `Column<(D, T, R)>` containers which provide better cache locality and enable /// future vectorized evaluation. -pub type ColumnarCollection = - Collection::Timestamp, R)>>; +pub type ColumnarCollection = Collection::Timestamp, R)>>; /// Timestamp trait for rendering, constraint to support [`MzData`] and [timely::progress::Timestamp]. pub trait MzTimestamp: