Merged
9 changes: 9 additions & 0 deletions ai/log/2026-01-07.md
@@ -0,0 +1,9 @@
---
title: Executor/Backend result path synced; protocol stable
updated_at: "2026-01-07"
---

- Verified protocol and dataflow match code: `ExecReady`, per-scan `Eof`, heap meta with SHM `vis_len`.
- Implemented result path: executor encodes rows via `encode_wire_tuple` into result ring; backend assembles `MinimalTuple` and stores in slot.
- ColumnLayout (`PgAttrWire`) sent during planning and used for encoding/alignment.
- Open items: apply vis bitmap in `PgScanStream`; rotate block indices for double buffering; broaden encoder type coverage.
8 changes: 8 additions & 0 deletions ai/memory/architecture.md
@@ -39,3 +39,11 @@ In short: a PostgreSQL (pgrx) extension intercepts planning/execution and delega
- Executor (`executor/`): DataFusion planning/execution, heap requests, decode/encode results, backpressure.
- Protocol: stable binary formats/messages.
- Storage: precise heap/attribute decoder (zero‑copy where possible).

## Result Path Status

- Column layout: backend sends `ColumnLayout` with `PgAttrWire { atttypid, typmod, attlen, attalign, attbyval, nullable }` during planning; executor caches it.
- Wire tuples: executor encodes rows using `protocol::tuple::encode_wire_tuple` (header + optional null bitmap + aligned data area; byval in host‑endian; varlena as length-prefixed bytes, no TOAST/compression).
- Result ring: executor writes length‑prefixed wire tuples to the per‑connection lock‑free result ring and nudges backend (SIGUSR1).
- Backend assembly: backend reads frames, decodes wire header, reconstructs `Datum[]/isnull[]` by `attlen/attalign/attbyval`, forms `MinimalTuple` via `heap_form_minimal_tuple`, and stores into `TupleTableSlot`.
- Visibility: heap page visibility bitmap is carried out‑of‑band in SHM; currently parsed but not applied in `PgScanStream` (planned follow‑up).
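
The result-ring framing described above (a length prefix followed by the wire-tuple payload) can be sketched roughly as follows. This is a minimal illustration, not the project's code: `write_frame` and `read_frame` here are hypothetical helpers over a plain `Vec<u8>` rather than the lock-free shared-memory ring, and the little-endian `u32` prefix is an assumption.

```rust
// Hypothetical helpers; the real ring is lock-free shared memory, and the
// byte order of the length prefix (little-endian here) is an assumption.

/// Append one frame: a u32 payload length followed by the payload bytes.
fn write_frame(ring: &mut Vec<u8>, payload: &[u8]) {
    assert!(payload.len() <= u32::MAX as usize, "payload too large for a u32 prefix");
    ring.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    ring.extend_from_slice(payload);
}

/// Read one frame starting at `*pos` and advance `*pos` past it.
/// Returns `None` (leaving `*pos` untouched) if no complete frame is available.
fn read_frame<'a>(ring: &'a [u8], pos: &mut usize) -> Option<&'a [u8]> {
    let body_start = (*pos).checked_add(4)?;
    let mut len_bytes = [0u8; 4];
    len_bytes.copy_from_slice(ring.get(*pos..body_start)?);
    let len = u32::from_le_bytes(len_bytes) as usize;
    let body_end = body_start.checked_add(len)?;
    let payload = ring.get(body_start..body_end)?;
    *pos = body_end;
    Some(payload)
}
```

A reader in this sketch loops on `read_frame` until it returns `None`, then waits for the next wake-up. Leaving `*pos` unchanged on a partial frame means the same offset can simply be retried later.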
28 changes: 8 additions & 20 deletions ai/memory/components/executor.md
@@ -2,26 +2,14 @@
id: comp-executor-0001
type: fact
scope: executor
tags: ["datafusion", "runtime", "pgscan", "ipc", "arrow"]
updated_at: "2025-11-29"
importance: 0.8
tags: ["datafusion", "scan", "result-ring", "wire-tuple"]
updated_at: "2026-01-07"
importance: 0.7
---

# Component: Executor (DataFusion Runtime)
# Component: Executor

## Essentials

- Table source: `PgTableProvider` → physical node `PgScanExec` → `PgScanStream`.
- On `scan.execute()`, register the heap‑block receiver in a per‑connection `ScanRegistry`.
- `PgScanStream` decodes heap pages (via `storage::heap`) to Arrow, then results are encoded as wire MinimalTuple and written to the result ring.
- Partition strategy: temporarily force `target_partitions = 1` (see decision `dec-0004`).

## Public Surfaces

- `server::{parse, optimize, translate, start_data_flow, end_data_flow}` — control‑path FSM.
- `pgscan::{PgTableProvider, PgScanExec, ScanRegistry}` — sources/scans.

## Gotchas

- JOIN in multi‑partition mode emits no rows without partition‑aware reading — use single partition.
- SHM: borrowed slices must not outlive the producer’s write; do not cache references.
- Planning/exec: DataFusion with single partition; `PgTableProvider -> PgScanExec -> PgScanStream`.
- Scans: per‑connection `ScanRegistry` with bounded channels; issues `request_heap_block` and pipelines next on receipt.
- Heap: borrows page/bitmap from SHM; decodes tuples via `storage::heap::decode_tuple_project` using iterator projection; builds Arrow batches.
- Results: encodes each row via `encode_wire_tuple` and writes to per‑connection result ring; signals backend.
29 changes: 9 additions & 20 deletions ai/memory/components/postgres.md
@@ -1,26 +1,15 @@
---
id: comp-postgres-0001
type: fact
scope: postgres
tags: ["pgrx", "executor", "TupleTableSlot", "heap", "ipc"]
updated_at: "2025-11-29"
importance: 0.8
scope: backend
tags: ["pgrx", "customscan", "heap", "shm", "result"]
updated_at: "2026-01-07"
importance: 0.7
---

# Component: Postgres (pgrx Extension)
# Component: PostgreSQL Extension (Backend)

## Responsibilities

- Drives lifecycle: Parse/Metadata/Bind/Optimize/Translate/BeginScan/ExecScan/EndScan.
- Reads heap blocks and publishes them into SHM slots + sends metadata (scan_id/slot_id/vis_len).
- Reads the result ring, decodes wire MinimalTuple frames, and fills a `TupleTableSlot`.

## Safety

- No panics; errors via `FusionError`.
- Minimize work outside PostgreSQL’s safe APIs; always unlock/release buffers/locks.

## Known Limitations

- MVCC visibility is simplified (all visible) — TODO: proper XMIN/XMAX/hints checks.
- SIGUSR1 requires a valid client PID; unsupported on non‑Unix.
- Planning: builds `TargetEntry` list and sends `ColumnLayout` for executor encoding.
- Scan lifecycle: `BeginScan` registers channels; `ExecScan` waits for `ExecReady`, then loops reading the result ring and serving heap requests; `EndScan` closes.
- Heap path: reads `request_heap_block`, copies page + vis bitmap to SHM, sends metadata; on end, sends per‑scan `Eof`.
- Result path: reads frames from result ring, decodes wire tuple, assembles `MinimalTuple` via `heap_form_minimal_tuple`, and stores into `TupleTableSlot`.
24 changes: 7 additions & 17 deletions ai/memory/components/protocol.md
@@ -2,24 +2,14 @@
id: comp-protocol-0001
type: fact
scope: protocol
tags: ["wire", "tuple", "ipc", "msgpack"]
updated_at: "2025-11-29"
importance: 0.85
tags: ["ipc", "control", "data", "tuple", "bitmap"]
updated_at: "2026-01-07"
importance: 0.7
---

# Component: Protocol

## Control Packets

- Parse/Metadata/Bind/Optimize/Translate/BeginScan/ExecScan/EndScan/Explain/Failure/ExecReady.

## Data Packets

- Heap: meta with (scan_id, slot_id, table_oid, blkno, num_offsets, vis_len); page/bitmap bytes reside in SHM.
- Results: `result::write_frame(u32_len + payload)`; payload is a “WireMinimalTuple”:
- Header (nattrs, flags, hoff, bitmap_bytes, data_bytes), null bitmap (LSB‑first), aligned attributes.

## Supported Types

- boolean, int2/4/8, float4/8, utf8, date, time64(us), timestamp(us), interval month‑day‑nano.
- Alignment and attlen must match `PgAttrWire`.
- Control: `Parse/Metadata/Bind/Optimize/Translate/Explain/BeginScan/ExecScan/EndScan/ExecReady/ColumnLayout`.
- Data: `Heap` (requests + metadata for SHM page + vis bitmap `vis_len`), `Eof` per‑scan (u64 scan_id).
- Column layout: `PgAttrWire { atttypid, typmod, attlen, attalign, attbyval, nullable }` to drive result encoding.
- Tuples: `encode_wire_tuple` + `decode_wire_tuple` with header, null bitmap, and aligned data area.
25 changes: 7 additions & 18 deletions ai/memory/components/storage.md
@@ -2,24 +2,13 @@
id: comp-storage-0001
type: fact
scope: storage
tags: ["heap", "decoder", "scalarvalue", "varlena", "arrow"]
updated_at: "2025-11-29"
importance: 0.9
tags: ["heap", "tuple", "decoder", "varlena"]
updated_at: "2026-01-07"
importance: 0.75
---

# Component: Storage (Heap)
# Component: Storage

## Functions

- Read heap page; iterate LP_NORMAL by offsets.
- Zero‑allocation decoding: fixed‑width, date/time/timestamp/interval, inline varlena text.
- Projections: `decode_tuple_project(page_hdr, tuple, attrs, indices)`.

## Constraints & Gotchas

- Projected compressed/external varlena → error; if not projected — skip safely.
- Observe alignment when advancing across attributes (attalign).

## Tests

- `storage/pg_test`: pgrx‑based tests against a live PG; cover iteration and decoding.
- Heap page: iterators over line pointers; zero‑copy tuple slicing; a sorted‑by‑offset variant for reuse.
- Decoder: `decode_tuple_project(page_hdr, tuple, attrs, indices: Iterator)`; supports fixed‑width, date/time/timestamp/interval, inline varlena text.
- Varlenas: projected compressed/external → error; non‑projected safely skipped.
29 changes: 29 additions & 0 deletions ai/memory/decisions/0001-wire-tuple-format.md
@@ -0,0 +1,29 @@
---
id: dec-wire-tuple-0001
type: decision
scope: protocol/executor/backend
tags: ["result", "wire-tuple", "minimal-tuple", "alignment", "varlena"]
updated_at: "2026-01-07"
importance: 0.85
---

# Decision: Result Rows as Protocol Wire Tuples

Context: We need a PG‑friendly row format across the executor↔backend boundary that avoids per‑type mapping churn and excessive allocations.

- Chosen: executor encodes rows as `protocol::tuple::encode_wire_tuple` — a protocol‑level wire format (header + optional null bitmap + aligned data area), not a literal `HeapTuple` header.
- Backend behavior: read frame → decode header/bitmap → reconstruct `Datum[]/isnull[]` using `attlen/attalign/attbyval` from `PgAttrWire` → form `MinimalTuple` via `heap_form_minimal_tuple` → `ExecStoreMinimalTuple`.

Why:
- Matches `TupleTableSlot` machinery; sidesteps per‑OID msgpack mapping and reduces copies.
- Stable across types; varlena handled uniformly without TOAST.

Format notes:
- Null bitmap: LSB‑first; bit set means NULL.
- Alignment: each attribute aligned to `attalign` (1/2/4/8) in the data area.
- Byval: 1/2/4/8‑byte values written in host byte order, which is assumed to be little‑endian (executor and backend are co‑located on the same host).
- Varlena: `u32 LE` length prefix + raw bytes (no compression/TOAST on wire).

Future:
- Option to switch to direct `HeapTuple` memcpy if benefits outweigh complexity.
- Expand type coverage (dates/time/interval already supported in storage decode; extend encoder/assembler accordingly).
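
The format notes above can be illustrated with a small sketch of the null bitmap and data area. It is not the real `encode_wire_tuple`: the header fields are omitted because their widths are not given here, and `AttrSketch`, `pad_to`, and `encode_row_sketch` are hypothetical names. It assumes fixed-width values arrive as little-endian bytes and that a varlena's length prefix is aligned like any other attribute.

```rust
/// Hypothetical, simplified column descriptor; the real `PgAttrWire` also
/// carries `atttypid`, `typmod`, `attbyval` and `nullable`.
#[derive(Clone, Copy)]
struct AttrSketch {
    /// Fixed width in bytes (1/2/4/8), or -1 for varlena.
    attlen: i16,
    /// Required alignment in bytes (1/2/4/8).
    attalign: usize,
}

/// Pad `buf` with zero bytes until its length is a multiple of `align`.
fn pad_to(buf: &mut Vec<u8>, align: usize) {
    while buf.len() % align != 0 {
        buf.push(0);
    }
}

/// Encode one row into (null bitmap, data area). `None` marks a NULL.
/// Fixed-width values must already be little-endian bytes of width `attlen`.
fn encode_row_sketch(attrs: &[AttrSketch], values: &[Option<&[u8]>]) -> (Vec<u8>, Vec<u8>) {
    assert_eq!(attrs.len(), values.len());
    let mut bitmap = vec![0u8; (attrs.len() + 7) / 8];
    let mut data = Vec::new();
    for (i, (attr, value)) in attrs.iter().zip(values.iter()).enumerate() {
        match value {
            // LSB-first bitmap: a set bit means the attribute is NULL.
            None => bitmap[i / 8] |= 1u8 << (i % 8),
            Some(bytes) => {
                pad_to(&mut data, attr.attalign);
                if attr.attlen < 0 {
                    // Varlena: u32 LE length prefix, then raw bytes (no TOAST).
                    data.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
                } else {
                    assert_eq!(bytes.len(), attr.attlen as usize);
                }
                data.extend_from_slice(bytes);
            }
        }
    }
    (bitmap, data)
}
```

In this sketch NULL attributes occupy no space in the data area, so a decoder has to walk the bitmap and the attribute list together to locate each value.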
6 changes: 5 additions & 1 deletion ai/memory/gotchas.md
@@ -3,7 +3,7 @@ id: gotchas-0001
type: gotcha
scope: repo
tags: ["joins", "partitions", "varlena", "signals"]
updated_at: "2025-11-29"
updated_at: "2025-12-07"
importance: 0.7
---

@@ -13,3 +13,7 @@ importance: 0.7
- TOAST/Compressed varlena: in projection → error; when not projected → skip, don’t crash.
- SIGUSR1: requires a valid client PID; not available on non‑Unix.
- SHM races: don’t cache borrowed slices beyond one read cycle; clamp lengths.

## Executor borrow patterns

- Caching tuple `(off,len)` pairs in `PgScanStream`: fill a self-owned `Vec<(u16,u16)>` and create the iterator inside a tight scope. Clone needed metadata (`attrs_full`, `proj_indices`) into locals and avoid using `self` while the iterator (borrowing the pairs slice) is alive. This sidesteps borrow checker conflicts between a mutable `&mut self` and an immutable borrow of `self.pairs`.
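
The borrow pattern described above might look roughly like this. It is a sketch under stated assumptions: `ScanSketch` and `tuples_sketch` are hypothetical stand-ins for `PgScanStream` and `tuples_by_offset`, and the fixed 4-byte "tuples" are purely illustrative.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `PgScanStream`; field names follow the diff,
/// everything else is illustrative.
struct ScanSketch {
    attrs_full: Arc<Vec<&'static str>>,
    proj_indices: Arc<Vec<usize>>,
    pairs: Vec<(u16, u16)>,
    decoded: usize,
}

/// Fill `pairs` with (offset, length) entries, then return an iterator that
/// borrows the filled vector, mimicking the shape of `tuples_by_offset`.
fn tuples_sketch<'a>(
    page: &'a [u8],
    pairs: &'a mut Vec<(u16, u16)>,
) -> impl Iterator<Item = &'a [u8]> + 'a {
    pairs.clear(); // reuse the allocation across pages
    for (i, _) in page.chunks_exact(4).enumerate() {
        pairs.push(((i * 4) as u16, 4));
    }
    pairs
        .iter()
        .map(move |&(off, len)| &page[off as usize..(off + len) as usize])
}

impl ScanSketch {
    fn decode_page(&mut self, page: &[u8]) -> usize {
        // Clone shared metadata into locals so the loop body never needs `self`.
        let attrs_full = Arc::clone(&self.attrs_full);
        let proj_indices = Arc::clone(&self.proj_indices);
        let mut rows = 0usize;
        {
            // `self.pairs` stays mutably borrowed only while `it` is alive;
            // calling a `&mut self` method inside this scope would not compile.
            let it = tuples_sketch(page, &mut self.pairs);
            for tup in it {
                for &idx in proj_indices.iter() {
                    let _column = (attrs_full[idx], tup); // decode step elided
                }
                rows += 1;
            }
        }
        // The borrow has ended, so `self` is fully usable again.
        self.decoded += rows;
        rows
    }
}
```

Keeping `pairs` as a field lets the allocation be reused for every page, which appears to be the motivation for moving it out of the per-poll local.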
2 changes: 1 addition & 1 deletion ai/memory/index.md
@@ -1,6 +1,6 @@
---
title: Memory Bank Index
updated_at: "2025-11-29"
updated_at: "2026-01-07"
---

# Memory Bank Index
2 changes: 1 addition & 1 deletion executor/Cargo.toml
@@ -22,7 +22,7 @@ smallvec = { version = "1.14", features = ["const_generics", "union"] }
smol_str = "0.3"
thiserror = "2.0"
tokio = { version = "1.42", features = ["full"] }
tracing = "0.1"
tracing = { version = "0.1" }
protocol = { path = "../protocol", version = "25.0" }
common = { path = "../common", version = "25.0" }
async-trait = "0.1"
20 changes: 12 additions & 8 deletions executor/src/ipc.rs
@@ -35,19 +35,23 @@ pub async fn signal_listener(state: Arc<SharedState<'_>>) {
if state.flags[i].load(Ordering::Acquire) {
state.wakers[i].wake();
woke += 1;
if tracing::enabled!(target: "executor::ipc", tracing::Level::TRACE) {
tracing::trace!(
target = "executor::ipc",
conn_id = i,
"signal_listener: woke socket"
);
}
}
}
if woke == 0 {
if tracing::enabled!(target: "executor::ipc", tracing::Level::TRACE) {
tracing::trace!(
target = "executor::ipc",
conn_id = i,
"signal_listener: woke socket"
"signal_listener: signal received but no flags set"
);
}
}
if woke == 0 {
tracing::trace!(
target = "executor::ipc",
"signal_listener: signal received but no flags set"
);
}
}
}

82 changes: 39 additions & 43 deletions executor/src/pgscan.rs
@@ -347,6 +347,7 @@ pub struct PgScanStream {
proj_indices: Arc<Vec<usize>>,
rx: Option<mpsc::Receiver<HeapBlock>>,
conn_id: usize,
pairs: Vec<(u16, u16)>,
}

impl PgScanStream {
@@ -363,6 +364,7 @@ impl PgScanStream {
proj_indices,
rx,
conn_id,
pairs: Vec::new(),
}
}
}
@@ -392,57 +394,49 @@ impl Stream for PgScanStream {

// Create a HeapPage view and iterate tuples
let hp = unsafe { HeapPage::from_slice(page) };
let batch = RecordBatch::new_empty(Arc::clone(&this.proj_schema));
let Ok(hp) = hp else {
// On error decoding page, return empty batch for resilience
let batch = RecordBatch::new_empty(Arc::clone(&this.proj_schema));
return Poll::Ready(Some(Ok(batch)));
};

// Prepare per-column builders
let col_count = total_cols;
let mut builders = make_builders(&this.proj_schema, block.num_offsets as usize)
.map_err(|e| datafusion::error::DataFusionError::Execution(format!("{e}")))?;
// Use tuples_by_offset to iterate LP_NORMAL tuples in page order
let mut pairs: Vec<(u16, u16)> = Vec::new();
// Pre-scan to populate pairs and log LP_NORMAL count
{
let _ = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs);
}
let pairs_len = pairs.len();
// Create iterator borrowing the filled pairs slice
let it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs);
tracing::trace!(
target = "executor::server",
blkno = block.blkno,
num_offsets = block.num_offsets,
lp_normal = pairs_len,
"pgscan: tuples_by_offset summary"
);
// Take local clones of shared metadata to avoid borrowing `this` during tuple iteration
let proj_indices = Arc::clone(&this.proj_indices);
let attrs_full = Arc::clone(&this.attrs_full);
let page_hdr = unsafe { &*(page.as_ptr() as *const pg_sys::PageHeaderData) }
as *const pg_sys::PageHeaderData;
let mut decoded_rows = 0usize;
for tup in it {
// Decode projected columns for tuple using iterator over requested projection
let iter = unsafe {
decode_tuple_project(
page_hdr,
tup,
&this.attrs_full,
this.proj_indices.iter().copied(),
)
};
let Ok(mut iter) = iter else {
continue;
};
// Iterate over projected columns in order
for b in builders.iter_mut().take(total_cols) {
match iter.next() {
Some(Ok(v)) => append_scalar(b, v),
Some(Err(_e)) => append_null(b),
None => append_null(b),
// Limit the borrow of `this.pairs` to this inner scope to satisfy borrow checker
{
// Populate pairs once and create iterator borrowing the filled pairs slice
let it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut this.pairs);
for tup in it {
// Decode projected columns for tuple using iterator over requested projection
let iter = unsafe {
decode_tuple_project(
page_hdr,
tup,
&attrs_full,
proj_indices.iter().copied(),
)
};
let Ok(mut iter) = iter else {
continue;
};
// Iterate over projected columns in order
for b in builders.iter_mut().take(total_cols) {
match iter.next() {
Some(Ok(v)) => append_scalar(b, v),
Some(Err(_e)) => append_null(b),
None => append_null(b),
}
}
decoded_rows += 1;
}
decoded_rows += 1;
}

// Build Arrow arrays and RecordBatch
@@ -462,12 +456,14 @@ impl Stream for PgScanStream {
datafusion::error::DataFusionError::Execution(format!("{e}"))
})?
};
tracing::trace!(
target = "executor::server",
rows = decoded_rows,
blkno = block.blkno,
"pgscan: decoded rows"
);
if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) {
tracing::trace!(
target = "executor::server",
rows = decoded_rows,
blkno = block.blkno,
"pgscan: decoded rows"
);
}
Poll::Ready(Some(Ok(rb)))
}
Poll::Ready(None) => Poll::Ready(None),