Skip to content

Commit be6b85b

Browse files
Alp
authored and committed
perf(zql): fuse fetch pipeline, PK fast path, and reduce allocations
1 parent 49b7023 commit be6b85b

File tree

1 file changed

+194
-83
lines changed

1 file changed

+194
-83
lines changed

packages/zql/src/ivm/memory-source.ts

Lines changed: 194 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import {
2222
type NoSubqueryCondition,
2323
} from '../builder/filter.ts';
2424
import {assertOrderingIncludesPK} from '../query/complete-ordering.ts';
25-
import type {Change, EditChange} from './change.ts';
25+
import type {Change} from './change.ts';
2626
import {
2727
constraintMatchesPrimaryKey,
2828
constraintMatchesRow,
@@ -39,7 +39,6 @@ import {
3939
} from './data.ts';
4040
import {filterPush} from './filter-push.ts';
4141
import {
42-
skipYields,
4342
type FetchRequest,
4443
type Input,
4544
type Output,
@@ -275,50 +274,85 @@ export class MemorySource implements Source {
275274
return [...this.#indexes.keys()];
276275
}
277276

278-
*#fetch(req: FetchRequest, conn: Connection): Stream<Node | 'yield'> {
277+
// Non-generator: returns an Iterable directly rather than using function*.
278+
// This avoids one generator frame allocation per fetch call. The returned
279+
// iterable comes from one of the fused generator helpers below, chosen
280+
// based on whether an overlay is active and whether the PK fast path applies.
281+
#fetch(
282+
req: FetchRequest,
283+
conn: Connection,
284+
): Iterable<Node | 'yield'> {
279285
const {sort: requestedSort, compareRows} = conn;
280-
const connectionComparator = (r1: Row, r2: Row) =>
281-
compareRows(r1, r2) * (req.reverse ? -1 : 1);
286+
const connectionComparator: Comparator = req.reverse
287+
? (r1, r2) => -compareRows(r1, r2)
288+
: compareRows;
282289

283290
const pkConstraint = conn.pkConstraint;
284291
// The primary key constraint will be more limiting than the constraint
285292
// so swap out to that if it exists.
286293
const fetchOrPkConstraint = pkConstraint ?? req.constraint;
287294

288-
// If there is a constraint, we need an index sorted by it first.
289-
const indexSort: OrderPart[] = [];
290-
if (fetchOrPkConstraint) {
291-
for (const key of Object.keys(fetchOrPkConstraint)) {
292-
indexSort.push([key, 'asc']);
295+
// Determine overlay state once
296+
const overlay = this.#overlay;
297+
const hasActiveOverlay =
298+
overlay !== undefined && conn.lastPushedEpoch >= overlay.epoch;
299+
300+
// PK fast path: direct BTree.get() for single-row constrained lookups.
301+
// When filters constrain to a single PK value and no overlay is active,
302+
// skip the entire generator pipeline and do a direct O(log n) lookup.
303+
if (pkConstraint && !hasActiveOverlay) {
304+
const row = this.#getPrimaryIndex().data.get(pkConstraint as Row);
305+
if (row !== undefined) {
306+
if (!conn.filters || conn.filters.predicate(row)) {
307+
if (!req.constraint || constraintMatchesRow(req.constraint, row)) {
308+
const start = req.start;
309+
if (
310+
!start ||
311+
(start.basis === 'at'
312+
? connectionComparator(row, start.row) >= 0
313+
: connectionComparator(row, start.row) > 0)
314+
) {
315+
return [{row, relationships: EMPTY_RELATIONSHIPS}];
316+
}
317+
}
318+
}
293319
}
320+
return [];
294321
}
295322

296-
// For the special case of constraining by PK, we don't need to worry about
297-
// any requested sort since there can only be one result. Otherwise we also
298-
// need the index sorted by the requested sort.
323+
// Standard path: index-based scan
299324
const includeRequestedSort =
300325
this.#primaryKey.length > 1 ||
301326
!fetchOrPkConstraint ||
302327
!constraintMatchesPrimaryKey(fetchOrPkConstraint, this.#primaryKey);
303-
if (includeRequestedSort) {
304-
indexSort.push(...requestedSort);
305-
}
306328

307329
let constraintShapeKey = '';
308330
if (fetchOrPkConstraint) {
309331
for (const key of Object.keys(fetchOrPkConstraint)) {
310332
constraintShapeKey += key + '|';
311333
}
312334
}
335+
336+
// If there is a constraint, we need an index sorted by it first.
337+
const indexSort: OrderPart[] = [];
338+
if (fetchOrPkConstraint) {
339+
for (const key of Object.keys(fetchOrPkConstraint)) {
340+
indexSort.push([key, 'asc']);
341+
}
342+
}
343+
344+
if (includeRequestedSort) {
345+
indexSort.push(...requestedSort);
346+
}
347+
313348
const indexCacheKey = `${constraintShapeKey}::${includeRequestedSort ? conn.requestedSortKey : 'pk'}`;
314349
let index = conn.indexCache.get(indexCacheKey);
315350
if (!index) {
316351
index = this.#getOrCreateIndex(indexSort, conn);
317352
conn.indexCache.set(indexCacheKey, index);
318353
}
354+
319355
const {data, comparator: compare} = index;
320-
const indexComparator = (r1: Row, r2: Row) =>
321-
compare(r1, r2) * (req.reverse ? -1 : 1);
322356

323357
const startAt = req.start?.row;
324358

@@ -351,41 +385,63 @@ export class MemorySource implements Source {
351385
scanStart = startAt;
352386
}
353387

354-
const rowsIterable = generateRows(data, scanStart, req.reverse);
355-
const withOverlay = generateWithOverlay(
388+
// Fused fetch paths: eliminate generator frame overhead by combining
389+
// overlay/start/constraint/filter into minimal generators.
390+
if (!hasActiveOverlay) {
391+
// No overlay: fuse all 5 generator stages into 1
392+
return generateFetchDirect(
393+
data,
394+
scanStart,
395+
req.reverse,
396+
req.start,
397+
connectionComparator,
398+
req.constraint,
399+
conn.filters?.predicate,
400+
);
401+
}
402+
403+
// Overlay active: compute overlay effects
404+
const indexComparator: Comparator = (r1, r2) =>
405+
compare(r1, r2) * (req.reverse ? -1 : 1);
406+
const overlays = computeOverlays(
356407
startAt,
357-
pkConstraint ? once(rowsIterable) : rowsIterable,
358-
// use `req.constraint` here and not `fetchOrPkConstraint` since `fetchOrPkConstraint` could be the
359-
// primary key constraint. The primary key constraint comes from filters and is acting as a filter
360-
// rather than as the fetch constraint.
361408
req.constraint,
362-
this.#overlay,
363-
conn.lastPushedEpoch,
364-
// Use indexComparator, generateWithOverlayInner has a subtle dependency
365-
// on this. Since generateWithConstraint is done after
366-
// generateWithOverlay, the generator consumed by generateWithOverlayInner
367-
// does not end when the constraint stops matching and so the final
368-
// check to yield an add overlay if not yet yielded is not reached.
369-
// However, using the indexComparator the add overlay will be less than
370-
// the first row that does not match the constraint, and so any
371-
// not yet yielded add overlay will be yielded when the first row
372-
// not matching the constraint is reached.
409+
overlay,
373410
indexComparator,
374411
conn.filters?.predicate,
375412
);
376413

377-
const withConstraint = generateWithConstraint(
378-
skipYields(
379-
generateWithStart(withOverlay, req.start, connectionComparator),
380-
),
381-
// we use `req.constraint` and not `fetchOrPkConstraint` here because we need to
382-
// AND the constraint with what could have been the primary key constraint
414+
if (overlays.add === undefined && overlays.remove === undefined) {
415+
// Overlay doesn't affect this fetch: use fused no-overlay path
416+
return generateFetchDirect(
417+
data,
418+
scanStart,
419+
req.reverse,
420+
req.start,
421+
connectionComparator,
422+
req.constraint,
423+
conn.filters?.predicate,
424+
);
425+
}
426+
427+
// Overlay has actual changes: use overlay inner + fused post-processing.
428+
// This reduces from 4 generators (overlay+start+constraint+filter) to 2.
429+
const rowsSource = data[req.reverse ? 'valuesFromReversed' : 'valuesFrom'](
430+
scanStart as Row | undefined,
431+
);
432+
const rowsIterable = pkConstraint ? once(rowsSource) : rowsSource;
433+
const overlayedNodes = generateWithOverlayInner(
434+
rowsIterable,
435+
overlays,
436+
indexComparator,
437+
);
438+
return generatePostOverlayFused(
439+
overlayedNodes,
440+
req.start,
441+
connectionComparator,
383442
req.constraint,
443+
conn.filters?.predicate,
384444
);
385-
386-
yield* conn.filters
387-
? generateWithFilter(withConstraint, conn.filters.predicate)
388-
: withConstraint;
389445
}
390446

391447
*push(change: SourceChange): Stream<'yield'> {
@@ -454,26 +510,6 @@ export class MemorySource implements Source {
454510
}
455511
}
456512

457-
function* generateWithConstraint(
458-
it: Stream<Node>,
459-
constraint: Constraint | undefined,
460-
) {
461-
for (const node of it) {
462-
if (constraint && !constraintMatchesRow(constraint, node.row)) {
463-
break;
464-
}
465-
yield node;
466-
}
467-
}
468-
469-
function* generateWithFilter(it: Stream<Node>, filter: (row: Row) => boolean) {
470-
for (const node of it) {
471-
if (filter(node.row)) {
472-
yield node;
473-
}
474-
}
475-
}
476-
477513
export function* genPushAndWriteWithSplitEdit(
478514
connections: readonly Connection[],
479515
change: SourceChange,
@@ -576,11 +612,23 @@ function* genPush(
576612
// the generator chain -- yield* completes fully before the next iteration
577613
// mutates the objects. In a workload with 135 connections, this eliminates
578614
// thousands of short-lived allocations per push cycle.
579-
const placeholder: Row = {};
580-
const reuseNode: Node = {row: placeholder, relationships: EMPTY_RELATIONSHIPS};
581-
const reuseOldNode: Node = {row: placeholder, relationships: EMPTY_RELATIONSHIPS};
582-
const reuseAddRemove: {type: 'add' | 'remove'; node: Node} = {type: 'add', node: reuseNode};
583-
const reuseEdit: EditChange = {type: 'edit', oldNode: reuseOldNode, node: reuseNode};
615+
const reuseNode: Node = {
616+
row: undefined as unknown as Row,
617+
relationships: EMPTY_RELATIONSHIPS,
618+
};
619+
const reuseOldNode: Node = {
620+
row: undefined as unknown as Row,
621+
relationships: EMPTY_RELATIONSHIPS,
622+
};
623+
const reuseAddRemove = {
624+
type: undefined as unknown as 'add' | 'remove',
625+
node: reuseNode,
626+
};
627+
const reuseEdit = {
628+
type: 'edit' as const,
629+
oldNode: reuseOldNode,
630+
node: reuseNode,
631+
};
584632

585633
for (const conn of connections) {
586634
const {output, filters, input} = conn;
@@ -593,8 +641,8 @@ function* genPush(
593641
reuseNode.row = change.row;
594642
outputChange = reuseEdit;
595643
} else {
596-
reuseNode.row = change.row;
597644
reuseAddRemove.type = change.type;
645+
reuseNode.row = change.row;
598646
outputChange = reuseAddRemove;
599647
}
600648
yield* filterPush(outputChange, output, input, filters?.predicate);
@@ -658,17 +706,25 @@ export function* generateWithOverlay(
658706
compare: Comparator,
659707
filterPredicate?: (row: Row) => boolean | undefined,
660708
) {
661-
let overlayToApply: Overlay | undefined = undefined;
662-
if (overlay && lastPushedEpoch >= overlay.epoch) {
663-
overlayToApply = overlay;
709+
if (!overlay || lastPushedEpoch < overlay.epoch) {
710+
for (const row of rows) {
711+
yield {row, relationships: EMPTY_RELATIONSHIPS};
712+
}
713+
return;
664714
}
665715
const overlays = computeOverlays(
666716
startAt,
667717
constraint,
668-
overlayToApply,
718+
overlay,
669719
compare,
670720
filterPredicate,
671721
);
722+
if (overlays.add === undefined && overlays.remove === undefined) {
723+
for (const row of rows) {
724+
yield {row, relationships: EMPTY_RELATIONSHIPS};
725+
}
726+
return;
727+
}
672728
yield* generateWithOverlayInner(rows, overlays, compare);
673729
}
674730

@@ -860,18 +916,73 @@ function makeBoundComparator(sort: Ordering) {
860916
};
861917
}
862918

863-
function* generateRows(
919+
export function stringify(change: SourceChange) {
920+
return JSON.stringify(change, (_, v) =>
921+
typeof v === 'bigint' ? v.toString() : v,
922+
);
923+
}
924+
925+
// Fused fetch for no-overlay case.
926+
// Replaces the 5-generator chain (generateRows -> generateWithOverlay ->
927+
// generateWithStart -> generateWithConstraint -> generateWithFilter)
928+
// with a single generator, eliminating 4 generator frame suspend/resume costs.
929+
function* generateFetchDirect(
864930
data: BTreeSet<Row>,
865931
scanStart: RowBound | undefined,
866932
reverse: boolean | undefined,
867-
) {
868-
yield* data[reverse ? 'valuesFromReversed' : 'valuesFrom'](
933+
start: Start | undefined,
934+
connectionComparator: Comparator,
935+
constraint: Constraint | undefined,
936+
filterPredicate: ((row: Row) => boolean) | undefined,
937+
): Stream<Node> {
938+
let started = !start;
939+
for (const row of data[reverse ? 'valuesFromReversed' : 'valuesFrom'](
869940
scanStart as Row | undefined,
870-
);
941+
)) {
942+
if (!started) {
943+
const cmp = connectionComparator(row, start!.row);
944+
if (start!.basis === 'at' ? cmp >= 0 : cmp > 0) {
945+
started = true;
946+
} else {
947+
continue;
948+
}
949+
}
950+
if (constraint && !constraintMatchesRow(constraint, row)) {
951+
break;
952+
}
953+
if (filterPredicate && !filterPredicate(row)) {
954+
continue;
955+
}
956+
yield {row, relationships: EMPTY_RELATIONSHIPS};
957+
}
871958
}
872959

873-
export function stringify(change: SourceChange) {
874-
return JSON.stringify(change, (_, v) =>
875-
typeof v === 'bigint' ? v.toString() : v,
876-
);
960+
// Fused post-overlay processing.
961+
// Replaces generateWithStart + generateWithConstraint + generateWithFilter
962+
// (3 generators) with a single generator after overlay interleaving.
963+
function* generatePostOverlayFused(
964+
nodes: Iterable<Node>,
965+
start: Start | undefined,
966+
connectionComparator: Comparator,
967+
constraint: Constraint | undefined,
968+
filterPredicate: ((row: Row) => boolean) | undefined,
969+
): Stream<Node> {
970+
let started = !start;
971+
for (const node of nodes) {
972+
if (!started) {
973+
const cmp = connectionComparator(node.row, start!.row);
974+
if (start!.basis === 'at' ? cmp >= 0 : cmp > 0) {
975+
started = true;
976+
} else {
977+
continue;
978+
}
979+
}
980+
if (constraint && !constraintMatchesRow(constraint, node.row)) {
981+
break;
982+
}
983+
if (filterPredicate && !filterPredicate(node.row)) {
984+
continue;
985+
}
986+
yield node;
987+
}
877988
}

0 commit comments

Comments (0)