Skip to content

Commit 874afb0

Browse files
committed
Fixes to electric collection + unit test
1 parent 3eb3319 commit 874afb0

File tree

4 files changed

+127
-25
lines changed

4 files changed

+127
-25
lines changed

packages/electric-db-collection/src/electric.ts

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
import { compileSQL } from "./sql-compiler"
1616
import type {
1717
BaseCollectionConfig,
18+
Collection,
1819
CollectionConfig,
1920
DeleteMutationFnParams,
2021
Fn,
@@ -419,7 +420,10 @@ function createElectricSync<T extends Row<unknown>>(
419420
})
420421

421422
return {
422-
onLoadMore: (opts) => onLoadMore(stream, params, opts),
423+
onLoadMore: (opts) => {
424+
const snapshotParams = compileSQL<T>(opts)
425+
return stream.requestSnapshot(snapshotParams)
426+
},
423427
cleanup: () => {
424428
// Unsubscribe from the stream
425429
unsubscribeStream()
@@ -433,21 +437,3 @@ function createElectricSync<T extends Row<unknown>>(
433437
}
434438
}
435439

436-
async function onLoadMore<T extends Row<unknown>>(
437-
stream: ShapeStream<T>,
438-
syncParams: Parameters<SyncConfig<T>[`sync`]>[0],
439-
options: OnLoadMoreOptions
440-
) {
441-
const { begin, write, commit } = syncParams
442-
const snapshotParams = compileSQL<T>(options)
443-
const snapshot = await stream.requestSnapshot(snapshotParams)
444-
445-
begin()
446-
snapshot.data.forEach((row) => {
447-
write({
448-
type: `insert`,
449-
value: row.value,
450-
})
451-
})
452-
commit()
453-
}

packages/electric-db-collection/src/pg-serializer.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ export function serialize(value: unknown): string {
33
return `'${value}'`
44
}
55

6+
if (typeof value === `number`) {
7+
return value.toString()
8+
}
9+
610
if (value === null || value === undefined) {
711
return `NULL`
812
}
@@ -19,9 +23,5 @@ export function serialize(value: unknown): string {
1923
return `ARRAY[${value.map(serialize).join(`,`)}]`
2024
}
2125

22-
if (typeof value === `object`) {
23-
throw new Error(`Cannot serialize object: ${JSON.stringify(value)}`)
24-
}
25-
26-
return value.toString()
26+
throw new Error(`Cannot serialize value: ${JSON.stringify(value)}`)
2727
}

packages/electric-db-collection/src/sql-compiler.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export function compileSQL<T>(
3030

3131
// Serialize the values in the params array into PG formatted strings
3232
// and transform the array into a Record<string, string>
33+
console.log("params", params)
3334
const paramsRecord = params.reduce(
3435
(acc, param, index) => {
3536
acc[`${index + 1}`] = serialize(param)
@@ -58,7 +59,8 @@ function compileBasicExpression(
5859
case `val`:
5960
params.push(exp.value)
6061
return `$${params.length}`
61-
case `ref`:
62+
case `ref`:
63+
// TODO: doesn't yet support JSON(B) values which could be accessed with nested props
6264
if (exp.path.length !== 1) {
6365
throw new Error(
6466
`Compiler can't handle nested properties: ${exp.path.join(`.`)}`

packages/electric-db-collection/tests/electric-live-query.test.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,30 @@ const sampleUsers: Array<User> = [
5454

5555
// Mock the ShapeStream module
5656
const mockSubscribe = vi.fn()
57+
const mockRequestSnapshot = vi.fn()
5758
const mockStream = {
5859
subscribe: mockSubscribe,
60+
requestSnapshot: (...args: any) => {
61+
mockRequestSnapshot(...args)
62+
const results = mockRequestSnapshot.mock.results
63+
const lastResult = results[results.length - 1]!.value
64+
65+
const subscribers = mockSubscribe.mock.calls.map(args => args[0])
66+
subscribers.forEach(subscriber => subscriber(lastResult.data.map((row: any) => ({
67+
type: `insert`,
68+
value: row.value,
69+
key: row.key,
70+
}))))
71+
}
5972
}
6073

74+
// Mock the requestSnapshot method
75+
// to return an empty array of data
76+
// since most tests don't use it
77+
mockRequestSnapshot.mockResolvedValue({
78+
data: []
79+
})
80+
6181
vi.mock(`@electric-sql/client`, async () => {
6282
const actual = await vi.importActual(`@electric-sql/client`)
6383
return {
@@ -437,4 +457,98 @@ describe.each([
437457
// Clean up
438458
subscription.unsubscribe()
439459
})
460+
if (autoIndex === `eager`) {
461+
it.only(`should load more data via requestSnapshot when creating live query with higher limit`, async () => {
462+
// Reset mocks
463+
vi.clearAllMocks()
464+
mockRequestSnapshot.mockResolvedValue({
465+
data: [
466+
{ key: 5, value: { id: 5, name: `Eve`, age: 30, email: `[email protected]`, active: true } },
467+
{ key: 6, value: { id: 6, name: `Frank`, age: 35, email: `[email protected]`, active: true } },
468+
],
469+
})
470+
471+
// Initial sync with limited data
472+
simulateInitialSync()
473+
expect(electricCollection.status).toBe(`ready`)
474+
expect(electricCollection.size).toBe(4)
475+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(0)
476+
477+
// Create first live query with limit of 2
478+
const limitedLiveQuery = createLiveQueryCollection({
479+
id: `limited-users-live-query`,
480+
startSync: true,
481+
query: (q) =>
482+
q
483+
.from({ user: electricCollection })
484+
.where(({ user }) => eq(user.active, true))
485+
.select(({ user }) => ({
486+
id: user.id,
487+
name: user.name,
488+
active: user.active,
489+
age: user.age,
490+
}))
491+
.orderBy(({ user }) => user.age, `asc`)
492+
.limit(2),
493+
})
494+
495+
expect(limitedLiveQuery.status).toBe(`ready`)
496+
expect(limitedLiveQuery.size).toBe(2) // Only first 2 active users
497+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(1)
498+
499+
const callArgs = (index: number) => mockRequestSnapshot.mock.calls[index]?.[0]
500+
expect(callArgs(0)).toMatchObject({
501+
params: { "1": "true" },
502+
where: "active = $1",
503+
orderBy: "age NULLS FIRST",
504+
limit: 2,
505+
})
506+
507+
// Create second live query with higher limit of 5
508+
const expandedLiveQuery = createLiveQueryCollection({
509+
id: `expanded-users-live-query`,
510+
startSync: true,
511+
query: (q) =>
512+
q
513+
.from({ user: electricCollection })
514+
.where(({ user }) => eq(user.active, true))
515+
.select(({ user }) => ({
516+
id: user.id,
517+
name: user.name,
518+
active: user.active,
519+
}))
520+
.orderBy(({ user }) => user.age, `asc`)
521+
.limit(6),
522+
})
523+
524+
// Wait for the live query to process
525+
await new Promise((resolve) => setTimeout(resolve, 0))
526+
527+
// Verify that requestSnapshot was called with the correct parameters
528+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(3)
529+
530+
// Check that first it requested a limit of 6 users
531+
expect(callArgs(1)).toMatchObject({
532+
params: { "1": "true" },
533+
where: "active = $1",
534+
orderBy: "age NULLS FIRST",
535+
limit: 6,
536+
})
537+
538+
// After this initial snapshot for the new live query it receives all 3 users from the local collection
539+
// so it still needs 3 more users to reach the limit of 6 so it requests 3 more to the sync layer
540+
expect(callArgs(2)).toMatchObject({
541+
params: { "1": "true", "2": "25" },
542+
where: "active = $1 AND age > $2",
543+
orderBy: "age NULLS FIRST",
544+
limit: 3,
545+
})
546+
547+
// The sync layer won't provide any more users so the DB is exhausted and it stops (i.e. doesn't request more)
548+
549+
// The expanded live query should now have more data
550+
expect(expandedLiveQuery.status).toBe(`ready`)
551+
expect(expandedLiveQuery.size).toBe(5) // Alice, Bob, Dave from initial + Eve and Frank from additional data
552+
})
553+
}
440554
})

0 commit comments

Comments
 (0)