115 changes: 115 additions & 0 deletions log-detailed.md
@@ -388,3 +388,118 @@ The current `DatumContainer` is already reasonably efficient (contiguous bytes,

### Issues
- None. Research/design prompt only.

## Prompt 11.1: Columnar `flat_map` — direct &RowRef processing

### What was done
- Added a columnar path to the `flat_map` method in `CollectionBundle` (context.rs).
- When `key_val` is `None` and `columnar_collection` is present, the new path iterates the columnar container directly using a bespoke `unary` operator named `ColumnarFlatMap`.
- Each columnar item yields `(&RowRef, T::Ref, Diff::Ref)` via `into_index_iter()`. The `&RowRef` is passed directly to `datums.borrow_with_limit(d, max_demand)` — no owned `Row` is ever allocated.
- Only the timestamp and diff are converted to owned via `Columnar::into_owned` (these are cheap scalar copies).
- The existing Vec fallback is retained for bundles that lack a columnar collection (e.g., arrangement-only bundles).
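
The borrowed-iteration idea in the list above can be sketched without any Materialize types. This is a minimal illustration, not the code in `context.rs`: `ToyColumn` and `flat_map_borrowed` are hypothetical names, a plain `&[u8]` stands in for `&RowRef`, and `u64`/`i64` stand in for the timestamp and diff.

```rust
/// Hypothetical stand-in for a columnar container of `(Row, T, Diff)`:
/// all row bytes live in one contiguous buffer, indexed by end offsets.
#[derive(Default)]
pub struct ToyColumn {
    bytes: Vec<u8>,
    ends: Vec<usize>,
    times: Vec<u64>,
    diffs: Vec<i64>,
}

impl ToyColumn {
    /// Appends one record; the row bytes are copied into the shared buffer.
    pub fn push(&mut self, row: &[u8], time: u64, diff: i64) {
        self.bytes.extend_from_slice(row);
        self.ends.push(self.bytes.len());
        self.times.push(time);
        self.diffs.push(diff);
    }

    /// Yields a borrowed row slice plus references to the time and diff.
    pub fn iter(&self) -> impl Iterator<Item = (&[u8], &u64, &i64)> + '_ {
        (0..self.ends.len()).map(move |i| {
            let start = if i == 0 { 0 } else { self.ends[i - 1] };
            (&self.bytes[start..self.ends[i]], &self.times[i], &self.diffs[i])
        })
    }
}

/// Applies `logic` to every record without allocating an owned row.
pub fn flat_map_borrowed<I, L>(column: &ToyColumn, mut logic: L) -> Vec<I::Item>
where
    I: IntoIterator,
    L: FnMut(&[u8], u64, i64) -> I,
{
    let mut out = Vec::new();
    for (row, time, diff) in column.iter() {
        // Only the scalars are copied to owned values; the row stays borrowed.
        out.extend(logic(row, *time, *diff));
    }
    out
}
```

The closure signature takes owned scalars and a borrowed row, which mirrors the split described above between the row reference and the cheap timestamp and diff copies.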

### Key decisions
- Used `StreamCore::unary` with `CapacityContainerBuilder<Vec<I::Item>>` as the output builder, matching the return type `StreamVec<S, I::Item>`. This avoids changing the method signature.
- The `logic` closure signature (`FnMut(&mut DatumVecBorrow, T, Diff) -> I`) is unchanged. Callers (MFP evaluate, Reduce key/value extraction) work without modification because `DatumVecBorrow` is populated from `&RowRef` the same way as from `&Row`.
- The arrangement path (`key_val` is `Some`) is unchanged — it always uses the arrangement's own flat_map.
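
The three-way choice described in this entry (arrangement, columnar, Vec fallback) can be summarized as a small dispatch function. This is a sketch under assumptions: `FlatMapPath` and `choose_flat_map_path` are hypothetical names, and the real method branches inline rather than returning an enum.

```rust
/// Illustrative labels for the three paths described in the log entry.
#[derive(Debug, PartialEq, Eq)]
pub enum FlatMapPath {
    /// `key_val` is `Some`: use the arrangement's own flat_map.
    Arrangement,
    /// `key_val` is `None` and a columnar collection is present.
    Columnar,
    /// `key_val` is `None` and no columnar collection exists.
    VecFallback,
}

/// Picks a path from the two conditions the log entry names.
pub fn choose_flat_map_path<K>(key_val: Option<K>, has_columnar: bool) -> FlatMapPath {
    match (key_val, has_columnar) {
        (Some(_), _) => FlatMapPath::Arrangement,
        (None, true) => FlatMapPath::Columnar,
        (None, false) => FlatMapPath::VecFallback,
    }
}
```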

### Files changed
- `src/compute/src/render/context.rs` — Added columnar branch in `flat_map` method.

### Issues
- `unary` takes ownership of the stream, requiring `.clone()` on `col_oks.inner`. Stream clones are cheap (reference-counted handles).

## Prompt 11.2: Columnar `as_specific_collection` (identity path)

### What was done
- Added `as_specific_columnar_collection` method on `CollectionBundle` that returns `(ColumnarCollection, VecCollection<Err>)`.
- When `key` is `None`, returns the columnar collection directly by cloning the handles — no conversion at all.
- When `key` is `Some`, delegates to `as_specific_collection` (arrangement path) and converts the result to columnar.
- Optimized `as_columnar_collection_core` to detect identity MFPs and use `as_specific_columnar_collection` directly, eliminating the columnar→Vec→columnar round-trip.

### Key decisions
- Added a new method rather than changing `as_specific_collection`'s return type, since many callers need `VecCollection` and changing the signature would be a larger refactor.
- The identity MFP detection in `as_columnar_collection_core` mirrors the same logic in `as_collection_core` (check `mfp_plan.is_identity() && !has_key_val`).
- The arrangement path (`key` is `Some`) still converts Vec→columnar since arrangement output is inherently Vec-based.
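
A toy model of the identity check may help show why the direct path costs nothing. Everything here is hypothetical: `ColumnarHandle` is an `Rc` wrapper standing in for a stream handle, and `select_path` simulates the round-trip by copying the data, which is only an analogy for the real columnar→Vec→columnar conversion.

```rust
use std::rc::Rc;

/// Hypothetical handle to a columnar stream; cloning only bumps a reference count.
#[derive(Clone)]
pub struct ColumnarHandle(pub Rc<Vec<u8>>);

/// Which path a bundle would take; the variant names are illustrative only.
pub enum Path {
    /// Identity MFP and no key/value: reuse the columnar handle as-is.
    Direct(ColumnarHandle),
    /// Anything else: the data passes through a conversion step.
    Converted(ColumnarHandle),
}

/// Mirrors the `mfp_plan.is_identity() && !has_key_val` condition from the log.
pub fn select_path(columnar: &ColumnarHandle, mfp_is_identity: bool, has_key_val: bool) -> Path {
    if mfp_is_identity && !has_key_val {
        Path::Direct(columnar.clone())
    } else {
        // Simulate the round-trip by copying the data into a fresh allocation.
        let copied: Vec<u8> = columnar.0.to_vec();
        Path::Converted(ColumnarHandle(Rc::new(copied)))
    }
}
```

On the direct path the returned handle points at the same allocation as the input, which is the property the identity optimization relies on.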

### Files changed
- `src/compute/src/render/context.rs` — Added `as_specific_columnar_collection` method; optimized `as_columnar_collection_core` identity path.

### Issues
- None.

## Prompt 11.3: Columnar `as_collection_core` (MFP path) — verification

### What was done
- Verified that `as_columnar_collection_core` no longer needs the Vec round-trip for identity MFPs (handled by 11.2's `as_specific_columnar_collection`).
- Verified that for non-identity MFPs, `flat_map` (11.1) iterates columnar data directly via `&RowRef` without allocating owned Rows. The output remains Vec-based due to `map_fallible`'s Ok/Err split, with a final `vec_to_columnar` conversion.
- Updated the doc comment on `as_columnar_collection_core` to accurately describe the current behavior.

### Key decisions
- No further code changes needed beyond updating documentation. The Vec→columnar conversion on the non-identity MFP output path is inherent to `map_fallible` producing Vec and cannot be avoided without rewriting the Ok/Err split to produce columnar output directly.
- The important optimization (avoiding owned Row allocation) is already achieved by 11.1's columnar `flat_map`.
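
To illustrate why the non-identity output stays Vec-based until the end, here is a minimal sketch of an Ok/Err split followed by a packing step. `split_fallible` and `pack_rows` are hypothetical helpers; they are not the signatures of `map_fallible` or `vec_to_columnar`, only an analogy for their roles.

```rust
/// Splits per-record results into an ok vector and an err vector.
pub fn split_fallible<T, E, I>(results: I) -> (Vec<T>, Vec<E>)
where
    I: IntoIterator<Item = Result<T, E>>,
{
    let mut oks = Vec::new();
    let mut errs = Vec::new();
    for result in results {
        match result {
            Ok(value) => oks.push(value),
            Err(error) => errs.push(error),
        }
    }
    (oks, errs)
}

/// Hypothetical final conversion: pack owned rows into one byte buffer
/// plus a vector of end offsets, loosely like a columnar layout.
pub fn pack_rows(rows: &[Vec<u8>]) -> (Vec<u8>, Vec<usize>) {
    let mut bytes = Vec::new();
    let mut ends = Vec::with_capacity(rows.len());
    for row in rows {
        bytes.extend_from_slice(row);
        ends.push(bytes.len());
    }
    (bytes, ends)
}
```

The split produces owned values per record, so the packing step has to run afterwards; avoiding it would mean writing ok records into the packed buffer directly during the split.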

### Files changed
- `src/compute/src/render/context.rs` — Updated doc comment on `as_columnar_collection_core`.

### Issues
- None. Verification-only prompt.

## Prompt 11.4: Columnar Reduce input (direct) — verification

### What was done

- Verified that `render_reduce` calls `entered.flat_map(input_key.map(|k| (k, None)), max_demand, ...)`.
- When `input_key` is `None`, `flat_map` uses the columnar path from 11.1 — iterating `&RowRef` directly without Vec conversion.
- When `input_key` is `Some`, `flat_map` uses the arrangement path (no collection conversion needed).
- The logic closure receives `DatumVecBorrow` populated from `&RowRef` for key/value expression evaluation. No owned Row allocation.

### Key decisions
- No code changes needed. Reduce automatically benefits from 11.1's columnar `flat_map`.

### Files changed
- None (verification only).

### Issues
- None.

## Prompt 11.5: Columnar FlatMap input (direct)

### What was done
- Added a columnar path to `render_flat_map` that uses `unary_fallible` directly on the columnar inner stream (`Column<(Row, T, Diff)>`).
- `unary_fallible` accepts `Column<...>` because `Column` implements `Container + DrainContainer + Clone + Default`.
- The inner loop iterates columnar items via `data.borrow().into_index_iter()`, yielding `(&RowRef, T::Ref, Diff::Ref)`. The `&RowRef` is passed directly to `datums.borrow_with(row_ref)` for expression evaluation and to `drain_through_mfp(row_ref, ...)` for MFP application.
- Changed `drain_through_mfp` parameter from `&Row` to `&RowRef` (transparent since `Row: Deref<Target=RowRef>`).
- The queue buffers `Column<...>` containers instead of `Vec<...>` containers.
- Vec fallback retained for arrangement key paths and non-columnar bundles.
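
The reason widening a parameter from an owned type to its borrowed view is transparent to callers is deref coercion. The sketch below shows the mechanism with hypothetical types: `ToyRow` stands in for `Row`, a plain `[u8]` stands in for `RowRef`, and `count_nonzero` stands in for a helper such as `drain_through_mfp`.

```rust
use std::ops::Deref;

/// Hypothetical owned row; `[u8]` plays the role of the borrowed row view.
pub struct ToyRow(pub Vec<u8>);

impl Deref for ToyRow {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        &self.0
    }
}

/// Stand-in for a helper whose parameter was widened from `&ToyRow` to `&[u8]`.
pub fn count_nonzero(row: &[u8]) -> usize {
    row.iter().filter(|byte| **byte != 0).count()
}

/// An existing caller that still holds an owned row keeps compiling unchanged.
pub fn call_with_owned(row: &ToyRow) -> usize {
    // Deref coercion turns `&ToyRow` into `&[u8]` at the call site.
    count_nonzero(row)
}

/// A new caller that only has a borrowed view can now use the same helper.
pub fn call_with_borrowed(bytes: &[u8]) -> usize {
    count_nonzero(bytes)
}
```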

### Key decisions
- Created a full parallel columnar path rather than trying to make the existing code generic, because the iteration patterns differ (`for (row, t, d) in data` for Vec vs `for (ref, t_ref, d_ref) in data.borrow().into_index_iter()` for columnar).
- The columnar path does not use the `'input` labeled break, since columnar iteration doesn't yield owned items that can be pattern-matched the same way. It uses `continue` instead.
- Output is still Vec-based (`ConsolidatingContainerBuilder<Vec<...>>`) for the ok/err streams, with a final `vec_to_columnar` conversion. The inner table function evaluation inherently produces owned Rows.

### Files changed
- `src/compute/src/render/flat_map.rs` — Added columnar `unary_fallible` path; changed `drain_through_mfp` to accept `&RowRef`.

### Issues
- `col_errs` needed `.clone()` for `concat` since it's behind a shared reference.

## Prompt 11.6: Columnar ArrangeBy input (direct)

### What was done
- Added `arrange_columnar_collection` method that takes `ColumnarCollection<S, Row, Diff>` and iterates `&RowRef` directly from columnar containers for key/value expression evaluation.
- The method mirrors `arrange_collection` but: iterates via `data.borrow().into_index_iter()` yielding `(&RowRef, T::Ref, Diff::Ref)`, passes `&RowRef` to `datums.borrow_with(row_ref)`, and produces a columnar passthrough via `ColumnBuilder<(Row, S::Timestamp, Diff)>`.
- Modified `ensure_collections` to detect when identity MFP + no input_key + columnar available, and use `arrange_columnar_collection` directly instead of converting columnar→Vec via `as_collection_core`.
- The columnar path tracks a `cached_col: Option<(ColumnarCollection, VecCollection<Err>)>` through the arrangement loop, keeping the passthrough columnar throughout.
- The existing Vec fallback path is retained for non-identity MFPs and arrangement-key cases.

### Key decisions
- Only use the columnar direct path when MFP is identity and `input_key` is None. When MFP is non-identity, `as_collection_core` → `flat_map` already uses the columnar flat_map path from 11.1.
- The passthrough stream is columnar (`ColumnBuilder<(Row, T, Diff)>`), forwarding each item individually. This is slightly less efficient than the Vec path's `give_container` (which forwards entire containers), but avoids a columnar→Vec→columnar round-trip for subsequent arrangements.
- Key/value expression evaluation produces owned `Row`s via `key_buf.packer()` / `val_buf.packer()` — this is inherent to the arrangement format and unaffected by the input representation.
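
The trade-off between per-item forwarding and whole-container forwarding can be made concrete with a toy builder. `ToyBuilder` is hypothetical and is not the API of `ColumnBuilder` or of Timely's `give_container`; it only counts per-item pushes to show where the extra work comes from.

```rust
/// Hypothetical builder that accumulates items and seals a batch at `capacity`.
pub struct ToyBuilder<T> {
    capacity: usize,
    current: Vec<T>,
    sealed: Vec<Vec<T>>,
    /// Number of per-item pushes performed so far.
    pub pushes: usize,
}

impl<T> ToyBuilder<T> {
    pub fn new(capacity: usize) -> Self {
        ToyBuilder { capacity: capacity.max(1), current: Vec::new(), sealed: Vec::new(), pushes: 0 }
    }

    /// Per-item path: one push per record, sealing a batch when it fills up.
    pub fn give(&mut self, item: T) {
        self.current.push(item);
        self.pushes += 1;
        if self.current.len() >= self.capacity {
            self.flush();
        }
    }

    /// Whole-container path: the container is moved along without touching items.
    pub fn give_container(&mut self, container: Vec<T>) {
        self.flush();
        self.sealed.push(container);
    }

    /// Seals the partially filled batch, if any.
    pub fn flush(&mut self) {
        if !self.current.is_empty() {
            let batch = std::mem::take(&mut self.current);
            self.sealed.push(batch);
        }
    }

    /// Returns all sealed batches, including a trailing partial one.
    pub fn finish(mut self) -> Vec<Vec<T>> {
        self.flush();
        self.sealed
    }
}
```

In this model the per-item path does work proportional to the number of records, while the whole-container path does a constant amount per container; the log accepts that cost to keep the passthrough in columnar form.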

### Files changed
- `src/compute/src/render/context.rs` — Added `arrange_columnar_collection` method; modified `ensure_collections` to prefer columnar path.

### Issues
- None.
6 changes: 6 additions & 0 deletions log.md
@@ -20,3 +20,9 @@
| 16 | 9.1: Audit Vec paths; remove redundant Vec from sources; add logging | Done | 2026-03-25 |
| 17 | 9.2: Remove collection field and ensure_vec_collection from CollectionBundle | Done | 2026-03-25 |
| 18 | 10.1: Investigate columnar arrangement spines — research complete | Done | 2026-03-25 |
| 19 | 11.1: Columnar flat_map — iterate &RowRef directly without Vec conversion | Done | 2026-03-25 |
| 20 | 11.2: Columnar as_specific_collection — identity path returns columnar directly | Done | 2026-03-25 |
| 21 | 11.3: Columnar as_collection_core — verified, identity avoids round-trip | Done | 2026-03-25 |
| 22 | 11.4: Columnar Reduce input — verified, flat_map uses &RowRef directly | Done | 2026-03-25 |
| 23 | 11.5: Columnar FlatMap — direct &RowRef processing via unary_fallible on Column | Done | 2026-03-25 |
| 24 | 11.6: Columnar ArrangeBy — arrange_columnar_collection iterates &RowRef directly | Done | 2026-03-25 |
100 changes: 100 additions & 0 deletions prompts.md
@@ -320,6 +320,106 @@ direct vectorized evaluation from arrangements without materializing collections

---

## Phase 11: Direct columnar processing (eliminate Vec escape hatches)

The columnar `Ref<'_, Row>` is `&RowRef`, and `DatumVec::borrow_with` / `borrow_with_limit`
already accept `&RowRef`. Operators that unpack rows via `DatumVec` can iterate columnar
containers directly without materializing owned `Row` values.

### Prompt 11.1: Columnar `flat_map` in `CollectionBundle`

[*] The `flat_map` method in `CollectionBundle` (context.rs) is the core building block used by
MFP, Reduce, and other operators. When `key_val` is `None`, it currently calls
`as_vec_collection()` to get a `VecCollection`, then iterates `(Row, T, Diff)` tuples.

[*] Add a columnar path: when `columnar_collection` is present and `key_val` is `None`, iterate
the columnar container directly using `into_index_iter()`. Each item yields `(&RowRef, &T, &Diff)`.
Pass `&RowRef` directly to `datums.borrow_with_limit(row_ref, max_demand)` — this already works
since `borrow_with_limit` accepts `&RowRef`.

[*] The `logic` closure signature uses `DatumVecBorrow<'_>` which is populated from `&RowRef`,
so no changes to callers (MFP evaluate, Reduce key/value extraction) are needed.

**Files**: `src/compute/src/render/context.rs`

---

### Prompt 11.2: Columnar `as_specific_collection` (identity path)

[*] `as_specific_collection(None)` currently calls `as_vec_collection()` which converts the
entire columnar stream to Vec. For the identity case (no arrangement key), this is pure overhead.

[*] When the bundle has a columnar collection, return it directly as a `ColumnarCollection`
(or convert to Vec only at the caller boundary). This requires either changing the return type
to be generic over container, or providing a separate `as_specific_columnar_collection` method.

[*] The callers that use this for identity passthrough (e.g., `as_collection_core` when MFP is
identity) should prefer the columnar variant.

**Files**: `src/compute/src/render/context.rs`

---

### Prompt 11.3: Columnar `as_collection_core` (MFP path)

[*] `as_collection_core` calls `flat_map` (converted in 11.1) and then applies
`map_fallible` to split Ok/Err. With 11.1 done, this method already processes columnar
data without the Vec escape hatch when `key_val` is `None`.

[*] For the identity MFP case, it currently calls `as_specific_collection` (converted in 11.2).
Wire this to return columnar directly.

[*] Verify that `as_columnar_collection_core` no longer needs the Vec round-trip.

**Files**: `src/compute/src/render/context.rs`

---

### Prompt 11.4: Columnar Reduce input (direct)

[*] `render_reduce` calls `flat_map` on the entered bundle for key/value extraction. With 11.1
done, this already processes columnar data directly — the `flat_map` logic closure receives
`DatumVecBorrow` populated from `&RowRef`.

[*] Verify that Reduce works end-to-end with columnar input without any Vec conversion.

**Files**: `src/compute/src/render/reduce.rs`

---

### Prompt 11.5: Columnar FlatMap input (direct)

[*] `render_flat_map` calls `as_specific_collection` to get a Vec collection. With 11.2 done,
investigate whether the FlatMap inner loop can operate on columnar refs directly.

[*] The FlatMap inner loop unpacks `input_row` via `datums.borrow_with(&input_row)` and evaluates
expressions. Since `borrow_with` accepts `&RowRef`, the loop body can work on columnar refs.
However, the `drain_through_mfp` helper also takes `&Row` — update it to accept `&RowRef`.

[*] The FlatMap operator uses `unary_fallible` which expects a specific input container type.
Investigate whether it can accept `Column<(Row, T, Diff)>` directly or whether a columnar-aware
operator variant is needed.

**Files**: `src/compute/src/render/flat_map.rs`

---

### Prompt 11.6: Columnar ArrangeBy input (direct)

[*] `arrange_collection` takes a `VecCollection<S, Row, Diff>` and iterates `(row, time, diff)`
to evaluate key/value expressions. The inner loop uses `datums.borrow_with(row)`.

[*] Add a columnar variant `arrange_columnar_collection` that iterates `Column<(Row, T, Diff)>`
directly. Each `&RowRef` from the columnar container can be passed to `borrow_with` for
expression evaluation. The key/value `Row`s are built by `SharedRow::pack()` (owned output),
and the arrangement batcher accepts `((Row, Row), T, Diff)` — these owned outputs are unaffected.

[*] Wire `ensure_collections` to prefer the columnar path when `columnar_collection` is present.

**Files**: `src/compute/src/render/context.rs`

---

## Notes

### Conversion cost awareness
49 changes: 21 additions & 28 deletions src/compute/src/render.rs
@@ -378,12 +378,9 @@ pub fn build_compute_dataflow<A: Allocate>(
  for (id, (oks, errs)) in imported_sources.into_iter() {
  let oks_entered = oks.enter(region);
  let errs_entered = errs.enter(region);
- let columnar_oks =
- crate::render::columnar::vec_to_columnar(oks_entered);
- let bundle = CollectionBundle::from_columnar_collections(
- columnar_oks,
- errs_entered,
- );
+ let columnar_oks = crate::render::columnar::vec_to_columnar(oks_entered);
+ let bundle =
+ CollectionBundle::from_columnar_collections(columnar_oks, errs_entered);
  // Associate collection bundle with the source identifier.
  context.insert_id(id, bundle);
  }
@@ -492,12 +489,9 @@ pub fn build_compute_dataflow<A: Allocate>(
  };
  let oks_entered = oks.enter_region(region);
  let errs_entered = errs.enter_region(region);
- let columnar_oks =
- crate::render::columnar::vec_to_columnar(oks_entered);
- let bundle = CollectionBundle::from_columnar_collections(
- columnar_oks,
- errs_entered,
- );
+ let columnar_oks = crate::render::columnar::vec_to_columnar(oks_entered);
+ let bundle =
+ CollectionBundle::from_columnar_collections(columnar_oks, errs_entered);
  // Associate collection bundle with the source identifier.
  context.insert_id(id, bundle);
  }
@@ -760,7 +754,10 @@ where
  compute_state.traces.set(idx_id, trace);
  }
  None => {
- println!("columnar_collection available: {:?}", bundle.columnar_collection.is_some());
+ println!(
+ "columnar_collection available: {:?}",
+ bundle.columnar_collection.is_some()
+ );
  println!(
  "keys available: {:?}",
  bundle.arranged.keys().collect::<Vec<_>>()
@@ -856,7 +853,10 @@ where
  compute_state.traces.set(idx_id, trace);
  }
  None => {
- println!("columnar_collection available: {:?}", bundle.columnar_collection.is_some());
+ println!(
+ "columnar_collection available: {:?}",
+ bundle.columnar_collection.is_some()
+ );
  println!(
  "keys available: {:?}",
  bundle.arranged.keys().collect::<Vec<_>>()
@@ -1219,8 +1219,7 @@ where
  .as_collection();

  // Produce a columnar-only collection for downstream operators.
- let columnar_oks =
- crate::render::columnar::vec_to_columnar(ok_collection);
+ let columnar_oks = crate::render::columnar::vec_to_columnar(ok_collection);
  CollectionBundle::from_columnar_collections(columnar_oks, err_collection)
  }
  Get { id, keys, plan } => {
@@ -1343,8 +1342,7 @@ where
  let negated = crate::render::columnar::negate_columnar(col_oks.clone());
  CollectionBundle::from_columnar_collections(negated, col_errs.clone())
  } else {
- let (oks, errs) =
- input.as_specific_collection(None, &self.config_set);
+ let (oks, errs) = input.as_specific_collection(None, &self.config_set);
  CollectionBundle::from_collections(oks.negate(), errs)
  }
  }
@@ -1370,14 +1368,10 @@ where
  col_oks.push(oks.clone());
  col_errs.push(errs.clone());
  }
- let col_oks = differential_dataflow::collection::concatenate(
- &mut self.scope,
- col_oks,
- );
- let col_errs = differential_dataflow::collection::concatenate(
- &mut self.scope,
- col_errs,
- );
+ let col_oks =
+ differential_dataflow::collection::concatenate(&mut self.scope, col_oks);
+ let col_errs =
+ differential_dataflow::collection::concatenate(&mut self.scope, col_errs);
  if consolidate_output {
  // Consolidation requires Vec-based collections; convert, consolidate, convert back.
  let vec_oks = crate::render::columnar::columnar_to_vec(col_oks);
@@ -1394,8 +1388,7 @@ where
  let mut oks = Vec::new();
  let mut errs = Vec::new();
  for bundle in bundles {
- let (os, es) =
- bundle.as_specific_collection(None, &self.config_set);
+ let (os, es) = bundle.as_specific_collection(None, &self.config_set);
  oks.push(os);
  errs.push(es);
  }