Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions packages/zql/src/ivm/array-view.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,23 +154,15 @@ test('single-format', () => {

// trying to add another element should be an error
// pipeline should have been configured with a limit of one
// With batched change application, the error is thrown when changes are applied
// (at flush() or .data access), not at push() time.
consume(ms.push({row: {a: 2, b: 'b'}, type: 'add'}));
expect(() => view.flush()).toThrow(
// Changes are applied immediately in push(), so the error is thrown at push time.
expect(() => consume(ms.push({row: {a: 2, b: 'b'}, type: 'add'}))).toThrow(
"Singular relationship '' should not have multiple rows. You may need to declare this relationship with the `many` helper instead of the `one` helper in your schema.",
);

// Adding the same element is not an error in the ArrayView but it is an error
// in the Source. This case is tested in view-apply-change.ts.

// Note: After the failed flush, the pending change is still there. Let's verify
// that accessing .data also throws (auto-flush safety net).
expect(() => view.data).toThrow(
"Singular relationship '' should not have multiple rows. You may need to declare this relationship with the `many` helper instead of the `one` helper in your schema.",
);

// The listener's data is still the old value since the flush failed
// The listener's data is still the old value since the push failed
expect(data).toEqual({a: 1, b: 'a'});
expect(callCount).toBe(1);

Expand Down
95 changes: 12 additions & 83 deletions packages/zql/src/ivm/array-view.ts
Original file line number Diff line number Diff line change
@@ -1,60 +1,15 @@
import {assert} from '../../../shared/src/asserts.ts';
import type {Immutable} from '../../../shared/src/immutable.ts';
import {mapValues} from '../../../shared/src/objects.ts';
import {emptyArray} from '../../../shared/src/sentinels.ts';
import type {ErroredQuery} from '../../../zero-protocol/src/custom-queries.ts';
import type {TTL} from '../query/ttl.ts';
import type {Listener, ResultType, TypedView} from '../query/typed-view.ts';
import type {Change} from './change.ts';
import type {Node} from './data.ts';
import {skipYields, type Input, type Output} from './operator.ts';
import type {SourceSchema} from './schema.ts';
import {
applyChanges,
type ExpandedNode,
type ViewChange,
} from './view-apply-change.ts';
import {applyChange} from './view-apply-change.ts';
import type {Entry, Format, View} from './view.ts';

/**
* Eagerly expand a Node's lazy relationship generators into arrays.
* This captures the current state of the source at the moment of expansion.
*/
function expandNode(node: Node): ExpandedNode {
return {
row: node.row,
relationships: mapValues(node.relationships, v =>
Array.from(skipYields(v()), expandNode),
),
};
}

/**
* Expand a Change by eagerly evaluating all lazy relationship generators.
*/
function expandChange(change: Change): ViewChange {
switch (change.type) {
case 'add':
case 'remove':
return {type: change.type, node: expandNode(change.node)};
case 'edit':
return {
type: 'edit',
node: expandNode(change.node),
oldNode: expandNode(change.oldNode),
};
case 'child':
return {
type: 'child',
node: expandNode(change.node),
child: {
relationshipName: change.child.relationshipName,
change: expandChange(change.child.change),
},
};
}
}

/**
* Implements a materialized view of the output of an operator.
*
Expand Down Expand Up @@ -83,9 +38,6 @@ export class ArrayView<V extends View> implements Output, TypedView<V> {
#error: ErroredQuery | undefined;
readonly #updateTTL: (ttl: TTL) => void;

// Pending changes buffered for batch application (O(N + K) optimization)
#pendingChanges: ViewChange[] = [];

constructor(
input: Input,
format: Format,
Expand Down Expand Up @@ -122,32 +74,9 @@ export class ArrayView<V extends View> implements Output, TypedView<V> {
}

get data() {
// Auto-flush for backwards compatibility. Recommended: push() then flush().
//
// push(A) ──► buffer ──► push(B) ──► buffer ──► flush() ──► apply all
// │
// Legacy code may read .data here ─────────────────┘ (before flush)
// │
// ▼
// Without auto-flush: stale data (missing A, B)
// With auto-flush: current data (has A, B)
this.#applyPendingChanges();
return this.#root[''] as V;
}

#applyPendingChanges() {
if (this.#pendingChanges.length > 0) {
this.#root = applyChanges(
this.#root,
this.#pendingChanges,
this.#schema,
'',
this.#format,
);
this.#pendingChanges = [];
}
}

addListener(listener: Listener<V>) {
assert(!this.#listeners.has(listener), 'Listener already registered');
this.#listeners.add(listener);
Expand Down Expand Up @@ -175,12 +104,10 @@ export class ArrayView<V extends View> implements Output, TypedView<V> {

#hydrate() {
this.#dirty = true;
// During hydration, expand and apply nodes immediately
for (const node of skipYields(this.#input.fetch({}))) {
const expanded = expandNode(node);
this.#root = applyChanges(
this.#root = applyChange(
this.#root,
[{type: 'add', node: expanded}],
{type: 'add', node},
this.#schema,
'',
this.#format,
Expand All @@ -191,11 +118,15 @@ export class ArrayView<V extends View> implements Output, TypedView<V> {

push(change: Change) {
this.#dirty = true;
// Eagerly expand the change to capture current source state.
// This is critical: lazy generators would see stale data if deferred.
// Buffer the change for batch application (O(N + K) optimization).
const expanded = expandChange(change);
this.#pendingChanges.push(expanded);
// Apply immediately to capture current source state. Lazy relationship
// thunks must be evaluated now, before subsequent pushes mutate the source.
this.#root = applyChange(
this.#root,
change,
this.#schema,
'',
this.#format,
);
return emptyArray;
}

Expand All @@ -204,8 +135,6 @@ export class ArrayView<V extends View> implements Output, TypedView<V> {
return;
}
this.#dirty = false;
// Apply all pending changes in one batch (O(N + K) optimization)
this.#applyPendingChanges();
this.#fireListeners();
}

Expand Down