Skip to content

Commit 446daf5

Browse files
committed
Unit test for snapshot loading + fixes
1 parent cf842ab commit 446daf5

File tree

7 files changed

+187
-39
lines changed

7 files changed

+187
-39
lines changed

packages/db/src/collection/subscription.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { ensureIndexForExpression } from "../indexes/auto-index.js"
2-
import { and } from "../query/builder/functions.js"
2+
import { and, gt, lt } from "../query/index.js"
33
import {
44
createFilterFunctionFromExpression,
55
createFilteredCallback,
66
} from "./change-events.js"
7-
import type { BasicExpression, OrderBy } from "../query/ir.js"
7+
import { Value, type BasicExpression, type OrderBy } from "../query/ir.js"
88
import type { BaseIndex } from "../indexes/base-index.js"
99
import type { ChangeMessage } from "../types.js"
1010
import type { CollectionImpl } from "./index.js"
@@ -153,6 +153,7 @@ export class CollectionSubscription {
153153
limit,
154154
minValue,
155155
}: RequestLimitedSnapshotOptions) {
156+
console.log("in requestLimitedSnapshot")
156157
if (!limit) throw new Error(`limit is required`)
157158

158159
if (!this.orderByIndex) {
@@ -203,10 +204,19 @@ export class CollectionSubscription {
203204

204205
this.callback(changes)
205206

207+
let whereWithValueFilter = where
208+
if (typeof minValue !== `undefined`) {
209+
// Only request data that we haven't seen yet (i.e. is bigger than the minValue)
210+
const { expression, compareOptions } = orderBy[0]!
211+
const operator = compareOptions.direction === `asc` ? gt : lt
212+
const valueFilter = operator(expression, new Value(minValue))
213+
whereWithValueFilter = where ? and(where, valueFilter) : valueFilter
214+
}
215+
206216
// Request the sync layer to load more data
207217
// don't await it, we will load the data into the collection when it comes in
208218
this.collection.syncMore({
209-
where,
219+
where: whereWithValueFilter,
210220
limit,
211221
orderBy,
212222
})

packages/db/src/query/compiler/expressions.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Func, PropRef, Value } from "../ir.js"
2-
import type { BasicExpression } from "../ir.js"
2+
import type { BasicExpression, OrderBy } from "../ir.js"
33

44
/**
55
* Functions supported by the collection index system.
@@ -90,3 +90,26 @@ export function convertToBasicExpression(
9090
return new Func(whereClause.name, args)
9191
}
9292
}
93+
94+
export function convertOrderByToBasicExpression(
95+
orderBy: OrderBy,
96+
collectionAlias: string
97+
): OrderBy {
98+
const normalizedOrderBy = orderBy.map((clause) => {
99+
const basicExp = convertToBasicExpression(
100+
clause.expression,
101+
collectionAlias
102+
)
103+
104+
if (!basicExp) {
105+
throw new Error(`Failed to convert orderBy expression to a basic expression: ${clause.expression}`)
106+
}
107+
108+
return {
109+
...clause,
110+
expression: basicExp,
111+
}
112+
})
113+
114+
return normalizedOrderBy
115+
}

packages/db/src/query/live/collection-subscriber.ts

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { MultiSet } from "@tanstack/db-ivm"
2-
import { convertToBasicExpression } from "../compiler/expressions.js"
2+
import { convertOrderByToBasicExpression, convertToBasicExpression } from "../compiler/expressions.js"
33
import type { FullSyncState } from "./types.js"
44
import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm"
55
import type { Collection } from "../../collection/index.js"
@@ -16,26 +16,29 @@ export class CollectionSubscriber<
1616
// Keep track of the biggest value we've sent so far (needed for orderBy optimization)
1717
private biggest: any = undefined
1818

19+
private collectionAlias: string
20+
1921
constructor(
2022
private collectionId: string,
2123
private collection: Collection,
2224
private config: Parameters<SyncConfig<TResult>[`sync`]>[0],
2325
private syncState: FullSyncState,
2426
private collectionConfigBuilder: CollectionConfigBuilder<TContext, TResult>
25-
) {}
26-
27-
subscribe(): CollectionSubscription {
28-
const collectionAlias = findCollectionAlias(
27+
) {
28+
this.collectionAlias = findCollectionAlias(
2929
this.collectionId,
3030
this.collectionConfigBuilder.query
31-
)
32-
const whereClause = this.getWhereClauseFromAlias(collectionAlias)
31+
)!
32+
}
33+
34+
subscribe(): CollectionSubscription {
35+
const whereClause = this.getWhereClauseFromAlias(this.collectionAlias)
3336

3437
if (whereClause) {
3538
// Convert WHERE clause to BasicExpression format for collection subscription
3639
const whereExpression = convertToBasicExpression(
3740
whereClause,
38-
collectionAlias!
41+
this.collectionAlias
3942
)
4043

4144
if (whereExpression) {
@@ -164,11 +167,14 @@ export class CollectionSubscriber<
164167

165168
subscription.setOrderByIndex(index)
166169

170+
// Normalize the orderBy clauses such that the references are relative to the collection
171+
const normalizedOrderBy = convertOrderByToBasicExpression(orderBy, this.collectionAlias)
172+
167173
// Load the first `offset + limit` values from the index
168174
// i.e. the K items from the collection that fall into the requested range: [offset, offset + limit[
169175
subscription.requestLimitedSnapshot({
170176
limit: offset + limit,
171-
orderBy,
177+
orderBy: normalizedOrderBy,
172178
})
173179

174180
return subscription
@@ -234,9 +240,16 @@ export class CollectionSubscriber<
234240
const biggestSentValue = biggestSentRow
235241
? valueExtractorForRawRow(biggestSentRow)
236242
: biggestSentRow
243+
244+
// Normalize the orderBy clauses such that the references are relative to the collection
245+
const normalizedOrderBy = convertOrderByToBasicExpression(
246+
orderBy,
247+
this.collectionAlias
248+
)
249+
237250
// Take the `n` items after the biggest sent value
238251
subscription.requestLimitedSnapshot({
239-
orderBy,
252+
orderBy: normalizedOrderBy,
240253
limit: n,
241254
minValue: biggestSentValue,
242255
})

packages/electric-db-collection/src/electric.ts

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
import { compileSQL } from "./sql-compiler"
1616
import type {
1717
BaseCollectionConfig,
18+
Collection,
1819
CollectionConfig,
1920
DeleteMutationFnParams,
2021
Fn,
@@ -419,7 +420,10 @@ function createElectricSync<T extends Row<unknown>>(
419420
})
420421

421422
return {
422-
onLoadMore: (opts) => onLoadMore(stream, params, opts),
423+
onLoadMore: (opts) => {
424+
const snapshotParams = compileSQL<T>(opts)
425+
return stream.requestSnapshot(snapshotParams)
426+
},
423427
cleanup: () => {
424428
// Unsubscribe from the stream
425429
unsubscribeStream()
@@ -433,21 +437,3 @@ function createElectricSync<T extends Row<unknown>>(
433437
}
434438
}
435439

436-
async function onLoadMore<T extends Row<unknown>>(
437-
stream: ShapeStream<T>,
438-
syncParams: Parameters<SyncConfig<T>[`sync`]>[0],
439-
options: OnLoadMoreOptions
440-
) {
441-
const { begin, write, commit } = syncParams
442-
const snapshotParams = compileSQL<T>(options)
443-
const snapshot = await stream.requestSnapshot(snapshotParams)
444-
445-
begin()
446-
snapshot.data.forEach((row) => {
447-
write({
448-
type: `insert`,
449-
value: row.value,
450-
})
451-
})
452-
commit()
453-
}

packages/electric-db-collection/src/pg-serializer.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ export function serialize(value: unknown): string {
33
return `'${value}'`
44
}
55

6+
if (typeof value === `number`) {
7+
return value.toString()
8+
}
9+
610
if (value === null || value === undefined) {
711
return `NULL`
812
}
@@ -19,9 +23,5 @@ export function serialize(value: unknown): string {
1923
return `ARRAY[${value.map(serialize).join(`,`)}]`
2024
}
2125

22-
if (typeof value === `object`) {
23-
throw new Error(`Cannot serialize object: ${JSON.stringify(value)}`)
24-
}
25-
26-
return value.toString()
26+
throw new Error(`Cannot serialize value: ${JSON.stringify(value)}`)
2727
}

packages/electric-db-collection/src/sql-compiler.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export function compileSQL<T>(
3030

3131
// Serialize the values in the params array into PG formatted strings
3232
// and transform the array into a Record<string, string>
33+
console.log("params", params)
3334
const paramsRecord = params.reduce(
3435
(acc, param, index) => {
3536
acc[`${index + 1}`] = serialize(param)
@@ -58,7 +59,8 @@ function compileBasicExpression(
5859
case `val`:
5960
params.push(exp.value)
6061
return `$${params.length}`
61-
case `ref`:
62+
case `ref`:
63+
// TODO: doesn't yet support JSON(B) values which could be accessed with nested props
6264
if (exp.path.length !== 1) {
6365
throw new Error(
6466
`Compiler can't handle nested properties: ${exp.path.join(`.`)}`

packages/electric-db-collection/tests/electric-live-query.test.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,30 @@ const sampleUsers: Array<User> = [
5454

5555
// Mock the ShapeStream module
5656
const mockSubscribe = vi.fn()
57+
const mockRequestSnapshot = vi.fn()
5758
const mockStream = {
5859
subscribe: mockSubscribe,
60+
requestSnapshot: (...args: any) => {
61+
mockRequestSnapshot(...args)
62+
const results = mockRequestSnapshot.mock.results
63+
const lastResult = results[results.length - 1]!.value
64+
65+
const subscribers = mockSubscribe.mock.calls.map(args => args[0])
66+
subscribers.forEach(subscriber => subscriber(lastResult.data.map((row: any) => ({
67+
type: `insert`,
68+
value: row.value,
69+
key: row.key,
70+
}))))
71+
}
5972
}
6073

74+
// Mock the requestSnapshot method
75+
// to return an empty array of data
76+
// since most tests don't use it
77+
mockRequestSnapshot.mockResolvedValue({
78+
data: []
79+
})
80+
6181
vi.mock(`@electric-sql/client`, async () => {
6282
const actual = await vi.importActual(`@electric-sql/client`)
6383
return {
@@ -437,4 +457,98 @@ describe.each([
437457
// Clean up
438458
subscription.unsubscribe()
439459
})
460+
if (autoIndex === `eager`) {
461+
it.only(`should load more data via requestSnapshot when creating live query with higher limit`, async () => {
462+
// Reset mocks
463+
vi.clearAllMocks()
464+
mockRequestSnapshot.mockResolvedValue({
465+
data: [
466+
{ key: 5, value: { id: 5, name: `Eve`, age: 30, email: `[email protected]`, active: true } },
467+
{ key: 6, value: { id: 6, name: `Frank`, age: 35, email: `[email protected]`, active: true } },
468+
],
469+
})
470+
471+
// Initial sync with limited data
472+
simulateInitialSync()
473+
expect(electricCollection.status).toBe(`ready`)
474+
expect(electricCollection.size).toBe(4)
475+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(0)
476+
477+
// Create first live query with limit of 2
478+
const limitedLiveQuery = createLiveQueryCollection({
479+
id: `limited-users-live-query`,
480+
startSync: true,
481+
query: (q) =>
482+
q
483+
.from({ user: electricCollection })
484+
.where(({ user }) => eq(user.active, true))
485+
.select(({ user }) => ({
486+
id: user.id,
487+
name: user.name,
488+
active: user.active,
489+
age: user.age,
490+
}))
491+
.orderBy(({ user }) => user.age, `asc`)
492+
.limit(2),
493+
})
494+
495+
expect(limitedLiveQuery.status).toBe(`ready`)
496+
expect(limitedLiveQuery.size).toBe(2) // Only first 2 active users
497+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(1)
498+
499+
const callArgs = (index: number) => mockRequestSnapshot.mock.calls[index]?.[0]
500+
expect(callArgs(0)).toMatchObject({
501+
params: { "1": "true" },
502+
where: "active = $1",
503+
orderBy: "age NULLS FIRST",
504+
limit: 2,
505+
})
506+
507+
// Create second live query with higher limit of 5
508+
const expandedLiveQuery = createLiveQueryCollection({
509+
id: `expanded-users-live-query`,
510+
startSync: true,
511+
query: (q) =>
512+
q
513+
.from({ user: electricCollection })
514+
.where(({ user }) => eq(user.active, true))
515+
.select(({ user }) => ({
516+
id: user.id,
517+
name: user.name,
518+
active: user.active,
519+
}))
520+
.orderBy(({ user }) => user.age, `asc`)
521+
.limit(6),
522+
})
523+
524+
// Wait for the live query to process
525+
await new Promise((resolve) => setTimeout(resolve, 0))
526+
527+
// Verify that requestSnapshot was called with the correct parameters
528+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(3)
529+
530+
// Check that first it requested a limit of 6 users
531+
expect(callArgs(1)).toMatchObject({
532+
params: { "1": "true" },
533+
where: "active = $1",
534+
orderBy: "age NULLS FIRST",
535+
limit: 6,
536+
})
537+
538+
// After this initial snapshot for the new live query it receives all 3 users from the local collection
539+
// so it still needs 3 more users to reach the limit of 6 so it requests 3 more to the sync layer
540+
expect(callArgs(2)).toMatchObject({
541+
params: { "1": "true", "2": "25" },
542+
where: "active = $1 AND age > $2",
543+
orderBy: "age NULLS FIRST",
544+
limit: 3,
545+
})
546+
547+
// The sync layer won't provide any more users so the DB is exhausted and it stops (i.e. doesn't request more)
548+
549+
// The expanded live query should now have more data
550+
expect(expandedLiveQuery.status).toBe(`ready`)
551+
expect(expandedLiveQuery.size).toBe(5) // Alice, Bob, Dave from initial + Eve and Frank from additional data
552+
})
553+
}
440554
})

0 commit comments

Comments
 (0)