Skip to content

Commit 332d920

Browse files
Alp
authored and committed
perf(zql): fuse fetch pipeline, PK fast path, and reduce allocations
1 parent a20de33 commit 332d920

File tree

1 file changed

+190
-83
lines changed

1 file changed

+190
-83
lines changed

packages/zql/src/ivm/memory-source.ts

Lines changed: 190 additions & 83 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ import {
2222
type NoSubqueryCondition,
2323
} from '../builder/filter.ts';
2424
import {assertOrderingIncludesPK} from '../query/complete-ordering.ts';
25-
import type {Change, EditChange} from './change.ts';
25+
import type {Change} from './change.ts';
2626
import {
2727
constraintMatchesPrimaryKey,
2828
constraintMatchesRow,
@@ -39,7 +39,6 @@ import {
3939
} from './data.ts';
4040
import {filterPush} from './filter-push.ts';
4141
import {
42-
skipYields,
4342
type FetchRequest,
4443
type Input,
4544
type Output,
@@ -269,50 +268,81 @@ export class MemorySource implements Source {
269268
return [...this.#indexes.keys()];
270269
}
271270

272-
*#fetch(req: FetchRequest, conn: Connection): Stream<Node | 'yield'> {
271+
#fetch(
272+
req: FetchRequest,
273+
conn: Connection,
274+
): Iterable<Node | 'yield'> {
273275
const {sort: requestedSort, compareRows} = conn;
274-
const connectionComparator = (r1: Row, r2: Row) =>
275-
compareRows(r1, r2) * (req.reverse ? -1 : 1);
276+
const connectionComparator: Comparator = req.reverse
277+
? (r1, r2) => -compareRows(r1, r2)
278+
: compareRows;
276279

277280
const pkConstraint = conn.pkConstraint;
278281
// The primary key constraint will be more limiting than the constraint
279282
// so swap out to that if it exists.
280283
const fetchOrPkConstraint = pkConstraint ?? req.constraint;
281284

282-
// If there is a constraint, we need an index sorted by it first.
283-
const indexSort: OrderPart[] = [];
284-
if (fetchOrPkConstraint) {
285-
for (const key of Object.keys(fetchOrPkConstraint)) {
286-
indexSort.push([key, 'asc']);
285+
// Determine overlay state once
286+
const overlay = this.#overlay;
287+
const hasActiveOverlay =
288+
overlay !== undefined && conn.lastPushedEpoch >= overlay.epoch;
289+
290+
// PK fast path: direct BTree.get() for single-row constrained lookups.
291+
// When filters constrain to a single PK value and no overlay is active,
292+
// skip the entire generator pipeline and do a direct O(log n) lookup.
293+
if (pkConstraint && !hasActiveOverlay) {
294+
const row = this.#getPrimaryIndex().data.get(pkConstraint as Row);
295+
if (row !== undefined) {
296+
if (!conn.filters || conn.filters.predicate(row)) {
297+
if (!req.constraint || constraintMatchesRow(req.constraint, row)) {
298+
const start = req.start;
299+
if (
300+
!start ||
301+
(start.basis === 'at'
302+
? connectionComparator(row, start.row) >= 0
303+
: connectionComparator(row, start.row) > 0)
304+
) {
305+
return [{row, relationships: EMPTY_RELATIONSHIPS}];
306+
}
307+
}
308+
}
287309
}
310+
return [];
288311
}
289312

290-
// For the special case of constraining by PK, we don't need to worry about
291-
// any requested sort since there can only be one result. Otherwise we also
292-
// need the index sorted by the requested sort.
313+
// Standard path: index-based scan
293314
const includeRequestedSort =
294315
this.#primaryKey.length > 1 ||
295316
!fetchOrPkConstraint ||
296317
!constraintMatchesPrimaryKey(fetchOrPkConstraint, this.#primaryKey);
297-
if (includeRequestedSort) {
298-
indexSort.push(...requestedSort);
299-
}
300318

301319
let constraintShapeKey = '';
302320
if (fetchOrPkConstraint) {
303321
for (const key of Object.keys(fetchOrPkConstraint)) {
304322
constraintShapeKey += key + '|';
305323
}
306324
}
325+
326+
// If there is a constraint, we need an index sorted by it first.
327+
const indexSort: OrderPart[] = [];
328+
if (fetchOrPkConstraint) {
329+
for (const key of Object.keys(fetchOrPkConstraint)) {
330+
indexSort.push([key, 'asc']);
331+
}
332+
}
333+
334+
if (includeRequestedSort) {
335+
indexSort.push(...requestedSort);
336+
}
337+
307338
const indexCacheKey = `${constraintShapeKey}::${includeRequestedSort ? conn.requestedSortKey : 'pk'}`;
308339
let index = conn.indexCache.get(indexCacheKey);
309340
if (!index) {
310341
index = this.#getOrCreateIndex(indexSort, conn);
311342
conn.indexCache.set(indexCacheKey, index);
312343
}
344+
313345
const {data, comparator: compare} = index;
314-
const indexComparator = (r1: Row, r2: Row) =>
315-
compare(r1, r2) * (req.reverse ? -1 : 1);
316346

317347
const startAt = req.start?.row;
318348

@@ -345,41 +375,63 @@ export class MemorySource implements Source {
345375
scanStart = startAt;
346376
}
347377

348-
const rowsIterable = generateRows(data, scanStart, req.reverse);
349-
const withOverlay = generateWithOverlay(
378+
// Fused fetch paths: eliminate generator frame overhead by combining
379+
// overlay/start/constraint/filter into minimal generators.
380+
if (!hasActiveOverlay) {
381+
// No overlay: fuse all 5 generator stages into 1
382+
return generateFetchDirect(
383+
data,
384+
scanStart,
385+
req.reverse,
386+
req.start,
387+
connectionComparator,
388+
req.constraint,
389+
conn.filters?.predicate,
390+
);
391+
}
392+
393+
// Overlay active: compute overlay effects
394+
const indexComparator: Comparator = (r1, r2) =>
395+
compare(r1, r2) * (req.reverse ? -1 : 1);
396+
const overlays = computeOverlays(
350397
startAt,
351-
pkConstraint ? once(rowsIterable) : rowsIterable,
352-
// use `req.constraint` here and not `fetchOrPkConstraint` since `fetchOrPkConstraint` could be the
353-
// primary key constraint. The primary key constraint comes from filters and is acting as a filter
354-
// rather than as the fetch constraint.
355398
req.constraint,
356-
this.#overlay,
357-
conn.lastPushedEpoch,
358-
// Use indexComparator, generateWithOverlayInner has a subtle dependency
359-
// on this. Since generateWithConstraint is done after
360-
// generateWithOverlay, the generator consumed by generateWithOverlayInner
361-
// does not end when the constraint stops matching and so the final
362-
// check to yield an add overlay if not yet yielded is not reached.
363-
// However, using the indexComparator the add overlay will be less than
364-
// the first row that does not match the constraint, and so any
365-
// not yet yielded add overlay will be yielded when the first row
366-
// not matching the constraint is reached.
399+
overlay,
367400
indexComparator,
368401
conn.filters?.predicate,
369402
);
370403

371-
const withConstraint = generateWithConstraint(
372-
skipYields(
373-
generateWithStart(withOverlay, req.start, connectionComparator),
374-
),
375-
// we use `req.constraint` and not `fetchOrPkConstraint` here because we need to
376-
// AND the constraint with what could have been the primary key constraint
404+
if (overlays.add === undefined && overlays.remove === undefined) {
405+
// Overlay doesn't affect this fetch: use fused no-overlay path
406+
return generateFetchDirect(
407+
data,
408+
scanStart,
409+
req.reverse,
410+
req.start,
411+
connectionComparator,
412+
req.constraint,
413+
conn.filters?.predicate,
414+
);
415+
}
416+
417+
// Overlay has actual changes: use overlay inner + fused post-processing.
418+
// This reduces from 4 generators (overlay+start+constraint+filter) to 2.
419+
const rowsSource = data[req.reverse ? 'valuesFromReversed' : 'valuesFrom'](
420+
scanStart as Row | undefined,
421+
);
422+
const rowsIterable = pkConstraint ? once(rowsSource) : rowsSource;
423+
const overlayedNodes = generateWithOverlayInner(
424+
rowsIterable,
425+
overlays,
426+
indexComparator,
427+
);
428+
return generatePostOverlayFused(
429+
overlayedNodes,
430+
req.start,
431+
connectionComparator,
377432
req.constraint,
433+
conn.filters?.predicate,
378434
);
379-
380-
yield* conn.filters
381-
? generateWithFilter(withConstraint, conn.filters.predicate)
382-
: withConstraint;
383435
}
384436

385437
*push(change: SourceChange): Stream<'yield'> {
@@ -448,26 +500,6 @@ export class MemorySource implements Source {
448500
}
449501
}
450502

451-
function* generateWithConstraint(
452-
it: Stream<Node>,
453-
constraint: Constraint | undefined,
454-
) {
455-
for (const node of it) {
456-
if (constraint && !constraintMatchesRow(constraint, node.row)) {
457-
break;
458-
}
459-
yield node;
460-
}
461-
}
462-
463-
function* generateWithFilter(it: Stream<Node>, filter: (row: Row) => boolean) {
464-
for (const node of it) {
465-
if (filter(node.row)) {
466-
yield node;
467-
}
468-
}
469-
}
470-
471503
export function* genPushAndWriteWithSplitEdit(
472504
connections: readonly Connection[],
473505
change: SourceChange,
@@ -563,11 +595,23 @@ function* genPush(
563595
unreachable(change);
564596
}
565597

566-
const PLACEHOLDER_ROW: Row = {};
567-
const REUSE_NODE: Node = {row: PLACEHOLDER_ROW, relationships: EMPTY_RELATIONSHIPS};
568-
const REUSE_OLD_NODE: Node = {row: PLACEHOLDER_ROW, relationships: EMPTY_RELATIONSHIPS};
569-
const REUSE_ADD_REMOVE: {type: 'add' | 'remove'; node: Node} = {type: 'add', node: REUSE_NODE};
570-
const REUSE_EDIT: EditChange = {type: 'edit', oldNode: REUSE_OLD_NODE, node: REUSE_NODE};
598+
const REUSE_NODE: Node = {
599+
row: undefined as unknown as Row,
600+
relationships: EMPTY_RELATIONSHIPS,
601+
};
602+
const REUSE_OLD_NODE: Node = {
603+
row: undefined as unknown as Row,
604+
relationships: EMPTY_RELATIONSHIPS,
605+
};
606+
const REUSE_ADD_REMOVE = {
607+
type: undefined as unknown as 'add' | 'remove',
608+
node: REUSE_NODE,
609+
};
610+
const REUSE_EDIT = {
611+
type: 'edit' as const,
612+
oldNode: REUSE_OLD_NODE,
613+
node: REUSE_NODE,
614+
};
571615

572616
for (const conn of connections) {
573617
const {output, filters, input} = conn;
@@ -580,8 +624,8 @@ function* genPush(
580624
REUSE_NODE.row = change.row;
581625
outputChange = REUSE_EDIT;
582626
} else {
583-
REUSE_NODE.row = change.row;
584627
REUSE_ADD_REMOVE.type = change.type;
628+
REUSE_NODE.row = change.row;
585629
outputChange = REUSE_ADD_REMOVE;
586630
}
587631
yield* filterPush(outputChange, output, input, filters?.predicate);
@@ -645,17 +689,25 @@ export function* generateWithOverlay(
645689
compare: Comparator,
646690
filterPredicate?: (row: Row) => boolean | undefined,
647691
) {
648-
let overlayToApply: Overlay | undefined = undefined;
649-
if (overlay && lastPushedEpoch >= overlay.epoch) {
650-
overlayToApply = overlay;
692+
if (!overlay || lastPushedEpoch < overlay.epoch) {
693+
for (const row of rows) {
694+
yield {row, relationships: EMPTY_RELATIONSHIPS};
695+
}
696+
return;
651697
}
652698
const overlays = computeOverlays(
653699
startAt,
654700
constraint,
655-
overlayToApply,
701+
overlay,
656702
compare,
657703
filterPredicate,
658704
);
705+
if (overlays.add === undefined && overlays.remove === undefined) {
706+
for (const row of rows) {
707+
yield {row, relationships: EMPTY_RELATIONSHIPS};
708+
}
709+
return;
710+
}
659711
yield* generateWithOverlayInner(rows, overlays, compare);
660712
}
661713

@@ -847,18 +899,73 @@ function makeBoundComparator(sort: Ordering) {
847899
};
848900
}
849901

850-
function* generateRows(
902+
export function stringify(change: SourceChange) {
903+
return JSON.stringify(change, (_, v) =>
904+
typeof v === 'bigint' ? v.toString() : v,
905+
);
906+
}
907+
908+
// Fused fetch for no-overlay case.
909+
// Replaces the 5-generator chain (generateRows -> generateWithOverlay ->
910+
// generateWithStart -> generateWithConstraint -> generateWithFilter)
911+
// with a single generator, eliminating 4 generator frame suspend/resume costs.
912+
function* generateFetchDirect(
851913
data: BTreeSet<Row>,
852914
scanStart: RowBound | undefined,
853915
reverse: boolean | undefined,
854-
) {
855-
yield* data[reverse ? 'valuesFromReversed' : 'valuesFrom'](
916+
start: Start | undefined,
917+
connectionComparator: Comparator,
918+
constraint: Constraint | undefined,
919+
filterPredicate: ((row: Row) => boolean) | undefined,
920+
): Stream<Node> {
921+
let started = !start;
922+
for (const row of data[reverse ? 'valuesFromReversed' : 'valuesFrom'](
856923
scanStart as Row | undefined,
857-
);
924+
)) {
925+
if (!started) {
926+
const cmp = connectionComparator(row, start!.row);
927+
if (start!.basis === 'at' ? cmp >= 0 : cmp > 0) {
928+
started = true;
929+
} else {
930+
continue;
931+
}
932+
}
933+
if (constraint && !constraintMatchesRow(constraint, row)) {
934+
break;
935+
}
936+
if (filterPredicate && !filterPredicate(row)) {
937+
continue;
938+
}
939+
yield {row, relationships: EMPTY_RELATIONSHIPS};
940+
}
858941
}
859942

860-
export function stringify(change: SourceChange) {
861-
return JSON.stringify(change, (_, v) =>
862-
typeof v === 'bigint' ? v.toString() : v,
863-
);
943+
// Fused post-overlay processing.
944+
// Replaces generateWithStart + generateWithConstraint + generateWithFilter
945+
// (3 generators) with a single generator after overlay interleaving.
946+
function* generatePostOverlayFused(
947+
nodes: Iterable<Node>,
948+
start: Start | undefined,
949+
connectionComparator: Comparator,
950+
constraint: Constraint | undefined,
951+
filterPredicate: ((row: Row) => boolean) | undefined,
952+
): Stream<Node> {
953+
let started = !start;
954+
for (const node of nodes) {
955+
if (!started) {
956+
const cmp = connectionComparator(node.row, start!.row);
957+
if (start!.basis === 'at' ? cmp >= 0 : cmp > 0) {
958+
started = true;
959+
} else {
960+
continue;
961+
}
962+
}
963+
if (constraint && !constraintMatchesRow(constraint, node.row)) {
964+
break;
965+
}
966+
if (filterPredicate && !filterPredicate(node.row)) {
967+
continue;
968+
}
969+
yield node;
970+
}
864971
}

0 commit comments

Comments
 (0)