diff --git a/.changeset/tender-carpets-cheat.md b/.changeset/tender-carpets-cheat.md new file mode 100644 index 00000000..77c9dfd7 --- /dev/null +++ b/.changeset/tender-carpets-cheat.md @@ -0,0 +1,5 @@ +--- +"@tanstack/electric-db-collection": patch +--- + +Handle predicates that are pushed down. diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 3dcb54b6..019e3f96 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -13,6 +13,7 @@ import { ExpectedNumberInAwaitTxIdError, TimeoutWaitingForTxIdError, } from "./errors" +import { compileSQL } from "./sql-compiler" import type { BaseCollectionConfig, CollectionConfig, @@ -494,12 +495,17 @@ function createElectricSync>( } }) - // Return the unsubscribe function - return () => { - // Unsubscribe from the stream - unsubscribeStream() - // Abort the abort controller to stop the stream - abortController.abort() + return { + onLoadMore: async (opts) => { + const snapshotParams = compileSQL(opts) + await stream.requestSnapshot(snapshotParams) + }, + cleanup: () => { + // Unsubscribe from the stream + unsubscribeStream() + // Abort the abort controller to stop the stream + abortController.abort() + }, } }, // Expose the getSyncMetadata function diff --git a/packages/electric-db-collection/src/pg-serializer.ts b/packages/electric-db-collection/src/pg-serializer.ts new file mode 100644 index 00000000..707c4e1b --- /dev/null +++ b/packages/electric-db-collection/src/pg-serializer.ts @@ -0,0 +1,27 @@ +export function serialize(value: unknown): string { + if (typeof value === `string`) { + return `'${value}'` + } + + if (typeof value === `number`) { + return value.toString() + } + + if (value === null || value === undefined) { + return `NULL` + } + + if (typeof value === `boolean`) { + return value ? `true` : `false` + } + + if (value instanceof Date) { + return `'${value.toISOString()}'` + } + + if (Array.isArray(value)) { + return `ARRAY[${value.map(serialize).join(`,`)}]` + } + + throw new Error(`Cannot serialize value: ${JSON.stringify(value)}`) +} diff --git a/packages/electric-db-collection/src/sql-compiler.ts b/packages/electric-db-collection/src/sql-compiler.ts new file mode 100644 index 00000000..421c2ab7 --- /dev/null +++ b/packages/electric-db-collection/src/sql-compiler.ts @@ -0,0 +1,163 @@ +import { serialize } from "./pg-serializer" +import type { SubsetParams } from "@electric-sql/client" +import type { IR, OnLoadMoreOptions } from "@tanstack/db" + +export type CompiledSqlRecord = Omit & { + params?: Array +} + +export function compileSQL(options: OnLoadMoreOptions): SubsetParams { + const { where, orderBy, limit } = options + + const params: Array = [] + const compiledSQL: CompiledSqlRecord = { params } + + if (where) { + // TODO: this only works when the where expression's PropRefs directly reference a column of the collection + // doesn't work if it goes through aliases because then we need to know the entire query to be able to follow the reference until the base collection (cf. followRef function) + compiledSQL.where = compileBasicExpression(where, params) + } + + if (orderBy) { + compiledSQL.orderBy = compileOrderBy(orderBy, params) + } + + if (limit) { + compiledSQL.limit = limit + } + + // Serialize the values in the params array into PG formatted strings + // and transform the array into a Record + const paramsRecord = params.reduce( + (acc, param, index) => { + acc[`${index + 1}`] = serialize(param) + return acc + }, + {} as Record + ) + + return { + ...compiledSQL, + params: paramsRecord, + } +} + +/** + * Compiles the expression to a SQL string and mutates the params array with the values. + * @param exp - The expression to compile + * @param params - The params array + * @returns The compiled SQL string + */ +function compileBasicExpression( + exp: IR.BasicExpression, + params: Array +): string { + switch (exp.type) { + case `val`: + params.push(exp.value) + return `$${params.length}` + case `ref`: + // TODO: doesn't yet support JSON(B) values which could be accessed with nested props + if (exp.path.length !== 1) { + throw new Error( + `Compiler can't handle nested properties: ${exp.path.join(`.`)}` + ) + } + return exp.path[0]! + case `func`: + return compileFunction(exp, params) + default: + throw new Error(`Unknown expression type`) + } +} + +function compileOrderBy(orderBy: IR.OrderBy, params: Array): string { + const compiledOrderByClauses = orderBy.map((clause: IR.OrderByClause) => + compileOrderByClause(clause, params) + ) + return compiledOrderByClauses.join(`,`) +} + +function compileOrderByClause( + clause: IR.OrderByClause, + params: Array +): string { + // TODO: what to do with stringSort and locale? + // Correctly supporting them is tricky as it depends on Postgres' collation + const { expression, compareOptions } = clause + let sql = compileBasicExpression(expression, params) + + if (compareOptions.direction === `desc`) { + sql = `${sql} DESC` + } + + if (compareOptions.nulls === `first`) { + sql = `${sql} NULLS FIRST` + } + + if (compareOptions.nulls === `last`) { + sql = `${sql} NULLS LAST` + } + + return sql +} + +function compileFunction( + exp: IR.Func, + params: Array = [] +): string { + const { name, args } = exp + + const opName = getOpName(name) + + const compiledArgs = args.map((arg: IR.BasicExpression) => + compileBasicExpression(arg, params) + ) + + if (isBinaryOp(name)) { + if (compiledArgs.length !== 2) { + throw new Error(`Binary operator ${name} expects 2 arguments`) + } + const [lhs, rhs] = compiledArgs + return `${lhs} ${opName} ${rhs}` + } + + return `${opName}(${compiledArgs.join(`,`)})` +} + +function isBinaryOp(name: string): boolean { + const binaryOps = [`eq`, `gt`, `gte`, `lt`, `lte`, `and`, `or`] + return binaryOps.includes(name) +} + +function getOpName(name: string): string { + const opNames = { + eq: `=`, + gt: `>`, + gte: `>=`, + lt: `<`, + lte: `<=`, + add: `+`, + and: `AND`, + or: `OR`, + not: `NOT`, + isUndefined: `IS NULL`, + isNull: `IS NULL`, + in: `IN`, + like: `LIKE`, + ilike: `ILIKE`, + upper: `UPPER`, + lower: `LOWER`, + length: `LENGTH`, + concat: `CONCAT`, + coalesce: `COALESCE`, + } + + const opName = opNames[name as keyof typeof opNames] + + if (!opName) { + throw new Error(`Unknown operator/function: ${name}`) + } + + return opName +} diff --git a/packages/electric-db-collection/tests/electric-live-query.test.ts b/packages/electric-db-collection/tests/electric-live-query.test.ts index b387f175..48b5aef0 100644 --- a/packages/electric-db-collection/tests/electric-live-query.test.ts +++ b/packages/electric-db-collection/tests/electric-live-query.test.ts @@ -54,10 +54,39 @@ const sampleUsers: Array = [ // Mock the ShapeStream module const mockSubscribe = vi.fn() +const mockRequestSnapshot = vi.fn() const mockStream = { subscribe: mockSubscribe, + requestSnapshot: async (...args: any) => { + const result = await mockRequestSnapshot(...args) + const subscribers = mockSubscribe.mock.calls.map((args) => args[0]) + const data = [...result.data] + + const messages: Array> = data.map((row: any) => ({ + value: row.value, + key: row.key, + headers: row.headers, + })) + + if (messages.length > 0) { + // add an up-to-date message + messages.push({ + headers: { control: `up-to-date` }, + }) + } + + subscribers.forEach((subscriber) => subscriber(messages)) + return result + }, } +// Mock the requestSnapshot method +// to return an empty array of data +// since most tests don't use it +mockRequestSnapshot.mockResolvedValue({ + data: [], +}) + vi.mock(`@electric-sql/client`, async () => { const actual = await vi.importActual(`@electric-sql/client`) return { @@ -437,4 +466,131 @@ describe.each([ // Clean up subscription.unsubscribe() }) + if (autoIndex === `eager`) { + it(`should load more data via requestSnapshot when creating live query with higher limit`, async () => { + mockRequestSnapshot.mockResolvedValue({ + data: [], + }) + + // Initial sync with limited data + simulateInitialSync() + expect(electricCollection.status).toBe(`ready`) + expect(electricCollection.size).toBe(4) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(0) + + // Create first live query with limit of 2 + const limitedLiveQuery = createLiveQueryCollection({ + id: `limited-users-live-query`, + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .select(({ user }) => ({ + id: user.id, + name: user.name, + active: user.active, + age: user.age, + })) + .orderBy(({ user }) => user.age, `asc`) + .limit(2), + }) + + expect(limitedLiveQuery.status).toBe(`ready`) + expect(limitedLiveQuery.size).toBe(2) // Only first 2 active users + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + + const callArgs = (index: number) => + mockRequestSnapshot.mock.calls[index]?.[0] + expect(callArgs(0)).toMatchObject({ + params: { "1": `true` }, + where: `active = $1`, + orderBy: `age NULLS FIRST`, + limit: 2, + }) + + // Next call will return a snapshot containing 2 rows + // Calls after that will return the default empty snapshot + mockRequestSnapshot.mockResolvedValueOnce({ + data: [ + { + headers: { operation: `insert` }, + key: 5, + value: { + id: 5, + name: `Eve`, + age: 30, + email: `eve@example.com`, + active: true, + }, + }, + { + headers: { operation: `insert` }, + key: 6, + value: { + id: 6, + name: `Frank`, + age: 35, + email: `frank@example.com`, + active: true, + }, + }, + ], + }) + + // Create second live query with higher limit of 6 + const expandedLiveQuery = createLiveQueryCollection({ + id: `expanded-users-live-query`, + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .select(({ user }) => ({ + id: user.id, + name: user.name, + active: user.active, + })) + .orderBy(({ user }) => user.age, `asc`) + .limit(6), + }) + + // Wait for the live query to process + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Verify that requestSnapshot was called with the correct parameters + expect(mockRequestSnapshot).toHaveBeenCalledTimes(4) + + // Check that first it requested a limit of 6 users + expect(callArgs(1)).toMatchObject({ + params: { "1": `true` }, + where: `active = $1`, + orderBy: `age NULLS FIRST`, + limit: 6, + }) + + // After this initial snapshot for the new live query it receives all 3 users from the local collection + // so it still needs 3 more users to reach the limit of 6 so it requests 3 more to the sync layer + expect(callArgs(2)).toMatchObject({ + params: { "1": `true`, "2": `25` }, + where: `active = $1 AND age > $2`, + orderBy: `age NULLS FIRST`, + limit: 3, + }) + + // The previous snapshot returned 2 more users so it still needs 1 more user to reach the limit of 6 + expect(callArgs(3)).toMatchObject({ + params: { "1": `true`, "2": `35` }, + where: `active = $1 AND age > $2`, + orderBy: `age NULLS FIRST`, + limit: 1, + }) + + // The sync layer won't provide any more users so the DB is exhausted and it stops (i.e. doesn't request more) + + // The expanded live query should now have more data + expect(expandedLiveQuery.status).toBe(`ready`) + expect(expandedLiveQuery.size).toBe(5) // Alice, Bob, Dave from initial + Eve and Frank from additional data + }) + } })