Skip to content

Commit a8624c8

Browse files
authored
electric-db-collection: support for snapshots in awaitTxid (#648)
* first pass at adding support for snapshots to awaitTxid * add tests * fix cleanup * changeset * address review
1 parent 1d71b21 commit a8624c8

File tree

5 files changed

+406
-21
lines changed

5 files changed

+406
-21
lines changed

.changeset/eager-wings-jog.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/electric-db-collection": patch
3+
---
4+
5+
The awaitTxId utility now resolves transaction IDs based on snapshot-end message metadata (xmin, xmax, xip_list) in addition to explicit txid arrays, enabling matching on the initial snapshot at the start of a new shape.

packages/electric-db-collection/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
"description": "ElectricSQL collection for TanStack DB",
44
"version": "0.1.28",
55
"dependencies": {
6-
"@standard-schema/spec": "^1.0.0",
76
"@electric-sql/client": "^1.0.14",
7+
"@standard-schema/spec": "^1.0.0",
88
"@tanstack/db": "workspace:*",
99
"@tanstack/store": "^0.7.7",
1010
"debug": "^4.4.3"

packages/electric-db-collection/src/electric.ts

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
ShapeStream,
33
isChangeMessage,
44
isControlMessage,
5+
isVisibleInSnapshot,
56
} from "@electric-sql/client"
67
import { Store } from "@tanstack/store"
78
import DebugModule from "debug"
@@ -27,6 +28,7 @@ import type {
2728
ControlMessage,
2829
GetExtensions,
2930
Message,
31+
PostgresSnapshot,
3032
Row,
3133
ShapeStreamOptions,
3234
} from "@electric-sql/client"
@@ -38,6 +40,23 @@ const debug = DebugModule.debug(`ts/db:electric`)
3840
*/
3941
export type Txid = number
4042

43+
/**
44+
* Type representing the result of an insert, update, or delete handler
45+
*/
46+
type MaybeTxId =
47+
| {
48+
txid?: Txid | Array<Txid>
49+
}
50+
| undefined
51+
| null
52+
53+
/**
54+
* Type representing a snapshot end message
55+
*/
56+
type SnapshotEndMessage = ControlMessage & {
57+
headers: { control: `snapshot-end` }
58+
}
59+
4160
// The `InferSchemaOutput` and `ResolveType` are copied from the `@tanstack/db` package
4261
// but we modified `InferSchemaOutput` slightly to restrict the schema output to `Row<unknown>`
4362
// This is needed in order for `GetExtensions` to be able to infer the parser extensions type from the schema
@@ -80,6 +99,20 @@ function isMustRefetchMessage<T extends Row<unknown>>(
8099
return isControlMessage(message) && message.headers.control === `must-refetch`
81100
}
82101

102+
function isSnapshotEndMessage<T extends Row<unknown>>(
103+
message: Message<T>
104+
): message is SnapshotEndMessage {
105+
return isControlMessage(message) && message.headers.control === `snapshot-end`
106+
}
107+
108+
function parseSnapshotMessage(message: SnapshotEndMessage): PostgresSnapshot {
109+
return {
110+
xmin: message.headers.xmin,
111+
xmax: message.headers.xmax,
112+
xip_list: message.headers.xip_list,
113+
}
114+
}
115+
83116
// Check if a message contains txids in its headers
84117
function hasTxids<T extends Row<unknown>>(
85118
message: Message<T>
@@ -139,8 +172,10 @@ export function electricCollectionOptions(
139172
schema?: any
140173
} {
141174
const seenTxids = new Store<Set<Txid>>(new Set([]))
175+
const seenSnapshots = new Store<Array<PostgresSnapshot>>([])
142176
const sync = createElectricSync<any>(config.shapeOptions, {
143177
seenTxids,
178+
seenSnapshots,
144179
})
145180

146181
/**
@@ -158,20 +193,46 @@ export function electricCollectionOptions(
158193
throw new ExpectedNumberInAwaitTxIdError(typeof txId)
159194
}
160195

196+
// First check if the txid is in the seenTxids store
161197
const hasTxid = seenTxids.state.has(txId)
162198
if (hasTxid) return true
163199

200+
// Then check if the txid is in any of the seen snapshots
201+
const hasSnapshot = seenSnapshots.state.some((snapshot) =>
202+
isVisibleInSnapshot(txId, snapshot)
203+
)
204+
if (hasSnapshot) return true
205+
164206
return new Promise((resolve, reject) => {
165207
const timeoutId = setTimeout(() => {
166-
unsubscribe()
208+
unsubscribeSeenTxids()
209+
unsubscribeSeenSnapshots()
167210
reject(new TimeoutWaitingForTxIdError(txId))
168211
}, timeout)
169212

170-
const unsubscribe = seenTxids.subscribe(() => {
213+
const unsubscribeSeenTxids = seenTxids.subscribe(() => {
171214
if (seenTxids.state.has(txId)) {
172215
debug(`awaitTxId found match for txid %o`, txId)
173216
clearTimeout(timeoutId)
174-
unsubscribe()
217+
unsubscribeSeenTxids()
218+
unsubscribeSeenSnapshots()
219+
resolve(true)
220+
}
221+
})
222+
223+
const unsubscribeSeenSnapshots = seenSnapshots.subscribe(() => {
224+
const visibleSnapshot = seenSnapshots.state.find((snapshot) =>
225+
isVisibleInSnapshot(txId, snapshot)
226+
)
227+
if (visibleSnapshot) {
228+
debug(
229+
`awaitTxId found match for txid %o in snapshot %o`,
230+
txId,
231+
visibleSnapshot
232+
)
233+
clearTimeout(timeoutId)
234+
unsubscribeSeenSnapshots()
235+
unsubscribeSeenTxids()
175236
resolve(true)
176237
}
177238
})
@@ -183,8 +244,9 @@ export function electricCollectionOptions(
183244
? async (params: InsertMutationFnParams<any>) => {
184245
// Runtime check (that doesn't follow type)
185246

186-
const handlerResult = (await config.onInsert!(params)) ?? {}
187-
const txid = (handlerResult as { txid?: Txid | Array<Txid> }).txid
247+
const handlerResult =
248+
((await config.onInsert!(params)) as MaybeTxId) ?? {}
249+
const txid = handlerResult.txid
188250

189251
if (!txid) {
190252
throw new ElectricInsertHandlerMustReturnTxIdError()
@@ -205,8 +267,9 @@ export function electricCollectionOptions(
205267
? async (params: UpdateMutationFnParams<any>) => {
206268
// Runtime check (that doesn't follow type)
207269

208-
const handlerResult = (await config.onUpdate!(params)) ?? {}
209-
const txid = (handlerResult as { txid?: Txid | Array<Txid> }).txid
270+
const handlerResult =
271+
((await config.onUpdate!(params)) as MaybeTxId) ?? {}
272+
const txid = handlerResult.txid
210273

211274
if (!txid) {
212275
throw new ElectricUpdateHandlerMustReturnTxIdError()
@@ -269,9 +332,11 @@ function createElectricSync<T extends Row<unknown>>(
269332
shapeOptions: ShapeStreamOptions<GetExtensions<T>>,
270333
options: {
271334
seenTxids: Store<Set<Txid>>
335+
seenSnapshots: Store<Array<PostgresSnapshot>>
272336
}
273337
): SyncConfig<T> {
274338
const { seenTxids } = options
339+
const { seenSnapshots } = options
275340

276341
// Store for the relation schema information
277342
const relationSchema = new Store<string | undefined>(undefined)
@@ -342,6 +407,7 @@ function createElectricSync<T extends Row<unknown>>(
342407
})
343408
let transactionStarted = false
344409
const newTxids = new Set<Txid>()
410+
const newSnapshots: Array<PostgresSnapshot> = []
345411

346412
unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
347413
let hasUpToDate = false
@@ -373,6 +439,8 @@ function createElectricSync<T extends Row<unknown>>(
373439
...message.headers,
374440
},
375441
})
442+
} else if (isSnapshotEndMessage(message)) {
443+
newSnapshots.push(parseSnapshotMessage(message))
376444
} else if (isUpToDateMessage(message)) {
377445
hasUpToDate = true
378446
} else if (isMustRefetchMessage(message)) {
@@ -413,6 +481,16 @@ function createElectricSync<T extends Row<unknown>>(
413481
newTxids.clear()
414482
return clonedSeen
415483
})
484+
485+
// Always commit snapshots when we receive up-to-date, regardless of transaction state
486+
seenSnapshots.setState((currentSnapshots) => {
487+
const seen = [...currentSnapshots, ...newSnapshots]
488+
newSnapshots.forEach((snapshot) =>
489+
debug(`new snapshot synced from pg %o`, snapshot)
490+
)
491+
newSnapshots.length = 0
492+
return seen
493+
})
416494
}
417495
})
418496

0 commit comments

Comments
 (0)