Skip to content

Commit 81adfde

Browse files
committed
Unit test for snapshot loading + fixes
1 parent d63d303 commit 81adfde

File tree

7 files changed

+188
-39
lines changed

7 files changed

+188
-39
lines changed

packages/db/src/collection-subscription.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import {
33
createFilteredCallback,
44
} from "./change-events.js"
55
import { ensureIndexForExpression } from "./indexes/auto-index.js"
6-
import { and } from "./query/index.js"
7-
import type { BasicExpression, OrderBy } from "./query/ir.js"
6+
import { and, gt, lt } from "./query/index.js"
7+
import { Value, type BasicExpression, type OrderBy } from "./query/ir.js"
88
import type { BaseIndex } from "./indexes/base-index.js"
99
import type { ChangeMessage } from "./types.js"
1010
import type { Collection } from "./collection.js"
@@ -153,6 +153,7 @@ export class CollectionSubscription {
153153
limit,
154154
minValue,
155155
}: RequestLimitedSnapshotOptions) {
156+
console.log("in requestLimitedSnapshot")
156157
if (!limit) throw new Error(`limit is required`)
157158

158159
if (!this.orderByIndex) {
@@ -203,10 +204,19 @@ export class CollectionSubscription {
203204

204205
this.callback(changes)
205206

207+
let whereWithValueFilter = where
208+
if (typeof minValue !== `undefined`) {
209+
// Only request data that we haven't seen yet (i.e. is bigger than the minValue)
210+
const { expression, compareOptions } = orderBy[0]!
211+
const operator = compareOptions.direction === `asc` ? gt : lt
212+
const valueFilter = operator(expression, new Value(minValue))
213+
whereWithValueFilter = where ? and(where, valueFilter) : valueFilter
214+
}
215+
206216
// Request the sync layer to load more data
207217
// don't await it, we will load the data into the collection when it comes in
208218
this.collection.syncMore({
209-
where,
219+
where: whereWithValueFilter,
210220
limit,
211221
orderBy,
212222
})

packages/db/src/query/compiler/expressions.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Func, PropRef, Value } from "../ir.js"
2-
import type { BasicExpression } from "../ir.js"
2+
import type { BasicExpression, OrderBy } from "../ir.js"
33

44
/**
55
* Functions supported by the collection index system.
@@ -90,3 +90,26 @@ export function convertToBasicExpression(
9090
return new Func(whereClause.name, args)
9191
}
9292
}
93+
94+
export function convertOrderByToBasicExpression(
95+
orderBy: OrderBy,
96+
collectionAlias: string
97+
): OrderBy {
98+
const normalizedOrderBy = orderBy.map((clause) => {
99+
const basicExp = convertToBasicExpression(
100+
clause.expression,
101+
collectionAlias
102+
)
103+
104+
if (!basicExp) {
105+
throw new Error(`Failed to convert orderBy expression to a basic expression: ${clause.expression}`)
106+
}
107+
108+
return {
109+
...clause,
110+
expression: basicExp,
111+
}
112+
})
113+
114+
return normalizedOrderBy
115+
}

packages/db/src/query/live/collection-subscriber.ts

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { MultiSet } from "@tanstack/db-ivm"
2-
import { convertToBasicExpression } from "../compiler/expressions.js"
2+
import { convertOrderByToBasicExpression, convertToBasicExpression } from "../compiler/expressions.js"
33
import type { FullSyncState } from "./types.js"
44
import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm"
55
import type { Collection } from "../../collection.js"
@@ -16,26 +16,29 @@ export class CollectionSubscriber<
1616
// Keep track of the biggest value we've sent so far (needed for orderBy optimization)
1717
private biggest: any = undefined
1818

19+
private collectionAlias: string
20+
1921
constructor(
2022
private collectionId: string,
2123
private collection: Collection,
2224
private config: Parameters<SyncConfig<TResult>[`sync`]>[0],
2325
private syncState: FullSyncState,
2426
private collectionConfigBuilder: CollectionConfigBuilder<TContext, TResult>
25-
) {}
26-
27-
subscribe(): CollectionSubscription {
28-
const collectionAlias = findCollectionAlias(
27+
) {
28+
this.collectionAlias = findCollectionAlias(
2929
this.collectionId,
3030
this.collectionConfigBuilder.query
31-
)
32-
const whereClause = this.getWhereClauseFromAlias(collectionAlias)
31+
)!
32+
}
33+
34+
subscribe(): CollectionSubscription {
35+
const whereClause = this.getWhereClauseFromAlias(this.collectionAlias)
3336

3437
if (whereClause) {
3538
// Convert WHERE clause to BasicExpression format for collection subscription
3639
const whereExpression = convertToBasicExpression(
3740
whereClause,
38-
collectionAlias!
41+
this.collectionAlias
3942
)
4043

4144
if (whereExpression) {
@@ -164,11 +167,14 @@ export class CollectionSubscriber<
164167

165168
subscription.setOrderByIndex(index)
166169

170+
// Normalize the orderBy clauses such that the references are relative to the collection
171+
const normalizedOrderBy = convertOrderByToBasicExpression(orderBy, this.collectionAlias)
172+
167173
// Load the first `offset + limit` values from the index
168174
// i.e. the K items from the collection that fall into the requested range: [offset, offset + limit[
169175
subscription.requestLimitedSnapshot({
170176
limit: offset + limit,
171-
orderBy,
177+
orderBy: normalizedOrderBy,
172178
})
173179

174180
return subscription
@@ -234,9 +240,16 @@ export class CollectionSubscriber<
234240
const biggestSentValue = biggestSentRow
235241
? valueExtractorForRawRow(biggestSentRow)
236242
: biggestSentRow
243+
244+
// Normalize the orderBy clauses such that the references are relative to the collection
245+
const normalizedOrderBy = convertOrderByToBasicExpression(
246+
orderBy,
247+
this.collectionAlias
248+
)
249+
237250
// Take the `n` items after the biggest sent value
238251
subscription.requestLimitedSnapshot({
239-
orderBy,
252+
orderBy: normalizedOrderBy,
240253
limit: n,
241254
minValue: biggestSentValue,
242255
})

packages/electric-db-collection/src/electric.ts

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
import { compileSQL } from "./sql-compiler"
1616
import type {
1717
BaseCollectionConfig,
18+
Collection,
1819
CollectionConfig,
1920
DeleteMutationFnParams,
2021
Fn,
@@ -419,7 +420,10 @@ function createElectricSync<T extends Row<unknown>>(
419420
})
420421

421422
return {
422-
onLoadMore: (opts) => onLoadMore(stream, params, opts),
423+
onLoadMore: (opts) => {
424+
const snapshotParams = compileSQL<T>(opts)
425+
return stream.requestSnapshot(snapshotParams)
426+
},
423427
cleanup: () => {
424428
// Unsubscribe from the stream
425429
unsubscribeStream()
@@ -433,21 +437,3 @@ function createElectricSync<T extends Row<unknown>>(
433437
}
434438
}
435439

436-
async function onLoadMore<T extends Row<unknown>>(
437-
stream: ShapeStream<T>,
438-
syncParams: Parameters<SyncConfig<T>[`sync`]>[0],
439-
options: OnLoadMoreOptions
440-
) {
441-
const { begin, write, commit } = syncParams
442-
const snapshotParams = compileSQL<T>(options)
443-
const snapshot = await stream.requestSnapshot(snapshotParams)
444-
445-
begin()
446-
snapshot.data.forEach((row) => {
447-
write({
448-
type: `insert`,
449-
value: row.value,
450-
})
451-
})
452-
commit()
453-
}

packages/electric-db-collection/src/pg-serializer.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ export function serialize(value: unknown): string {
33
return `'${value}'`
44
}
55

6+
if (typeof value === `number`) {
7+
return value.toString()
8+
}
9+
610
if (value === null || value === undefined) {
711
return `NULL`
812
}
@@ -19,9 +23,5 @@ export function serialize(value: unknown): string {
1923
return `ARRAY[${value.map(serialize).join(`,`)}]`
2024
}
2125

22-
if (typeof value === `object`) {
23-
throw new Error(`Cannot serialize object: ${JSON.stringify(value)}`)
24-
}
25-
26-
return value.toString()
26+
throw new Error(`Cannot serialize value: ${JSON.stringify(value)}`)
2727
}

packages/electric-db-collection/src/sql-compiler.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export function compileSQL<T>(
3030

3131
// Serialize the values in the params array into PG formatted strings
3232
// and transform the array into a Record<string, string>
33+
console.log("params", params)
3334
const paramsRecord = params.reduce(
3435
(acc, param, index) => {
3536
acc[`${index + 1}`] = serialize(param)
@@ -58,7 +59,8 @@ function compileBasicExpression(
5859
case `val`:
5960
params.push(exp.value)
6061
return `$${params.length}`
61-
case `ref`:
62+
case `ref`:
63+
// TODO: doesn't yet support JSON(B) values which could be accessed with nested props
6264
if (exp.path.length !== 1) {
6365
throw new Error(
6466
`Compiler can't handle nested properties: ${exp.path.join(`.`)}`

packages/electric-db-collection/tests/electric-live-query.test.ts

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,30 @@ const sampleUsers: Array<User> = [
5454

5555
// Mock the ShapeStream module
5656
const mockSubscribe = vi.fn()
57+
const mockRequestSnapshot = vi.fn()
5758
const mockStream = {
5859
subscribe: mockSubscribe,
60+
requestSnapshot: (...args: any) => {
61+
mockRequestSnapshot(...args)
62+
const results = mockRequestSnapshot.mock.results
63+
const lastResult = results[results.length - 1]!.value
64+
65+
const subscribers = mockSubscribe.mock.calls.map(args => args[0])
66+
subscribers.forEach(subscriber => subscriber(lastResult.data.map((row: any) => ({
67+
type: `insert`,
68+
value: row.value,
69+
key: row.key,
70+
}))))
71+
}
5972
}
6073

74+
// Mock the requestSnapshot method
75+
// to return an empty array of data
76+
// since most tests don't use it
77+
mockRequestSnapshot.mockResolvedValue({
78+
data: []
79+
})
80+
6181
vi.mock(`@electric-sql/client`, async () => {
6282
const actual = await vi.importActual(`@electric-sql/client`)
6383
return {
@@ -338,4 +358,99 @@ describe.each([
338358
expect(testElectricCollection.status).toBe(`ready`)
339359
expect(liveQuery.status).toBe(`ready`)
340360
})
361+
362+
if (autoIndex === `eager`) {
363+
it.only(`should load more data via requestSnapshot when creating live query with higher limit`, async () => {
364+
// Reset mocks
365+
vi.clearAllMocks()
366+
mockRequestSnapshot.mockResolvedValue({
367+
data: [
368+
{ key: 5, value: { id: 5, name: `Eve`, age: 30, email: `[email protected]`, active: true } },
369+
{ key: 6, value: { id: 6, name: `Frank`, age: 35, email: `[email protected]`, active: true } },
370+
],
371+
})
372+
373+
// Initial sync with limited data
374+
simulateInitialSync()
375+
expect(electricCollection.status).toBe(`ready`)
376+
expect(electricCollection.size).toBe(4)
377+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(0)
378+
379+
// Create first live query with limit of 2
380+
const limitedLiveQuery = createLiveQueryCollection({
381+
id: `limited-users-live-query`,
382+
startSync: true,
383+
query: (q) =>
384+
q
385+
.from({ user: electricCollection })
386+
.where(({ user }) => eq(user.active, true))
387+
.select(({ user }) => ({
388+
id: user.id,
389+
name: user.name,
390+
active: user.active,
391+
age: user.age,
392+
}))
393+
.orderBy(({ user }) => user.age, `asc`)
394+
.limit(2),
395+
})
396+
397+
expect(limitedLiveQuery.status).toBe(`ready`)
398+
expect(limitedLiveQuery.size).toBe(2) // Only first 2 active users
399+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(1)
400+
401+
const callArgs = (index: number) => mockRequestSnapshot.mock.calls[index]?.[0]
402+
expect(callArgs(0)).toMatchObject({
403+
params: { "1": "true" },
404+
where: "active = $1",
405+
orderBy: "age NULLS FIRST",
406+
limit: 2,
407+
})
408+
409+
// Create second live query with higher limit of 5
410+
const expandedLiveQuery = createLiveQueryCollection({
411+
id: `expanded-users-live-query`,
412+
startSync: true,
413+
query: (q) =>
414+
q
415+
.from({ user: electricCollection })
416+
.where(({ user }) => eq(user.active, true))
417+
.select(({ user }) => ({
418+
id: user.id,
419+
name: user.name,
420+
active: user.active,
421+
}))
422+
.orderBy(({ user }) => user.age, `asc`)
423+
.limit(6),
424+
})
425+
426+
// Wait for the live query to process
427+
await new Promise((resolve) => setTimeout(resolve, 0))
428+
429+
// Verify that requestSnapshot was called with the correct parameters
430+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(3)
431+
432+
// Check that first it requested a limit of 6 users
433+
expect(callArgs(1)).toMatchObject({
434+
params: { "1": "true" },
435+
where: "active = $1",
436+
orderBy: "age NULLS FIRST",
437+
limit: 6,
438+
})
439+
440+
// After this initial snapshot for the new live query it receives all 3 users from the local collection
441+
// so it still needs 3 more users to reach the limit of 6 so it requests 3 more to the sync layer
442+
expect(callArgs(2)).toMatchObject({
443+
params: { "1": "true", "2": "25" },
444+
where: "active = $1 AND age > $2",
445+
orderBy: "age NULLS FIRST",
446+
limit: 3,
447+
})
448+
449+
// The sync layer won't provide any more users so the DB is exhausted and it stops (i.e. doesn't request more)
450+
451+
// The expanded live query should now have more data
452+
expect(expandedLiveQuery.status).toBe(`ready`)
453+
expect(expandedLiveQuery.size).toBe(5) // Alice, Bob, Dave from initial + Eve and Frank from additional data
454+
})
455+
}
341456
})

0 commit comments

Comments
 (0)