Skip to content

Commit d35cb73

Browse files
AlpAlp
authored and committed
perf(zql): fuse fetch pipeline, PK fast path, and reduce allocations
1 parent eac60c0 commit d35cb73

File tree

1 file changed

+193
-83
lines changed

1 file changed

+193
-83
lines changed

packages/zql/src/ivm/memory-source.ts

Lines changed: 193 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import {
2222
type NoSubqueryCondition,
2323
} from '../builder/filter.ts';
2424
import {assertOrderingIncludesPK} from '../query/complete-ordering.ts';
25-
import type {Change, EditChange} from './change.ts';
25+
import type {Change} from './change.ts';
2626
import {
2727
constraintMatchesPrimaryKey,
2828
constraintMatchesRow,
@@ -39,7 +39,6 @@ import {
3939
} from './data.ts';
4040
import {filterPush} from './filter-push.ts';
4141
import {
42-
skipYields,
4342
type FetchRequest,
4443
type Input,
4544
type Output,
@@ -278,50 +277,84 @@ export class MemorySource implements Source {
278277
return [...this.#indexes.keys()];
279278
}
280279

281-
*#fetch(req: FetchRequest, conn: Connection): Stream<Node | 'yield'> {
280+
// Returns an iterable instead of being a generator (*#fetch). Callers
281+
// consume via for...of which works identically. Removing the generator
282+
// eliminates one suspend/resume frame per iteration.
283+
#fetch(
284+
req: FetchRequest,
285+
conn: Connection,
286+
): Iterable<Node | 'yield'> {
282287
const {sort: requestedSort, compareRows} = conn;
283-
const connectionComparator = (r1: Row, r2: Row) =>
284-
compareRows(r1, r2) * (req.reverse ? -1 : 1);
288+
const connectionComparator: Comparator = req.reverse
289+
? (r1, r2) => -compareRows(r1, r2)
290+
: compareRows;
285291

286292
const pkConstraint = conn.pkConstraint;
287293
// The primary key constraint will be more limiting than the constraint
288294
// so swap out to that if it exists.
289295
const fetchOrPkConstraint = pkConstraint ?? req.constraint;
290296

291-
// If there is a constraint, we need an index sorted by it first.
292-
const indexSort: OrderPart[] = [];
293-
if (fetchOrPkConstraint) {
294-
for (const key of Object.keys(fetchOrPkConstraint)) {
295-
indexSort.push([key, 'asc']);
297+
// Determine overlay state once
298+
const overlay = this.#overlay;
299+
const hasActiveOverlay =
300+
overlay !== undefined && conn.lastPushedEpoch >= overlay.epoch;
301+
302+
// PK fast path: when filters fully constrain to a single primary key
303+
// and no overlay is active, skip the entire generator pipeline and do
304+
// a direct O(log n) BTree lookup. Returns 0 or 1 results.
305+
if (pkConstraint && !hasActiveOverlay) {
306+
const row = this.#getPrimaryIndex().data.get(pkConstraint as Row);
307+
if (row !== undefined) {
308+
if (!conn.filters || conn.filters.predicate(row)) {
309+
if (!req.constraint || constraintMatchesRow(req.constraint, row)) {
310+
const start = req.start;
311+
if (
312+
!start ||
313+
(start.basis === 'at'
314+
? connectionComparator(row, start.row) >= 0
315+
: connectionComparator(row, start.row) > 0)
316+
) {
317+
return [{row, relationships: EMPTY_RELATIONSHIPS}];
318+
}
319+
}
320+
}
296321
}
322+
return [];
297323
}
298324

299-
// For the special case of constraining by PK, we don't need to worry about
300-
// any requested sort since there can only be one result. Otherwise we also
301-
// need the index sorted by the requested sort.
325+
// Standard path: index-based scan
302326
const includeRequestedSort =
303327
this.#primaryKey.length > 1 ||
304328
!fetchOrPkConstraint ||
305329
!constraintMatchesPrimaryKey(fetchOrPkConstraint, this.#primaryKey);
306-
if (includeRequestedSort) {
307-
indexSort.push(...requestedSort);
308-
}
309330

310331
let constraintShapeKey = '';
311332
if (fetchOrPkConstraint) {
312333
for (const key of Object.keys(fetchOrPkConstraint)) {
313334
constraintShapeKey += key + '|';
314335
}
315336
}
337+
338+
// If there is a constraint, we need an index sorted by it first.
339+
const indexSort: OrderPart[] = [];
340+
if (fetchOrPkConstraint) {
341+
for (const key of Object.keys(fetchOrPkConstraint)) {
342+
indexSort.push([key, 'asc']);
343+
}
344+
}
345+
346+
if (includeRequestedSort) {
347+
indexSort.push(...requestedSort);
348+
}
349+
316350
const indexCacheKey = `${constraintShapeKey}::${includeRequestedSort ? conn.requestedSortKey : 'pk'}`;
317351
let index = conn.indexCache.get(indexCacheKey);
318352
if (!index) {
319353
index = this.#getOrCreateIndex(indexSort, conn);
320354
conn.indexCache.set(indexCacheKey, index);
321355
}
356+
322357
const {data, comparator: compare} = index;
323-
const indexComparator = (r1: Row, r2: Row) =>
324-
compare(r1, r2) * (req.reverse ? -1 : 1);
325358

326359
const startAt = req.start?.row;
327360

@@ -354,41 +387,63 @@ export class MemorySource implements Source {
354387
scanStart = startAt;
355388
}
356389

357-
const rowsIterable = generateRows(data, scanStart, req.reverse);
358-
const withOverlay = generateWithOverlay(
390+
// Fused fetch paths: eliminate generator frame overhead by combining
391+
// overlay/start/constraint/filter into minimal generators.
392+
if (!hasActiveOverlay) {
393+
// No overlay: fuse all 5 generator stages into 1
394+
return generateFetchDirect(
395+
data,
396+
scanStart,
397+
req.reverse,
398+
req.start,
399+
connectionComparator,
400+
req.constraint,
401+
conn.filters?.predicate,
402+
);
403+
}
404+
405+
// Overlay active: compute overlay effects
406+
const indexComparator: Comparator = (r1, r2) =>
407+
compare(r1, r2) * (req.reverse ? -1 : 1);
408+
const overlays = computeOverlays(
359409
startAt,
360-
pkConstraint ? once(rowsIterable) : rowsIterable,
361-
// use `req.constraint` here and not `fetchOrPkConstraint` since `fetchOrPkConstraint` could be the
362-
// primary key constraint. The primary key constraint comes from filters and is acting as a filter
363-
// rather than as the fetch constraint.
364410
req.constraint,
365-
this.#overlay,
366-
conn.lastPushedEpoch,
367-
// Use indexComparator, generateWithOverlayInner has a subtle dependency
368-
// on this. Since generateWithConstraint is done after
369-
// generateWithOverlay, the generator consumed by generateWithOverlayInner
370-
// does not end when the constraint stops matching and so the final
371-
// check to yield an add overlay if not yet yielded is not reached.
372-
// However, using the indexComparator the add overlay will be less than
373-
// the first row that does not match the constraint, and so any
374-
// not yet yielded add overlay will be yielded when the first row
375-
// not matching the constraint is reached.
411+
overlay,
376412
indexComparator,
377413
conn.filters?.predicate,
378414
);
379415

380-
const withConstraint = generateWithConstraint(
381-
skipYields(
382-
generateWithStart(withOverlay, req.start, connectionComparator),
383-
),
384-
// we use `req.constraint` and not `fetchOrPkConstraint` here because we need to
385-
// AND the constraint with what could have been the primary key constraint
416+
if (overlays.add === undefined && overlays.remove === undefined) {
417+
// Overlay doesn't affect this fetch: use fused no-overlay path
418+
return generateFetchDirect(
419+
data,
420+
scanStart,
421+
req.reverse,
422+
req.start,
423+
connectionComparator,
424+
req.constraint,
425+
conn.filters?.predicate,
426+
);
427+
}
428+
429+
// Overlay has actual changes: use overlay inner + fused post-processing.
430+
// This reduces from 4 generators (overlay+start+constraint+filter) to 2.
431+
const rowsSource = data[req.reverse ? 'valuesFromReversed' : 'valuesFrom'](
432+
scanStart as Row | undefined,
433+
);
434+
const rowsIterable = pkConstraint ? once(rowsSource) : rowsSource;
435+
const overlayedNodes = generateWithOverlayInner(
436+
rowsIterable,
437+
overlays,
438+
indexComparator,
439+
);
440+
return generatePostOverlayFused(
441+
overlayedNodes,
442+
req.start,
443+
connectionComparator,
386444
req.constraint,
445+
conn.filters?.predicate,
387446
);
388-
389-
yield* conn.filters
390-
? generateWithFilter(withConstraint, conn.filters.predicate)
391-
: withConstraint;
392447
}
393448

394449
*push(change: SourceChange): Stream<'yield'> {
@@ -457,26 +512,6 @@ export class MemorySource implements Source {
457512
}
458513
}
459514

460-
function* generateWithConstraint(
461-
it: Stream<Node>,
462-
constraint: Constraint | undefined,
463-
) {
464-
for (const node of it) {
465-
if (constraint && !constraintMatchesRow(constraint, node.row)) {
466-
break;
467-
}
468-
yield node;
469-
}
470-
}
471-
472-
function* generateWithFilter(it: Stream<Node>, filter: (row: Row) => boolean) {
473-
for (const node of it) {
474-
if (filter(node.row)) {
475-
yield node;
476-
}
477-
}
478-
}
479-
480515
export function* genPushAndWriteWithSplitEdit(
481516
connections: readonly Connection[],
482517
change: SourceChange,
@@ -579,11 +614,23 @@ function* genPush(
579614
// the generator chain -- yield* completes fully before the next iteration
580615
// mutates the objects. In a workload with 135 connections, this eliminates
581616
// thousands of short-lived allocations per push cycle.
582-
const placeholder: Row = {};
583-
const reuseNode: Node = {row: placeholder, relationships: EMPTY_RELATIONSHIPS};
584-
const reuseOldNode: Node = {row: placeholder, relationships: EMPTY_RELATIONSHIPS};
585-
const reuseAddRemove: {type: 'add' | 'remove'; node: Node} = {type: 'add', node: reuseNode};
586-
const reuseEdit: EditChange = {type: 'edit', oldNode: reuseOldNode, node: reuseNode};
617+
const reuseNode: Node = {
618+
row: undefined as unknown as Row,
619+
relationships: EMPTY_RELATIONSHIPS,
620+
};
621+
const reuseOldNode: Node = {
622+
row: undefined as unknown as Row,
623+
relationships: EMPTY_RELATIONSHIPS,
624+
};
625+
const reuseAddRemove = {
626+
type: undefined as unknown as 'add' | 'remove',
627+
node: reuseNode,
628+
};
629+
const reuseEdit = {
630+
type: 'edit' as const,
631+
oldNode: reuseOldNode,
632+
node: reuseNode,
633+
};
587634

588635
for (const conn of connections) {
589636
const {output, filters, input} = conn;
@@ -596,8 +643,8 @@ function* genPush(
596643
reuseNode.row = change.row;
597644
outputChange = reuseEdit;
598645
} else {
599-
reuseNode.row = change.row;
600646
reuseAddRemove.type = change.type;
647+
reuseNode.row = change.row;
601648
outputChange = reuseAddRemove;
602649
}
603650
yield* filterPush(outputChange, output, input, filters?.predicate);
@@ -661,17 +708,25 @@ export function* generateWithOverlay(
661708
compare: Comparator,
662709
filterPredicate?: (row: Row) => boolean | undefined,
663710
) {
664-
let overlayToApply: Overlay | undefined = undefined;
665-
if (overlay && lastPushedEpoch >= overlay.epoch) {
666-
overlayToApply = overlay;
711+
if (!overlay || lastPushedEpoch < overlay.epoch) {
712+
for (const row of rows) {
713+
yield {row, relationships: EMPTY_RELATIONSHIPS};
714+
}
715+
return;
667716
}
668717
const overlays = computeOverlays(
669718
startAt,
670719
constraint,
671-
overlayToApply,
720+
overlay,
672721
compare,
673722
filterPredicate,
674723
);
724+
if (overlays.add === undefined && overlays.remove === undefined) {
725+
for (const row of rows) {
726+
yield {row, relationships: EMPTY_RELATIONSHIPS};
727+
}
728+
return;
729+
}
675730
yield* generateWithOverlayInner(rows, overlays, compare);
676731
}
677732

@@ -863,18 +918,73 @@ function makeBoundComparator(sort: Ordering) {
863918
};
864919
}
865920

866-
function* generateRows(
921+
export function stringify(change: SourceChange) {
922+
return JSON.stringify(change, (_, v) =>
923+
typeof v === 'bigint' ? v.toString() : v,
924+
);
925+
}
926+
927+
// Fused fetch for the no-overlay case. Replaces the 5-generator chain
928+
// (generateRows -> generateWithOverlay -> generateWithStart ->
929+
// generateWithConstraint -> generateWithFilter) with a single generator,
930+
// eliminating 4 generator frame suspend/resume costs per row.
931+
function* generateFetchDirect(
867932
data: BTreeSet<Row>,
868933
scanStart: RowBound | undefined,
869934
reverse: boolean | undefined,
870-
) {
871-
yield* data[reverse ? 'valuesFromReversed' : 'valuesFrom'](
935+
start: Start | undefined,
936+
connectionComparator: Comparator,
937+
constraint: Constraint | undefined,
938+
filterPredicate: ((row: Row) => boolean) | undefined,
939+
): Stream<Node> {
940+
let started = !start;
941+
for (const row of data[reverse ? 'valuesFromReversed' : 'valuesFrom'](
872942
scanStart as Row | undefined,
873-
);
943+
)) {
944+
if (!started) {
945+
const cmp = connectionComparator(row, start!.row);
946+
if (start!.basis === 'at' ? cmp >= 0 : cmp > 0) {
947+
started = true;
948+
} else {
949+
continue;
950+
}
951+
}
952+
if (constraint && !constraintMatchesRow(constraint, row)) {
953+
break;
954+
}
955+
if (filterPredicate && !filterPredicate(row)) {
956+
continue;
957+
}
958+
yield {row, relationships: EMPTY_RELATIONSHIPS};
959+
}
874960
}
875961

876-
export function stringify(change: SourceChange) {
877-
return JSON.stringify(change, (_, v) =>
878-
typeof v === 'bigint' ? v.toString() : v,
879-
);
962+
// Fused post-overlay processing. After overlay interleaving, replaces
963+
// 3 generators (generateWithStart + generateWithConstraint +
964+
// generateWithFilter) with a single generator.
965+
function* generatePostOverlayFused(
966+
nodes: Iterable<Node>,
967+
start: Start | undefined,
968+
connectionComparator: Comparator,
969+
constraint: Constraint | undefined,
970+
filterPredicate: ((row: Row) => boolean) | undefined,
971+
): Stream<Node> {
972+
let started = !start;
973+
for (const node of nodes) {
974+
if (!started) {
975+
const cmp = connectionComparator(node.row, start!.row);
976+
if (start!.basis === 'at' ? cmp >= 0 : cmp > 0) {
977+
started = true;
978+
} else {
979+
continue;
980+
}
981+
}
982+
if (constraint && !constraintMatchesRow(constraint, node.row)) {
983+
break;
984+
}
985+
if (filterPredicate && !filterPredicate(node.row)) {
986+
continue;
987+
}
988+
yield node;
989+
}
880990
}

0 commit comments

Comments
 (0)