Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 65 additions & 34 deletions packages/zql/src/ivm/memory-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
type NoSubqueryCondition,
} from '../builder/filter.ts';
import {assertOrderingIncludesPK} from '../query/complete-ordering.ts';
import type {Change} from './change.ts';
import type {Change, EditChange} from './change.ts';
import {
constraintMatchesPrimaryKey,
constraintMatchesRow,
Expand Down Expand Up @@ -50,6 +50,10 @@ import type {
} from './source.ts';
import type {Stream} from './stream.ts';

/**
 * Shared immutable sentinel used as the `relationships` value for nodes that
 * have no relationships. Handing every such node the same frozen object
 * avoids allocating a fresh `{}` per node in the fetch/push hot paths;
 * `Record<string, never>` documents that it can never gain entries.
 */
const EMPTY_RELATIONSHIPS: Record<string, never> = Object.freeze({});

export type Overlay = {
epoch: number;
change: SourceChange;
Expand Down Expand Up @@ -80,6 +84,12 @@ export type Connection = {
| undefined;
readonly debug?: DebugDelegate | undefined;
lastPushedEpoch: number;
/** Pre-computed on connect so #fetch avoids re-deriving it every call. */
pkConstraint: Constraint | undefined;
/** Per-connection cache of constraint+sort -> Index to skip Map lookups. */
indexCache: Map<string, Index>;
/** Stringified sort key for this connection, cached to build index cache keys cheaply. */
requestedSortKey: string;
};

/**
Expand All @@ -94,6 +104,8 @@ export class MemorySource implements Source {
readonly #columns: Record<string, SchemaValue>;
readonly #primaryKey: PrimaryKey;
readonly #primaryIndexSort: Ordering;
/** Cached JSON key for the primary index to avoid repeated JSON.stringify. */
readonly #primaryIndexKey: string;
readonly #indexes: Map<string, Index> = new Map();
readonly #connections: Connection[] = [];

Expand All @@ -110,8 +122,9 @@ export class MemorySource implements Source {
this.#columns = columns;
this.#primaryKey = primaryKey;
this.#primaryIndexSort = primaryKey.map(k => [k, 'asc']);
this.#primaryIndexKey = JSON.stringify(this.#primaryIndexSort);
const comparator = makeBoundComparator(this.#primaryIndexSort);
this.#indexes.set(JSON.stringify(this.#primaryIndexSort), {
this.#indexes.set(this.#primaryIndexKey, {
comparator,
data: primaryIndexData ?? new BTreeSet<Row>(comparator),
usedBy: new Set(),
Expand Down Expand Up @@ -172,6 +185,7 @@ export class MemorySource implements Source {
fullyAppliedFilters: !transformedFilters.conditionsRemoved,
};

const requestedSortKey = sort.map(p => `${p[0]}:${p[1]}`).join('|');
const connection: Connection = {
input,
output: undefined,
Expand All @@ -185,6 +199,12 @@ export class MemorySource implements Source {
}
: undefined,
lastPushedEpoch: 0,
pkConstraint: primaryKeyConstraintFromFilters(
transformedFilters.filters,
this.#primaryKey,
),
indexCache: new Map(),
requestedSortKey,
};
const schema = this.#getSchema(connection);
assertOrderingIncludesPK(sort, this.#primaryKey);
Expand All @@ -207,7 +227,7 @@ export class MemorySource implements Source {
}

/**
 * Looks up the primary-key index via its cached stringified sort key.
 * The index is seeded in the constructor, so a miss indicates a bug.
 */
#getPrimaryIndex(): Index {
  const primary = this.#indexes.get(this.#primaryIndexKey);
  assert(primary, 'Primary index not found');
  return primary;
}
Expand Down Expand Up @@ -254,10 +274,7 @@ export class MemorySource implements Source {
const connectionComparator = (r1: Row, r2: Row) =>
compareRows(r1, r2) * (req.reverse ? -1 : 1);

const pkConstraint = primaryKeyConstraintFromFilters(
conn.filters?.condition,
this.#primaryKey,
);
const pkConstraint = conn.pkConstraint;
// The primary key constraint will be more limiting than the constraint
// so swap out to that if it exists.
const fetchOrPkConstraint = pkConstraint ?? req.constraint;
Expand All @@ -273,15 +290,26 @@ export class MemorySource implements Source {
// For the special case of constraining by PK, we don't need to worry about
// any requested sort since there can only be one result. Otherwise we also
// need the index sorted by the requested sort.
if (
const includeRequestedSort =
this.#primaryKey.length > 1 ||
!fetchOrPkConstraint ||
!constraintMatchesPrimaryKey(fetchOrPkConstraint, this.#primaryKey)
) {
!constraintMatchesPrimaryKey(fetchOrPkConstraint, this.#primaryKey);
if (includeRequestedSort) {
indexSort.push(...requestedSort);
}

const index = this.#getOrCreateIndex(indexSort, conn);
let constraintShapeKey = '';
if (fetchOrPkConstraint) {
for (const key of Object.keys(fetchOrPkConstraint)) {
constraintShapeKey += key + '|';
}
}
const indexCacheKey = `${constraintShapeKey}::${includeRequestedSort ? conn.requestedSortKey : 'pk'}`;
let index = conn.indexCache.get(indexCacheKey);
if (!index) {
index = this.#getOrCreateIndex(indexSort, conn);
conn.indexCache.set(indexCacheKey, index);
}
const {data, comparator: compare} = index;
const indexComparator = (r1: Row, r2: Row) =>
compare(r1, r2) * (req.reverse ? -1 : 1);
Expand Down Expand Up @@ -535,31 +563,34 @@ function* genPush(
unreachable(change);
}

// Reuse a small set of objects across the connection loop below to avoid
// allocating fresh Node/Change objects per connection per push. The row
// fields are overwritten before each use. This is safe because filterPush
// and its downstream consumers process each change synchronously within
// the generator chain -- yield* completes fully before the next iteration
// mutates the objects. In a workload with 135 connections, this eliminates
// thousands of short-lived allocations per push cycle.
const placeholder: Row = {};
const reuseNode: Node = {row: placeholder, relationships: EMPTY_RELATIONSHIPS};
const reuseOldNode: Node = {row: placeholder, relationships: EMPTY_RELATIONSHIPS};
const reuseAddRemove: {type: 'add' | 'remove'; node: Node} = {type: 'add', node: reuseNode};
const reuseEdit: EditChange = {type: 'edit', oldNode: reuseOldNode, node: reuseNode};

for (const conn of connections) {
const {output, filters, input} = conn;
if (output) {
conn.lastPushedEpoch = pushEpoch;
setOverlay({epoch: pushEpoch, change});
const outputChange: Change =
change.type === 'edit'
? {
type: change.type,
oldNode: {
row: change.oldRow,
relationships: {},
},
node: {
row: change.row,
relationships: {},
},
}
: {
type: change.type,
node: {
row: change.row,
relationships: {},
},
};
let outputChange: Change;
if (change.type === 'edit') {
reuseOldNode.row = change.oldRow;
reuseNode.row = change.row;
outputChange = reuseEdit;
} else {
reuseNode.row = change.row;
reuseAddRemove.type = change.type;
outputChange = reuseAddRemove;
}
yield* filterPush(outputChange, output, input, filters?.predicate);
yield undefined;
}
Expand Down Expand Up @@ -739,7 +770,7 @@ export function* generateWithOverlayInner(
const cmp = compare(overlays.add, row);
if (cmp < 0) {
addOverlayYielded = true;
yield {row: overlays.add, relationships: {}};
yield {row: overlays.add, relationships: EMPTY_RELATIONSHIPS};
}
}

Expand All @@ -750,11 +781,11 @@ export function* generateWithOverlayInner(
continue;
}
}
yield {row, relationships: {}};
yield {row, relationships: EMPTY_RELATIONSHIPS};
}

if (!addOverlayYielded && overlays.add) {
yield {row: overlays.add, relationships: {}};
yield {row: overlays.add, relationships: EMPTY_RELATIONSHIPS};
}
}

Expand Down