Skip to content
25 changes: 25 additions & 0 deletions .changeset/poor-wasps-stand.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"@tanstack/electric-db-collection": minor
---

feat: Add flexible matching strategies for electric-db-collection (#402)

Add three matching strategies for client-server synchronization:

1. **Txid strategy** (existing, backward compatible) - Uses PostgreSQL transaction IDs for precise matching
2. **Custom match function strategy** (new) - Allows heuristic-based matching with custom logic
3. **Void/timeout strategy** (new, 3-second default) - Simple timeout for prototyping

**New Features:**

- New types: `MatchFunction<T>`, `MatchingStrategy<T>`
- Enhanced `ElectricCollectionConfig` to support all strategies
- New utility: `awaitMatch(matchFn, timeout?)`
- Export `isChangeMessage` and `isControlMessage` helpers for custom match functions

**Benefits:**

- Backward compatibility maintained - existing code works unchanged
- Architecture flexibility for different backend capabilities
- Progressive enhancement path - start with void strategy, upgrade to txid when ready
- No forced backend API changes - custom match functions work without backend modifications
172 changes: 164 additions & 8 deletions docs/collections/electric-collection.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,26 @@ The `electricCollectionOptions` function accepts the following options:

### Persistence Handlers

Handlers are called before mutations and support three different matching strategies:

- `onInsert`: Handler called before insert operations
- `onUpdate`: Handler called before update operations
- `onDelete`: Handler called before delete operations

## Persistence Handlers
Each handler can return:
- `{ txid: number | number[] }` - Txid strategy (recommended)
- `{ matchFn: (message) => boolean, timeout?: number }` - Custom match function strategy
- `{}` - Void strategy (3-second timeout)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we let a user return the timeout on the void strategy?

Suggested change
- `{}` - Void strategy (3-second timeout)
- `{ timeout?: number }` - Void strategy (default 3-second timeout)


## Persistence Handlers & Matching Strategies

Handlers can be defined to run on mutations. They are useful to send mutations to the backend and confirming them once Electric delivers the corresponding transactions. Until confirmation, TanStack DB blocks sync data for the collection to prevent race conditions. To avoid any delays, it's important to use a matching strategy.

Handlers can be defined to run on mutations. They are useful to send mutations to the backend and confirming them once Electric delivers the corresponding transactions. Until confirmation, TanStack DB blocks sync data for the collection to prevent race conditions. To avoid any delays, it’s important to use a matching strategy.
Electric collections support three matching strategies for synchronizing client mutations with server responses:

The most reliable strategy is for the backend to include the transaction ID (txid) in its response, allowing the client to match each mutation with Electric’s transaction identifiers for precise confirmation. If no strategy is provided, client mutations are automatically confirmed after three seconds.
### 1. Txid Strategy (Recommended)

The most reliable strategy uses PostgreSQL transaction IDs (txids) for precise matching. The backend returns a txid, and the client waits for that specific txid to appear in the Electric stream.

```typescript
const todosCollection = createCollection(
Expand All @@ -79,10 +90,78 @@ const todosCollection = createCollection(
const newItem = transaction.mutations[0].modified
const response = await api.todos.create(newItem)

// Txid strategy - most reliable
return { txid: response.txid }
},

// you can also implement onUpdate and onDelete handlers
onUpdate: async ({ transaction }) => {
const { original, changes } = transaction.mutations[0]
const response = await api.todos.update({
where: { id: original.id },
data: changes
})

return { txid: response.txid }
}
})
)
```

### 2. Custom Match Function Strategy

When txids aren't available, you can provide a custom function that examines Electric stream messages to determine when a mutation has been synchronized. This is useful for heuristic-based matching.

```typescript
import { isChangeMessage } from '@tanstack/electric-db-collection'

const todosCollection = createCollection(
electricCollectionOptions({
id: 'todos',
getKey: (item) => item.id,
shapeOptions: {
url: '/api/todos',
params: { table: 'todos' },
},

onInsert: async ({ transaction }) => {
const newItem = transaction.mutations[0].modified
await api.todos.create(newItem)

// Custom match function strategy
return {
matchFn: (message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === newItem.text
},
timeout: 10000 // Optional timeout in ms, defaults to 30000
}
}
})
)
```

### 3. Void Strategy (Timeout)

When neither txids nor reliable matching are possible, you can use the void strategy which simply waits a fixed timeout period (3 seconds by default). This is useful for prototyping or when you're confident about timing.

```typescript
const todosCollection = createCollection(
electricCollectionOptions({
id: 'todos',
getKey: (item) => item.id,
shapeOptions: {
url: '/api/todos',
params: { table: 'todos' },
},

onInsert: async ({ transaction }) => {
const newItem = transaction.mutations[0].modified
await api.todos.create(newItem)

// Void strategy - waits 3 seconds
return {}
}
})
)
```
Expand Down Expand Up @@ -162,7 +241,9 @@ export const ServerRoute = createServerFileRoute("/api/todos").methods({

## Optimistic Updates with Explicit Transactions

For more advanced use cases, you can create custom actions that can do multiple mutations across collections transactionally. In this case, you need to explicitly await for the transaction ID using `utils.awaitTxId()`.
For more advanced use cases, you can create custom actions that can do multiple mutations across collections transactionally. You can use the utility methods to wait for synchronization with different strategies:

### Using Txid Strategy

```typescript
const addTodoAction = createOptimisticAction({
Expand All @@ -184,19 +265,94 @@ const addTodoAction = createOptimisticAction({
data: { text, completed: false }
})

// Wait for the specific txid
await todosCollection.utils.awaitTxId(response.txid)
}
})
```

### Using Custom Match Function

```typescript
import { isChangeMessage } from '@tanstack/electric-db-collection'

const addTodoAction = createOptimisticAction({
onMutate: ({ text }) => {
const tempId = crypto.randomUUID()
todosCollection.insert({
id: tempId,
text,
completed: false,
created_at: new Date(),
})
},

mutationFn: async ({ text }) => {
await api.todos.create({
data: { text, completed: false }
})

// Wait for matching message
await todosCollection.utils.awaitMatch(
(message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === text
}
)
}
})
```

## Utility Methods

The collection provides these utility methods via `collection.utils`:

- `awaitTxId(txid, timeout?)`: Manually wait for a specific transaction ID to be synchronized
### `awaitTxId(txid, timeout?)`

Manually wait for a specific transaction ID to be synchronized:

```typescript
// Wait for specific txid
await todosCollection.utils.awaitTxId(12345)

// With custom timeout (default is 30 seconds)
await todosCollection.utils.awaitTxId(12345, 10000)
```

### `awaitMatch(matchFn, timeout?)`

Manually wait for a custom match function to find a matching message:

```typescript
todosCollection.utils.awaitTxId(12345)
import { isChangeMessage } from '@tanstack/electric-db-collection'

// Wait for a specific message pattern
await todosCollection.utils.awaitMatch(
(message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === 'New Todo'
},
5000 // timeout in ms
)
```

This is useful when you need to ensure a mutation has been synchronized before proceeding with other operations.
### Helper Functions

The package exports helper functions for use in custom match functions:

- `isChangeMessage(message)`: Check if a message is a data change (insert/update/delete)
- `isControlMessage(message)`: Check if a message is a control message (up-to-date, must-refetch)

```typescript
import { isChangeMessage, isControlMessage } from '@tanstack/electric-db-collection'

// Use in custom match functions
const matchFn = (message) => {
if (isChangeMessage(message)) {
return message.headers.operation === 'insert'
}
return false
}
```
12 changes: 8 additions & 4 deletions packages/db/src/collection/mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ export class CollectionMutationsManager<

// Apply mutations to the new transaction
directOpTransaction.applyMutations(mutations)
directOpTransaction.commit()
// Errors still reject tx.isPersisted.promise; this catch only prevents global unhandled rejections
directOpTransaction.commit().catch(() => undefined)

// Add the transaction to the collection's transactions store
state.transactions.set(directOpTransaction.id, directOpTransaction)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the commit fails, do we still want to store this TX ID in the state.transactions?

Expand Down Expand Up @@ -387,7 +388,8 @@ export class CollectionMutationsManager<
const emptyTransaction = createTransaction({
mutationFn: async () => {},
})
emptyTransaction.commit()
// Errors still propagate through tx.isPersisted.promise; suppress the background commit from warning
emptyTransaction.commit().catch(() => undefined)
// Schedule cleanup for empty transaction
state.scheduleTransactionCleanup(emptyTransaction)
return emptyTransaction
Expand Down Expand Up @@ -423,7 +425,8 @@ export class CollectionMutationsManager<

// Apply mutations to the new transaction
directOpTransaction.applyMutations(mutations)
directOpTransaction.commit()
// Errors still hit tx.isPersisted.promise; avoid leaking an unhandled rejection from the fire-and-forget commit
directOpTransaction.commit().catch(() => undefined)

// Add the transaction to the collection's transactions store

Expand Down Expand Up @@ -524,7 +527,8 @@ export class CollectionMutationsManager<

// Apply mutations to the new transaction
directOpTransaction.applyMutations(mutations)
directOpTransaction.commit()
// Errors still reject tx.isPersisted.promise; silence the internal commit promise to prevent test noise
directOpTransaction.commit().catch(() => undefined)

state.transactions.set(directOpTransaction.id, directOpTransaction)
state.scheduleTransactionCleanup(directOpTransaction)
Expand Down
Loading
Loading