diff --git a/.changeset/poor-wasps-stand.md b/.changeset/poor-wasps-stand.md new file mode 100644 index 000000000..6a8747ef6 --- /dev/null +++ b/.changeset/poor-wasps-stand.md @@ -0,0 +1,25 @@ +--- +"@tanstack/electric-db-collection": minor +--- + +feat: Add flexible matching strategies for electric-db-collection (#402) + +Add three matching strategies for client-server synchronization: + +1. **Txid strategy** (existing, backward compatible) - Uses PostgreSQL transaction IDs for precise matching +2. **Custom match function strategy** (new) - Allows heuristic-based matching with custom logic +3. **Void/timeout strategy** (new, 3-second default) - Simple timeout for prototyping + +**New Features:** + +- New types: `MatchFunction`, `MatchingStrategy` +- Enhanced `ElectricCollectionConfig` to support all strategies +- New utility: `awaitMatch(matchFn, timeout?)` +- Export `isChangeMessage` and `isControlMessage` helpers for custom match functions + +**Benefits:** + +- Backward compatibility maintained - existing code works unchanged +- Architecture flexibility for different backend capabilities +- Progressive enhancement path - start with void strategy, upgrade to txid when ready +- No forced backend API changes - custom match functions work without backend modifications diff --git a/docs/collections/electric-collection.md b/docs/collections/electric-collection.md index c57336df3..388e42896 100644 --- a/docs/collections/electric-collection.md +++ b/docs/collections/electric-collection.md @@ -54,15 +54,26 @@ The `electricCollectionOptions` function accepts the following options: ### Persistence Handlers +Handlers are called before mutations and support three different matching strategies: + - `onInsert`: Handler called before insert operations - `onUpdate`: Handler called before update operations - `onDelete`: Handler called before delete operations -## Persistence Handlers +Each handler can return: +- `{ txid: number | number[] }` - Txid strategy (recommended) +- `{ matchFn: (message) => boolean, timeout?: number }` - Custom match function strategy +- `{}` - Void strategy (3-second timeout) + +## Persistence Handlers & Matching Strategies + +Handlers can be defined to run on mutations. They are useful to send mutations to the backend and confirming them once Electric delivers the corresponding transactions. Until confirmation, TanStack DB blocks sync data for the collection to prevent race conditions. To avoid any delays, it's important to use a matching strategy. -Handlers can be defined to run on mutations. They are useful to send mutations to the backend and confirming them once Electric delivers the corresponding transactions. Until confirmation, TanStack DB blocks sync data for the collection to prevent race conditions. To avoid any delays, it’s important to use a matching strategy. +Electric collections support three matching strategies for synchronizing client mutations with server responses: -The most reliable strategy is for the backend to include the transaction ID (txid) in its response, allowing the client to match each mutation with Electric’s transaction identifiers for precise confirmation. If no strategy is provided, client mutations are automatically confirmed after three seconds. +### 1. Txid Strategy (Recommended) + +The most reliable strategy uses PostgreSQL transaction IDs (txids) for precise matching. The backend returns a txid, and the client waits for that specific txid to appear in the Electric stream. ```typescript const todosCollection = createCollection( @@ -79,10 +90,78 @@ const todosCollection = createCollection( const newItem = transaction.mutations[0].modified const response = await api.todos.create(newItem) + // Txid strategy - most reliable return { txid: response.txid } }, - // you can also implement onUpdate and onDelete handlers + onUpdate: async ({ transaction }) => { + const { original, changes } = transaction.mutations[0] + const response = await api.todos.update({ + where: { id: original.id }, + data: changes + }) + + return { txid: response.txid } + } + }) +) +``` + +### 2. Custom Match Function Strategy + +When txids aren't available, you can provide a custom function that examines Electric stream messages to determine when a mutation has been synchronized. This is useful for heuristic-based matching. + +```typescript +import { isChangeMessage } from '@tanstack/electric-db-collection' + +const todosCollection = createCollection( + electricCollectionOptions({ + id: 'todos', + getKey: (item) => item.id, + shapeOptions: { + url: '/api/todos', + params: { table: 'todos' }, + }, + + onInsert: async ({ transaction }) => { + const newItem = transaction.mutations[0].modified + await api.todos.create(newItem) + + // Custom match function strategy + return { + matchFn: (message) => { + return isChangeMessage(message) && + message.headers.operation === 'insert' && + message.value.text === newItem.text + }, + timeout: 10000 // Optional timeout in ms, defaults to 30000 + } + } + }) +) +``` + +### 3. Void Strategy (Timeout) + +When neither txids nor reliable matching are possible, you can use the void strategy which simply waits a fixed timeout period (3 seconds by default). This is useful for prototyping or when you're confident about timing. + +```typescript +const todosCollection = createCollection( + electricCollectionOptions({ + id: 'todos', + getKey: (item) => item.id, + shapeOptions: { + url: '/api/todos', + params: { table: 'todos' }, + }, + + onInsert: async ({ transaction }) => { + const newItem = transaction.mutations[0].modified + await api.todos.create(newItem) + + // Void strategy - waits 3 seconds + return {} + } }) ) ``` @@ -162,7 +241,9 @@ export const ServerRoute = createServerFileRoute("/api/todos").methods({ ## Optimistic Updates with Explicit Transactions -For more advanced use cases, you can create custom actions that can do multiple mutations across collections transactionally. In this case, you need to explicitly await for the transaction ID using `utils.awaitTxId()`. +For more advanced use cases, you can create custom actions that can do multiple mutations across collections transactionally. You can use the utility methods to wait for synchronization with different strategies: + +### Using Txid Strategy ```typescript const addTodoAction = createOptimisticAction({ @@ -184,19 +265,94 @@ const addTodoAction = createOptimisticAction({ data: { text, completed: false } }) + // Wait for the specific txid await todosCollection.utils.awaitTxId(response.txid) } }) ``` +### Using Custom Match Function + +```typescript +import { isChangeMessage } from '@tanstack/electric-db-collection' + +const addTodoAction = createOptimisticAction({ + onMutate: ({ text }) => { + const tempId = crypto.randomUUID() + todosCollection.insert({ + id: tempId, + text, + completed: false, + created_at: new Date(), + }) + }, + + mutationFn: async ({ text }) => { + await api.todos.create({ + data: { text, completed: false } + }) + + // Wait for matching message + await todosCollection.utils.awaitMatch( + (message) => { + return isChangeMessage(message) && + message.headers.operation === 'insert' && + message.value.text === text + } + ) + } +}) +``` + ## Utility Methods The collection provides these utility methods via `collection.utils`: -- `awaitTxId(txid, timeout?)`: Manually wait for a specific transaction ID to be synchronized +### `awaitTxId(txid, timeout?)` + +Manually wait for a specific transaction ID to be synchronized: + +```typescript +// Wait for specific txid +await todosCollection.utils.awaitTxId(12345) + +// With custom timeout (default is 30 seconds) +await todosCollection.utils.awaitTxId(12345, 10000) +``` + +### `awaitMatch(matchFn, timeout?)` + +Manually wait for a custom match function to find a matching message: ```typescript -todosCollection.utils.awaitTxId(12345) +import { isChangeMessage } from '@tanstack/electric-db-collection' + +// Wait for a specific message pattern +await todosCollection.utils.awaitMatch( + (message) => { + return isChangeMessage(message) && + message.headers.operation === 'insert' && + message.value.text === 'New Todo' + }, + 5000 // timeout in ms +) ``` -This is useful when you need to ensure a mutation has been synchronized before proceeding with other operations. +### Helper Functions + +The package exports helper functions for use in custom match functions: + +- `isChangeMessage(message)`: Check if a message is a data change (insert/update/delete) +- `isControlMessage(message)`: Check if a message is a control message (up-to-date, must-refetch) + +```typescript +import { isChangeMessage, isControlMessage } from '@tanstack/electric-db-collection' + +// Use in custom match functions +const matchFn = (message) => { + if (isChangeMessage(message)) { + return message.headers.operation === 'insert' + } + return false +} +``` diff --git a/packages/db/src/collection/mutations.ts b/packages/db/src/collection/mutations.ts index 0e8cbf03a..278234f47 100644 --- a/packages/db/src/collection/mutations.ts +++ b/packages/db/src/collection/mutations.ts @@ -230,7 +230,8 @@ export class CollectionMutationsManager< // Apply mutations to the new transaction directOpTransaction.applyMutations(mutations) - directOpTransaction.commit() + // Errors still reject tx.isPersisted.promise; this catch only prevents global unhandled rejections + directOpTransaction.commit().catch(() => undefined) // Add the transaction to the collection's transactions store state.transactions.set(directOpTransaction.id, directOpTransaction) @@ -387,7 +388,8 @@ export class CollectionMutationsManager< const emptyTransaction = createTransaction({ mutationFn: async () => {}, }) - emptyTransaction.commit() + // Errors still propagate through tx.isPersisted.promise; suppress the background commit from warning + emptyTransaction.commit().catch(() => undefined) // Schedule cleanup for empty transaction state.scheduleTransactionCleanup(emptyTransaction) return emptyTransaction @@ -423,7 +425,8 @@ export class CollectionMutationsManager< // Apply mutations to the new transaction directOpTransaction.applyMutations(mutations) - directOpTransaction.commit() + // Errors still hit tx.isPersisted.promise; avoid leaking an unhandled rejection from the fire-and-forget commit + directOpTransaction.commit().catch(() => undefined) // Add the transaction to the collection's transactions store @@ -524,7 +527,8 @@ export class CollectionMutationsManager< // Apply mutations to the new transaction directOpTransaction.applyMutations(mutations) - directOpTransaction.commit() + // Errors still reject tx.isPersisted.promise; silence the internal commit promise to prevent test noise + directOpTransaction.commit().catch(() => undefined) state.transactions.set(directOpTransaction.id, directOpTransaction) state.scheduleTransactionCleanup(directOpTransaction) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 3dcb54b64..f9c603d77 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -7,17 +7,14 @@ import { import { Store } from "@tanstack/store" import DebugModule from "debug" import { - ElectricDeleteHandlerMustReturnTxIdError, - ElectricInsertHandlerMustReturnTxIdError, - ElectricUpdateHandlerMustReturnTxIdError, ExpectedNumberInAwaitTxIdError, + StreamAbortedError, + TimeoutWaitingForMatchError, TimeoutWaitingForTxIdError, } from "./errors" import type { - BaseCollectionConfig, CollectionConfig, DeleteMutationFnParams, - Fn, InsertMutationFnParams, SyncConfig, UpdateMutationFnParams, @@ -33,6 +30,9 @@ import type { ShapeStreamOptions, } from "@electric-sql/client" +// Re-export for user convenience in custom match functions +export { isChangeMessage, isControlMessage } from "@electric-sql/client" + const debug = DebugModule.debug(`ts/db:electric`) /** @@ -41,14 +41,24 @@ const debug = DebugModule.debug(`ts/db:electric`) export type Txid = number /** - * Type representing the result of an insert, update, or delete handler + * Custom match function type - receives stream messages and returns boolean + * indicating if the mutation has been synchronized */ -type MaybeTxId = - | { - txid?: Txid | Array - } - | undefined - | null +export type MatchFunction> = ( + message: Message +) => boolean + +/** + * Matching strategies for Electric synchronization + * Handlers can return one of three strategies: + * - Txid strategy: { txid: number | number[] } + * - Custom match strategy: { matchFn: (message) => boolean, timeout?: number } + * - Void strategy: { timeout?: number } (when neither txid nor matchFn provided) + */ +export type MatchingStrategy = Row> = + | { txid: Txid | Array } + | { matchFn: MatchFunction; timeout?: number } + | { timeout?: number } /** * Type representing a snapshot end message @@ -56,7 +66,6 @@ type MaybeTxId = type SnapshotEndMessage = ControlMessage & { headers: { control: `snapshot-end` } } - // The `InferSchemaOutput` and `ResolveType` are copied from the `@tanstack/db` package // but we modified `InferSchemaOutput` slightly to restrict the schema output to `Row` // This is needed in order for `GetExtensions` to be able to infer the parser extensions type from the schema @@ -74,17 +83,148 @@ type InferSchemaOutput = T extends StandardSchemaV1 export interface ElectricCollectionConfig< T extends Row = Row, TSchema extends StandardSchemaV1 = never, -> extends BaseCollectionConfig< - T, - string | number, - TSchema, - Record, - { txid: Txid | Array } - > { +> { /** * Configuration options for the ElectricSQL ShapeStream */ shapeOptions: ShapeStreamOptions> + + /** + * All standard Collection configuration properties + */ + id?: string + schema?: TSchema + getKey: CollectionConfig[`getKey`] + sync?: CollectionConfig[`sync`] + + /** + * Optional asynchronous handler function called before an insert operation + * Can return different matching strategies for synchronization + * @param params Object containing transaction and collection information + * @returns Promise resolving to a matching strategy + * @example + * // Basic Electric insert handler with txid matching (backward compatible) + * onInsert: async ({ transaction }) => { + * const newItem = transaction.mutations[0].modified + * const result = await api.todos.create({ + * data: newItem + * }) + * return { txid: result.txid } // Txid strategy (backward compatible) + * } + * + * @example + * // Custom match function strategy + * onInsert: async ({ transaction }) => { + * const newItem = transaction.mutations[0].modified + * await api.todos.create({ data: newItem }) + * return { + * matchFn: (message) => { + * return isChangeMessage(message) && + * message.headers.operation === 'insert' && + * message.value.name === newItem.name + * }, + * timeout: 5000 // Optional timeout in ms, defaults to 3000 + * } + * } + * + * @example + * // Void strategy - always waits 3 seconds + * onInsert: async ({ transaction }) => { + * const newItem = transaction.mutations[0].modified + * await api.todos.create({ data: newItem }) + * return {} // Void strategy + * } + * + * @example + * // Insert handler with multiple items - return array of txids + * onInsert: async ({ transaction }) => { + * const items = transaction.mutations.map(m => m.modified) + * const results = await Promise.all( + * items.map(item => api.todos.create({ data: item })) + * ) + * return { txid: results.map(r => r.txid) } // Array of txids + * } + */ + onInsert?: (params: InsertMutationFnParams) => Promise> + + /** + * Optional asynchronous handler function called before an update operation + * Can return different matching strategies for synchronization + * @param params Object containing transaction and collection information + * @returns Promise resolving to a matching strategy + * @example + * // Basic Electric update handler with txid matching (backward compatible) + * onUpdate: async ({ transaction }) => { + * const { original, changes } = transaction.mutations[0] + * const result = await api.todos.update({ + * where: { id: original.id }, + * data: changes + * }) + * return { txid: result.txid } // Txid strategy (backward compatible) + * } + * + * @example + * // Custom match function strategy for updates + * onUpdate: async ({ transaction }) => { + * const { original, changes } = transaction.mutations[0] + * await api.todos.update({ where: { id: original.id }, data: changes }) + * return { + * matchFn: (message) => { + * return isChangeMessage(message) && + * message.headers.operation === 'update' && + * message.value.id === original.id + * } + * } + * } + * + * @example + * // Void strategy - always waits 3 seconds + * onUpdate: async ({ transaction }) => { + * const { original, changes } = transaction.mutations[0] + * await api.todos.update({ where: { id: original.id }, data: changes }) + * return {} // Void strategy + * } + */ + onUpdate?: (params: UpdateMutationFnParams) => Promise> + + /** + * Optional asynchronous handler function called before a delete operation + * Can return different matching strategies for synchronization + * @param params Object containing transaction and collection information + * @returns Promise resolving to a matching strategy + * @example + * // Basic Electric delete handler with txid matching (backward compatible) + * onDelete: async ({ transaction }) => { + * const mutation = transaction.mutations[0] + * const result = await api.todos.delete({ + * id: mutation.original.id + * }) + * return { txid: result.txid } // Txid strategy (backward compatible) + * } + * + * @example + * // Custom match function strategy for deletes + * onDelete: async ({ transaction }) => { + * const mutation = transaction.mutations[0] + * await api.todos.delete({ id: mutation.original.id }) + * return { + * matchFn: (message) => { + * return isChangeMessage(message) && + * message.headers.operation === 'delete' && + * message.value.id === mutation.original.id + * } + * } + * } + * + * @example + * // Void strategy - always waits 3 seconds + * onDelete: async ({ transaction }) => { + * const mutation = transaction.mutations[0] + * await api.todos.delete({ id: mutation.original.id }) + * return {} // Void strategy + * } + */ + onDelete?: (params: DeleteMutationFnParams) => Promise> } function isUpToDateMessage>( @@ -125,11 +265,21 @@ function hasTxids>( */ export type AwaitTxIdFn = (txId: Txid, timeout?: number) => Promise +/** + * Type for the awaitMatch utility function + */ +export type AwaitMatchFn> = ( + matchFn: MatchFunction, + timeout?: number +) => Promise + /** * Electric collection utilities type */ -export interface ElectricCollectionUtils extends UtilsRecord { +export interface ElectricCollectionUtils = Row> + extends UtilsRecord { awaitTxId: AwaitTxIdFn + awaitMatch: AwaitMatchFn } /** @@ -173,20 +323,37 @@ export function electricCollectionOptions( } { const seenTxids = new Store>(new Set([])) const seenSnapshots = new Store>([]) + const pendingMatches = new Store< + Map< + string, + { + matchFn: (message: Message) => boolean + resolve: (value: boolean) => void + reject: (error: Error) => void + timeoutId: ReturnType + matched: boolean + } + > + >(new Map()) + + // Buffer messages since last up-to-date to handle race conditions + const currentBatchMessages = new Store>>([]) const sync = createElectricSync(config.shapeOptions, { seenTxids, seenSnapshots, + pendingMatches, + currentBatchMessages, }) /** * Wait for a specific transaction ID to be synced * @param txId The transaction ID to wait for as a number - * @param timeout Optional timeout in milliseconds (defaults to 30000ms) + * @param timeout Optional timeout in milliseconds (defaults to 3000ms) * @returns Promise that resolves when the txId is synced */ const awaitTxId: AwaitTxIdFn = async ( txId: Txid, - timeout: number = 30000 + timeout: number = 3000 ): Promise => { debug(`awaitTxId called with txid %d`, txId) if (typeof txId !== `number`) { @@ -239,49 +406,149 @@ export function electricCollectionOptions( }) } - // Create wrapper handlers for direct persistence operations that handle txid awaiting - const wrappedOnInsert = config.onInsert - ? async (params: InsertMutationFnParams) => { - // Runtime check (that doesn't follow type) + /** + * Wait for a custom match function to find a matching message + * @param matchFn Function that returns true when a message matches + * @param timeout Optional timeout in milliseconds (defaults to 3000ms) + * @returns Promise that resolves when a matching message is found + */ + const awaitMatch: AwaitMatchFn = async ( + matchFn: MatchFunction, + timeout: number = 3000 + ): Promise => { + debug(`awaitMatch called with custom function`) - const handlerResult = - ((await config.onInsert!(params)) as MaybeTxId) ?? {} - const txid = handlerResult.txid + return new Promise((resolve, reject) => { + const matchId = Math.random().toString(36) + + const cleanupMatch = () => { + pendingMatches.setState((current) => { + const newMatches = new Map(current) + newMatches.delete(matchId) + return newMatches + }) + } + + const onTimeout = () => { + cleanupMatch() + reject(new TimeoutWaitingForMatchError()) + } - if (!txid) { - throw new ElectricInsertHandlerMustReturnTxIdError() + const timeoutId = setTimeout(onTimeout, timeout) + + // We need access to the stream messages to check against the match function + // This will be handled by the sync configuration + const checkMatch = (message: Message) => { + if (matchFn(message)) { + debug(`awaitMatch found matching message, waiting for up-to-date`) + // Mark as matched but don't resolve yet - wait for up-to-date + pendingMatches.setState((current) => { + const newMatches = new Map(current) + const existing = newMatches.get(matchId) + if (existing) { + newMatches.set(matchId, { ...existing, matched: true }) + } + return newMatches + }) + return true } + return false + } - // Handle both single txid and array of txids - if (Array.isArray(txid)) { - await Promise.all(txid.map((id) => awaitTxId(id))) - } else { - await awaitTxId(txid) + // Check against current batch messages first to handle race conditions + for (const message of currentBatchMessages.state) { + if (checkMatch(message)) { + debug( + `awaitMatch found immediate match in current batch, waiting for up-to-date` + ) + // Mark as matched and register for up-to-date resolution + pendingMatches.setState((current) => { + const newMatches = new Map(current) + newMatches.set(matchId, { + matchFn: checkMatch, + resolve, + reject, + timeoutId, + matched: true, // Already matched + }) + return newMatches + }) + return } + } + + // Store the match function for the sync process to use + // We'll add this to a pending matches store + pendingMatches.setState((current) => { + const newMatches = new Map(current) + newMatches.set(matchId, { + matchFn: checkMatch, + resolve, + reject, + timeoutId, + matched: false, + }) + return newMatches + }) + }) + } + + /** + * Wait for a fixed timeout (void strategy) + * @param timeout Timeout in milliseconds (defaults to 3000ms for void strategy) + * @returns Promise that resolves after the timeout + */ + const awaitVoid = async (timeout: number = 3000): Promise => { + debug(`awaitVoid called with timeout %dms`, timeout) + return new Promise((resolve) => { + setTimeout(() => { + debug(`awaitVoid completed after %dms`, timeout) + resolve(true) + }, timeout) + }) + } + + /** + * Process matching strategy and wait for synchronization + */ + const processMatchingStrategy = async ( + result: MatchingStrategy + ): Promise => { + // Check for txid strategy (backward compatible) + if (`txid` in result) { + // Handle both single txid and array of txids + if (Array.isArray(result.txid)) { + await Promise.all(result.txid.map((id) => awaitTxId(id))) + } else { + await awaitTxId(result.txid) + } + return + } + + // Check for custom match function strategy + if (`matchFn` in result) { + await awaitMatch(result.matchFn, result.timeout) + return + } + // Void strategy with configurable timeout + const timeout = result.timeout ?? 3000 + await awaitVoid(timeout) + } + + // Create wrapper handlers for direct persistence operations that handle different matching strategies + const wrappedOnInsert = config.onInsert + ? async (params: InsertMutationFnParams) => { + const handlerResult = await config.onInsert!(params) + await processMatchingStrategy(handlerResult) return handlerResult } : undefined const wrappedOnUpdate = config.onUpdate ? async (params: UpdateMutationFnParams) => { - // Runtime check (that doesn't follow type) - - const handlerResult = - ((await config.onUpdate!(params)) as MaybeTxId) ?? {} - const txid = handlerResult.txid - - if (!txid) { - throw new ElectricUpdateHandlerMustReturnTxIdError() - } - - // Handle both single txid and array of txids - if (Array.isArray(txid)) { - await Promise.all(txid.map((id) => awaitTxId(id))) - } else { - await awaitTxId(txid) - } - + const handlerResult = await config.onUpdate!(params) + await processMatchingStrategy(handlerResult) return handlerResult } : undefined @@ -289,17 +556,7 @@ export function electricCollectionOptions( const wrappedOnDelete = config.onDelete ? async (params: DeleteMutationFnParams) => { const handlerResult = await config.onDelete!(params) - if (!handlerResult.txid) { - throw new ElectricDeleteHandlerMustReturnTxIdError() - } - - // Handle both single txid and array of txids - if (Array.isArray(handlerResult.txid)) { - await Promise.all(handlerResult.txid.map((id) => awaitTxId(id))) - } else { - await awaitTxId(handlerResult.txid) - } - + await processMatchingStrategy(handlerResult) return handlerResult } : undefined @@ -321,7 +578,8 @@ export function electricCollectionOptions( onDelete: wrappedOnDelete, utils: { awaitTxId, - }, + awaitMatch, + } as ElectricCollectionUtils, } } @@ -333,10 +591,24 @@ function createElectricSync>( options: { seenTxids: Store> seenSnapshots: Store> + pendingMatches: Store< + Map< + string, + { + matchFn: (message: Message) => boolean + resolve: (value: boolean) => void + reject: (error: Error) => void + timeoutId: ReturnType + matched: boolean + } + > + > + currentBatchMessages: Store>> } ): SyncConfig { - const { seenTxids } = options - const { seenSnapshots } = options + const { seenTxids, seenSnapshots, pendingMatches, currentBatchMessages } = + options + const MAX_BATCH_MESSAGES = 1000 // Safety limit for message buffer // Store for the relation schema information const relationSchema = new Store(undefined) @@ -380,6 +652,17 @@ function createElectricSync>( } } + // Cleanup pending matches on abort + abortController.signal.addEventListener(`abort`, () => { + pendingMatches.setState((current) => { + current.forEach((match) => { + clearTimeout(match.timeoutId) + match.reject(new StreamAbortedError()) + }) + return new Map() // Clear all pending matches + }) + }) + const stream = new ShapeStream({ ...shapeOptions, signal: abortController.signal, @@ -413,11 +696,51 @@ function createElectricSync>( let hasUpToDate = false for (const message of messages) { + // Add message to current batch buffer (for race condition handling) + if (isChangeMessage(message)) { + currentBatchMessages.setState((currentBuffer) => { + const newBuffer = [...currentBuffer, message] + // Limit buffer size for safety + if (newBuffer.length > MAX_BATCH_MESSAGES) { + newBuffer.splice(0, newBuffer.length - MAX_BATCH_MESSAGES) + } + return newBuffer + }) + } + // Check for txids in the message and add them to our store if (hasTxids(message)) { message.headers.txids?.forEach((txid) => newTxids.add(txid)) } + // Check pending matches against this message + // Note: matchFn will mark matches internally, we don't resolve here + const matchesToRemove: Array = [] + pendingMatches.state.forEach((match, matchId) => { + if (!match.matched) { + try { + match.matchFn(message) + } catch (err) { + // If matchFn throws, clean up and reject the promise + clearTimeout(match.timeoutId) + match.reject( + err instanceof Error ? err : new Error(String(err)) + ) + matchesToRemove.push(matchId) + debug(`matchFn error: %o`, err) + } + } + }) + + // Remove matches that errored + if (matchesToRemove.length > 0) { + pendingMatches.setState((current) => { + const newMatches = new Map(current) + matchesToRemove.forEach((id) => newMatches.delete(id)) + return newMatches + }) + } + if (isChangeMessage(message)) { // Check if the message contains schema information const schema = message.headers.schema @@ -462,6 +785,9 @@ function createElectricSync>( } if (hasUpToDate) { + // Clear the current batch buffer since we're now up-to-date + currentBatchMessages.setState(() => []) + // Commit transaction if one was started if (transactionStarted) { commit() @@ -491,6 +817,26 @@ function createElectricSync>( newSnapshots.length = 0 return seen }) + + // Resolve all matched pending matches on up-to-date + const matchesToResolve: Array = [] + pendingMatches.state.forEach((match, matchId) => { + if (match.matched) { + clearTimeout(match.timeoutId) + match.resolve(true) + matchesToResolve.push(matchId) + debug(`awaitMatch resolved on up-to-date for match %s`, matchId) + } + }) + + // Remove resolved matches + if (matchesToResolve.length > 0) { + pendingMatches.setState((current) => { + const newMatches = new Map(current) + matchesToResolve.forEach((id) => newMatches.delete(id)) + return newMatches + }) + } } }) diff --git a/packages/electric-db-collection/src/errors.ts b/packages/electric-db-collection/src/errors.ts index 6d289668b..40b9966e0 100644 --- a/packages/electric-db-collection/src/errors.ts +++ b/packages/electric-db-collection/src/errors.ts @@ -22,29 +22,16 @@ export class TimeoutWaitingForTxIdError extends ElectricDBCollectionError { } } -export class ElectricInsertHandlerMustReturnTxIdError extends ElectricDBCollectionError { +export class TimeoutWaitingForMatchError extends ElectricDBCollectionError { constructor() { - super( - `Electric collection onInsert handler must return a txid or array of txids` - ) - this.name = `ElectricInsertHandlerMustReturnTxIdError` + super(`Timeout waiting for custom match function`) + this.name = `TimeoutWaitingForMatchError` } } -export class ElectricUpdateHandlerMustReturnTxIdError extends ElectricDBCollectionError { +export class StreamAbortedError extends ElectricDBCollectionError { constructor() { - super( - `Electric collection onUpdate handler must return a txid or array of txids` - ) - this.name = `ElectricUpdateHandlerMustReturnTxIdError` - } -} - -export class ElectricDeleteHandlerMustReturnTxIdError extends ElectricDBCollectionError { - constructor() { - super( - `Electric collection onDelete handler must return a txid or array of txids` - ) - this.name = `ElectricDeleteHandlerMustReturnTxIdError` + super(`Stream aborted`) + this.name = `StreamAbortedError` } } diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index ba21e5c6d..21b2898c0 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -4,7 +4,7 @@ import { createCollection, createTransaction, } from "@tanstack/db" -import { electricCollectionOptions } from "../src/electric" +import { electricCollectionOptions, isChangeMessage } from "../src/electric" import type { ElectricCollectionUtils } from "../src/electric" import type { Collection, @@ -539,9 +539,9 @@ describe(`Electric Integration`, () => { const options = electricCollectionOptions(config) // Call the wrapped handler and expect it to throw - await expect(options.onInsert!(mockParams)).rejects.toThrow( - `Electric collection onInsert handler must return a txid` - ) + // With the new matching strategies, empty object triggers void strategy (3-second wait) + // So we expect it to resolve, not throw + await expect(options.onInsert!(mockParams)).resolves.not.toThrow() }) it(`should simulate complete flow with direct persistence handlers`, async () => { @@ -643,6 +643,359 @@ describe(`Electric Integration`, () => { }) expect(testCollection._state.syncedData.size).toEqual(1) }) + + it(`should support void strategy when handler returns empty object`, async () => { + vi.useFakeTimers() + + const onInsert = vi.fn().mockResolvedValue({}) + + const config = { + id: `test-void-strategy`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Insert with void strategy - should complete after 3 seconds with fake timers + const tx = testCollection.insert({ id: 1, name: `Void Test` }) + + // Use runOnlyPendingTimers to execute the timeout + await vi.runOnlyPendingTimersAsync() + + await expect(tx.isPersisted.promise).resolves.toBeDefined() + expect(onInsert).toHaveBeenCalled() + + vi.useRealTimers() + }) + + it(`should support custom match function strategy`, async () => { + let resolveCustomMatch: () => void + const customMatchPromise = new Promise((resolve) => { + resolveCustomMatch = resolve + }) + + const onInsert = vi.fn().mockImplementation(({ transaction }) => { + const item = transaction.mutations[0].modified + return { + matchFn: (message: any) => { + if ( + isChangeMessage(message) && + message.headers.operation === `insert` && + message.value.name === item.name + ) { + resolveCustomMatch() + return true + } + return false + }, + timeout: 5000, + } + }) + + const config = { + id: `test-custom-match`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Start insert - will wait for custom match + const insertPromise = testCollection.insert({ + id: 1, + name: `Custom Match Test`, + }) + + // Wait a moment then send matching message + setTimeout(() => { + subscriber([ + { + key: `1`, + value: { id: 1, name: `Custom Match Test` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + }, 100) + + // Wait for both the custom match and persistence + await Promise.all([customMatchPromise, insertPromise.isPersisted.promise]) + + expect(onInsert).toHaveBeenCalled() + expect(testCollection.has(1)).toBe(true) + }) + + it(`should timeout with custom match function when no match found`, async () => { + vi.useFakeTimers() + + const onInsert = vi.fn().mockResolvedValue({ + matchFn: () => false, // Never matches + timeout: 1, // Short timeout for test + }) + + const config = { + id: `test-timeout`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + const tx = testCollection.insert({ id: 1, name: `Timeout Test` }) + + // Add catch handler to prevent global unhandled rejection detection + tx.isPersisted.promise.catch(() => {}) + + // Advance timers to trigger timeout + await vi.runOnlyPendingTimersAsync() + + // Should timeout and fail + await expect(tx.isPersisted.promise).rejects.toThrow() + + vi.useRealTimers() + }) + }) + + // Tests for matching strategies utilities + describe(`Matching strategies utilities`, () => { + it(`should export isChangeMessage helper for custom match functions`, () => { + expect(typeof isChangeMessage).toBe(`function`) + + // Test with a change message + const changeMessage = { + key: `1`, + value: { id: 1, name: `Test` }, + headers: { operation: `insert` as const }, + } + expect(isChangeMessage(changeMessage)).toBe(true) + + // Test with a control message + const controlMessage = { + headers: { control: `up-to-date` as const }, + } + expect(isChangeMessage(controlMessage)).toBe(false) + }) + + it(`should provide awaitMatch utility in collection utils`, () => { + const config = { + id: `test-await-match`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + getKey: (item: Row) => item.id as number, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + expect(typeof testCollection.utils.awaitMatch).toBe(`function`) + }) + + it(`should support multiple matching strategies in different handlers`, () => { + const onInsert = vi.fn().mockResolvedValue({ txid: 100 }) // Txid strategy + const onUpdate = vi + .fn() + .mockImplementation(() => Promise.resolve({ timeout: 1500 })) // Void strategy with custom timeout + const onDelete = vi.fn().mockResolvedValue({ + // Custom match strategy + matchFn: (message: any) => + isChangeMessage(message) && message.headers.operation === `delete`, + }) + + const config = { + id: `test-mixed-strategies`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + getKey: (item: Row) => item.id as number, + onInsert, + onUpdate, + onDelete, + } + + const options = electricCollectionOptions(config) + + // All handlers should be wrapped properly + expect(options.onInsert).toBeDefined() + expect(options.onUpdate).toBeDefined() + expect(options.onDelete).toBeDefined() + }) + + it(`should cleanup pending matches on timeout without memory leaks`, async () => { + vi.useFakeTimers() + + const onInsert = vi.fn().mockResolvedValue({ + matchFn: () => false, // Never matches + timeout: 1, // Short timeout for test + }) + + const config = { + id: `test-cleanup`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Start insert that will timeout + const tx = testCollection.insert({ id: 1, name: `Timeout Test` }) + + // Add catch handler to prevent global unhandled rejection detection + tx.isPersisted.promise.catch(() => {}) + + // Advance timers to trigger timeout + await vi.runOnlyPendingTimersAsync() + + // Should timeout and fail + await expect(tx.isPersisted.promise).rejects.toThrow( + `Timeout waiting for custom match function` + ) + + // Send a message after timeout - should not cause any side effects + // This verifies that the pending match was properly cleaned up + expect(() => { + subscriber([ + { + key: `1`, + value: { id: 1, name: `Timeout Test` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + }).not.toThrow() + + vi.useRealTimers() + }) + + it(`should wait for up-to-date after custom match (commit semantics)`, async () => { + let matchFound = false + let persistenceCompleted = false + + const onInsert = vi.fn().mockImplementation(({ transaction }) => { + const item = transaction.mutations[0].modified + return Promise.resolve({ + matchFn: (message: any) => { + if ( + isChangeMessage(message) && + message.headers.operation === `insert` && + message.value.name === item.name + ) { + matchFound = true + return true + } + return false + }, + timeout: 5000, + }) + }) + + const config = { + id: `test-commit-semantics`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Start insert + const insertPromise = testCollection.insert({ + id: 1, + name: `Commit Test`, + }) + + // Set up persistence completion tracking + insertPromise.isPersisted.promise.then(() => { + persistenceCompleted = true + }) + + // Give a moment for handler setup + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Send matching message (should match but not complete persistence yet) + subscriber([ + { + key: `1`, + value: { id: 1, name: `Commit Test` }, + headers: { operation: `insert` }, + }, + ]) + + // Give time for match to be processed + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Verify match was found but persistence not yet completed + expect(matchFound).toBe(true) + expect(persistenceCompleted).toBe(false) + + // Now send up-to-date (should complete persistence) + subscriber([{ headers: { control: `up-to-date` } }]) + + // Wait for persistence to complete + await insertPromise.isPersisted.promise + + // Verify persistence completed after up-to-date + expect(persistenceCompleted).toBe(true) + expect(testCollection._state.syncedData.has(1)).toBe(true) + }) + + it(`should support configurable timeout for void strategy`, async () => { + vi.useFakeTimers() + + const customTimeout = 500 // Custom short timeout + + const onInsert = vi.fn().mockResolvedValue({ + timeout: customTimeout, // Void strategy with custom timeout + }) + + const config = { + id: `test-void-timeout`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Insert with custom void timeout + const tx = testCollection.insert({ id: 1, name: `Custom Timeout Test` }) + + // Use runOnlyPendingTimers to execute the timeout + await vi.runOnlyPendingTimersAsync() + + await expect(tx.isPersisted.promise).resolves.toBeDefined() + expect(onInsert).toHaveBeenCalled() + + vi.useRealTimers() + }) }) // Tests for Electric stream lifecycle management