Skip to content

Commit 55bfd94

Browse files
committed
cap operator
1 parent 8e0f600 commit 55bfd94

File tree

21 files changed

+2601
-69
lines changed

21 files changed

+2601
-69
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,5 @@ bun.lock
102102

103103
.claude/
104104
packages/zql-integration-tests/Chinook_Sqlite.sqlite
105+
106+
*.ignore.*
Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
import {consoleLogSink, LogContext} from '@rocicorp/logger';
2+
import {beforeAll, expect, test} from 'vitest';
3+
import {testLogConfig} from '../../otel/src/test-log-config.ts';
4+
import {createSilentLogContext} from '../../shared/src/logging-test-utils.ts';
5+
import {must} from '../../shared/src/must.ts';
6+
import {initialSync} from '../../zero-cache/src/services/change-source/pg/initial-sync.ts';
7+
import {getConnectionURI, testDBs} from '../../zero-cache/src/test/db.ts';
8+
import type {PostgresDB} from '../../zero-cache/src/types/pg.ts';
9+
import {consume} from '../../zql/src/ivm/stream.ts';
10+
import type {QueryDelegate} from '../../zql/src/query/query-delegate.ts';
11+
import {newQuery} from '../../zql/src/query/query-impl.ts';
12+
import type {Query} from '../../zql/src/query/query.ts';
13+
import {createTableSQL, schema} from '../../zql/src/query/test/test-schemas.ts';
14+
import {Database} from '../../zqlite/src/db.ts';
15+
import {newQueryDelegate} from '../../zqlite/src/test/source-factory.ts';
16+
17+
const lc = createSilentLogContext();
18+
19+
let pg: PostgresDB;
20+
let sqlite: Database;
21+
type Schema = typeof schema;
22+
let issueQuery: Query<'issue', Schema>;
23+
let queryDelegate: QueryDelegate;
24+
25+
beforeAll(async () => {
26+
pg = await testDBs.create('cap-integration');
27+
await pg.unsafe(createTableSQL);
28+
sqlite = new Database(lc, ':memory:');
29+
30+
await pg.unsafe(/*sql*/ `
31+
INSERT INTO "users" ("id", "name") VALUES
32+
('user1', 'User 1');
33+
34+
INSERT INTO "issues" ("id", "title", "description", "closed", "owner_id", "createdAt") VALUES
35+
('issue1', 'Issue 1', 'Desc 1', false, 'user1', TIMESTAMPTZ '2001-01-01T00:00:00.000Z'),
36+
('issue2', 'Issue 2', 'Desc 2', false, 'user1', TIMESTAMPTZ '2001-01-02T00:00:00.000Z'),
37+
('issue3', 'Issue 3', 'Desc 3', false, 'user1', TIMESTAMPTZ '2001-01-03T00:00:00.000Z'),
38+
('issue4', 'Issue 4', 'Desc 4', false, 'user1', TIMESTAMPTZ '2001-01-04T00:00:00.000Z');
39+
40+
-- issue1: no comments (excluded by EXISTS)
41+
-- issue2: 1 comment (below cap of 3)
42+
INSERT INTO "comments" ("id", "authorId", "issue_id", "text", "createdAt") VALUES
43+
('c2a', 'user1', 'issue2', 'Comment 2a', TIMESTAMP '2002-01-01 00:00:00');
44+
45+
-- issue3: 3 comments (at cap limit)
46+
INSERT INTO "comments" ("id", "authorId", "issue_id", "text", "createdAt") VALUES
47+
('c3a', 'user1', 'issue3', 'Comment 3a', TIMESTAMP '2002-02-01 00:00:00'),
48+
('c3b', 'user1', 'issue3', 'Comment 3b', TIMESTAMP '2002-02-02 00:00:00'),
49+
('c3c', 'user1', 'issue3', 'Comment 3c', TIMESTAMP '2002-02-03 00:00:00');
50+
51+
-- issue4: 5 comments (3 tracked + 2 overflow)
52+
INSERT INTO "comments" ("id", "authorId", "issue_id", "text", "createdAt") VALUES
53+
('c4a', 'user1', 'issue4', 'Comment 4a', TIMESTAMP '2002-03-01 00:00:00'),
54+
('c4b', 'user1', 'issue4', 'Comment 4b', TIMESTAMP '2002-03-02 00:00:00'),
55+
('c4c', 'user1', 'issue4', 'Comment 4c', TIMESTAMP '2002-03-03 00:00:00'),
56+
('c4d', 'user1', 'issue4', 'Comment 4d', TIMESTAMP '2002-03-04 00:00:00'),
57+
('c4e', 'user1', 'issue4', 'Comment 4e', TIMESTAMP '2002-03-05 00:00:00');
58+
`);
59+
60+
await initialSync(
61+
new LogContext('debug', {}, consoleLogSink),
62+
{appID: 'cap_integration', shardNum: 0, publications: []},
63+
sqlite,
64+
getConnectionURI(pg),
65+
{tableCopyWorkers: 1},
66+
{},
67+
);
68+
69+
queryDelegate = newQueryDelegate(lc, testLogConfig, sqlite, schema);
70+
issueQuery = newQuery(schema, 'issue');
71+
});
72+
73+
function makeQuery() {
74+
return issueQuery.whereExists('comments').related('comments');
75+
}
76+
77+
test('initial materialization — issue1 excluded, issue2/3/4 included', () => {
78+
const q = makeQuery();
79+
const view = queryDelegate.materialize(q);
80+
const data = view.data as ReadonlyArray<{
81+
readonly id: string;
82+
readonly comments: ReadonlyArray<{readonly id: string}>;
83+
}>;
84+
85+
// issue1 has no comments → excluded by EXISTS
86+
const ids = data.map(r => r.id);
87+
expect(ids).not.toContain('issue1');
88+
expect(ids).toContain('issue2');
89+
expect(ids).toContain('issue3');
90+
expect(ids).toContain('issue4');
91+
92+
// issue4 has all 5 comments in related (Cap only affects EXISTS child, not related)
93+
const issue4 = must(data.find(r => r.id === 'issue4'));
94+
expect(issue4.comments).toHaveLength(5);
95+
96+
expect(view.data).toEqual(queryDelegate.materialize(q).data);
97+
});
98+
99+
test('add comment to commentless issue → issue appears', () => {
100+
const q = makeQuery();
101+
const view = queryDelegate.materialize(q);
102+
103+
consume(
104+
must(queryDelegate.getSource('comments')).push({
105+
type: 'add',
106+
row: {
107+
id: 'c1a',
108+
authorId: 'user1',
109+
issue_id: 'issue1',
110+
text: 'Comment 1a',
111+
createdAt: 1100000000000,
112+
},
113+
}),
114+
);
115+
116+
const data = view.data as ReadonlyArray<{readonly id: string}>;
117+
expect(data.map(r => r.id)).toContain('issue1');
118+
119+
expect(view.data).toEqual(queryDelegate.materialize(q).data);
120+
});
121+
122+
test('add beyond cap limit → issue stays, related shows all', () => {
123+
const q = makeQuery();
124+
const view = queryDelegate.materialize(q);
125+
126+
// issue3 had 3 comments (at cap). Add a 4th.
127+
consume(
128+
must(queryDelegate.getSource('comments')).push({
129+
type: 'add',
130+
row: {
131+
id: 'c3d',
132+
authorId: 'user1',
133+
issue_id: 'issue3',
134+
text: 'Comment 3d',
135+
createdAt: 1100000001000,
136+
},
137+
}),
138+
);
139+
140+
const data = view.data as ReadonlyArray<{
141+
readonly id: string;
142+
readonly comments: ReadonlyArray<{readonly id: string}>;
143+
}>;
144+
expect(data.map(r => r.id)).toContain('issue3');
145+
146+
// Related shows all 4 comments
147+
const issue3 = must(data.find(r => r.id === 'issue3'));
148+
expect(issue3.comments).toHaveLength(4);
149+
150+
expect(view.data).toEqual(queryDelegate.materialize(q).data);
151+
});
152+
153+
test('remove tracked comment with overflow → issue stays', () => {
154+
const q = makeQuery();
155+
const view = queryDelegate.materialize(q);
156+
157+
// issue4 has 5 comments. Remove c4a (tracked by cap). Cap refills from overflow.
158+
consume(
159+
must(queryDelegate.getSource('comments')).push({
160+
type: 'remove',
161+
row: {
162+
id: 'c4a',
163+
authorId: 'user1',
164+
issue_id: 'issue4',
165+
text: 'Comment 4a',
166+
createdAt: 1015027200000,
167+
},
168+
}),
169+
);
170+
171+
const data = view.data as ReadonlyArray<{readonly id: string}>;
172+
expect(data.map(r => r.id)).toContain('issue4');
173+
174+
expect(view.data).toEqual(queryDelegate.materialize(q).data);
175+
});
176+
177+
test('remove untracked overflow comment → issue stays', () => {
178+
const q = makeQuery();
179+
const view = queryDelegate.materialize(q);
180+
181+
// Remove c4e (overflow, not tracked by cap)
182+
consume(
183+
must(queryDelegate.getSource('comments')).push({
184+
type: 'remove',
185+
row: {
186+
id: 'c4e',
187+
authorId: 'user1',
188+
issue_id: 'issue4',
189+
text: 'Comment 4e',
190+
createdAt: 1015372800000,
191+
},
192+
}),
193+
);
194+
195+
const data = view.data as ReadonlyArray<{readonly id: string}>;
196+
expect(data.map(r => r.id)).toContain('issue4');
197+
198+
expect(view.data).toEqual(queryDelegate.materialize(q).data);
199+
});
200+
201+
test('remove only comment → issue disappears', () => {
202+
const q = makeQuery();
203+
const view = queryDelegate.materialize(q);
204+
205+
// Remove c1a from issue1 (the only comment, added in test 2)
206+
consume(
207+
must(queryDelegate.getSource('comments')).push({
208+
type: 'remove',
209+
row: {
210+
id: 'c1a',
211+
authorId: 'user1',
212+
issue_id: 'issue1',
213+
text: 'Comment 1a',
214+
createdAt: 1100000000000,
215+
},
216+
}),
217+
);
218+
219+
const data = view.data as ReadonlyArray<{readonly id: string}>;
220+
expect(data.map(r => r.id)).not.toContain('issue1');
221+
222+
expect(view.data).toEqual(queryDelegate.materialize(q).data);
223+
});
224+
225+
test('re-add comment → issue reappears', () => {
226+
const q = makeQuery();
227+
const view = queryDelegate.materialize(q);
228+
229+
consume(
230+
must(queryDelegate.getSource('comments')).push({
231+
type: 'add',
232+
row: {
233+
id: 'c1b',
234+
authorId: 'user1',
235+
issue_id: 'issue1',
236+
text: 'Comment 1b',
237+
createdAt: 1100000002000,
238+
},
239+
}),
240+
);
241+
242+
const data = view.data as ReadonlyArray<{readonly id: string}>;
243+
expect(data.map(r => r.id)).toContain('issue1');
244+
245+
expect(view.data).toEqual(queryDelegate.materialize(q).data);
246+
});
247+
248+
test('join-level unordered overlay — remove comment triggers overlay for multiple parent issues', () => {
249+
// Uses ownerComments: issue.ownerId = comment.authorId
250+
// All 4 issues have ownerId='user1', all comments have authorId='user1'
251+
// So a single comment change matches ALL 4 issues as parents.
252+
// With flip: false, the planner builds a regular Join + Cap(limit=3, unordered).
253+
// When Cap pushes a remove+refill to Join, Join iterates all 4 parent issues,
254+
// and for issues 2-4, generateWithOverlayUnordered (join-utils.ts) is called.
255+
const q = issueQuery.whereExists('ownerComments', {flip: false});
256+
const view = queryDelegate.materialize(q);
257+
258+
// All 4 issues should be present (all have ownerComments via ownerId='user1')
259+
const initialData = view.data as ReadonlyArray<{readonly id: string}>;
260+
const initialIds = initialData.map(r => r.id);
261+
expect(initialIds).toContain('issue1');
262+
expect(initialIds).toContain('issue2');
263+
expect(initialIds).toContain('issue3');
264+
expect(initialIds).toContain('issue4');
265+
266+
expect(view.data).toEqual(queryDelegate.materialize(q).data);
267+
268+
// Remove comments to ensure we hit a tracked one.
269+
// Cap tracks the first 3 it encounters (unordered). Removing multiple
270+
// guarantees at least one hits a tracked comment, triggering Cap refill → Join overlay.
271+
// After prior tests, the remaining comments are:
272+
// c1b (issue1), c2a (issue2), c3a/c3b/c3c/c3d (issue3), c4b/c4c/c4d (issue4)
273+
const commentsToRemove = [
274+
{
275+
id: 'c2a',
276+
authorId: 'user1',
277+
issue_id: 'issue2',
278+
text: 'Comment 2a',
279+
createdAt: 1009843200000,
280+
},
281+
{
282+
id: 'c3a',
283+
authorId: 'user1',
284+
issue_id: 'issue3',
285+
text: 'Comment 3a',
286+
createdAt: 1012521600000,
287+
},
288+
{
289+
id: 'c3b',
290+
authorId: 'user1',
291+
issue_id: 'issue3',
292+
text: 'Comment 3b',
293+
createdAt: 1012608000000,
294+
},
295+
{
296+
id: 'c4b',
297+
authorId: 'user1',
298+
issue_id: 'issue4',
299+
text: 'Comment 4b',
300+
createdAt: 1015113600000,
301+
},
302+
];
303+
304+
const source = must(queryDelegate.getSource('comments'));
305+
for (const row of commentsToRemove) {
306+
consume(source.push({type: 'remove', row}));
307+
expect(view.data).toEqual(queryDelegate.materialize(q).data);
308+
}
309+
});

packages/zql-integration-tests/src/chinook/planner-exec.pg.test.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -224,13 +224,13 @@ describe('Chinook planner execution cost validation', () => {
224224
.limit(15),
225225
validations: [
226226
['correlation', 0],
227-
['within-optimal', 1.7],
228-
['within-baseline', 1.7],
227+
['within-optimal', 1.75],
228+
['within-baseline', 1.75],
229229
],
230230
extraIndexValidations: [
231231
['correlation', 0],
232-
['within-optimal', 1.7],
233-
['within-baseline', 1.7],
232+
['within-optimal', 1.75],
233+
['within-baseline', 1.75],
234234
],
235235
},
236236

@@ -255,7 +255,7 @@ describe('Chinook planner execution cost validation', () => {
255255
extraIndexValidations: [
256256
['correlation', 0.8],
257257
['within-optimal', 1],
258-
['within-baseline', 0.025],
258+
['within-baseline', 0.04],
259259
],
260260
},
261261

@@ -425,12 +425,12 @@ describe('Chinook planner execution cost validation', () => {
425425
.where('name', 'LIKE', 'Music%')
426426
.whereExists('tracks', t => t.where('name', 'LIKE', 'A%')),
427427
validations: [
428-
['correlation', 0.8],
428+
['correlation', 0.4],
429429
['within-optimal', 1],
430430
['within-baseline', 1],
431431
],
432432
extraIndexValidations: [
433-
['correlation', 0.8],
433+
['correlation', 0.4],
434434
['within-optimal', 1],
435435
['within-baseline', 1],
436436
],

packages/zql-integration-tests/src/discord-repro.pg.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,19 @@ test('discord report https://discord.com/channels/830183651022471199/13475501749
116116
title: 'Test Issue 1',
117117
description: 'Description for issue 1',
118118
closed: false,
119-
ownerId: 'user1',
119+
owner_id: 'user1',
120+
createdAt: 982355920000,
120121
},
121122
row: {
122123
id: 'issue1',
123124
title: 'Test Issue 1',
124125
description: 'Description for issue 1',
125126
closed: true,
126-
ownerId: 'user1',
127+
owner_id: 'user1',
128+
createdAt: 982355920000,
127129
},
128130
}),
129131
);
130132

131-
expect(mapResultToClientNames(view.data, schema, 'issue')).toEqual(
132-
queryDelegate.materialize(q).data,
133-
);
133+
expect(view.data).toEqual(queryDelegate.materialize(q).data);
134134
});

0 commit comments

Comments
 (0)