Skip to content

Commit 69a6d2d

Browse files
authored
support for electric must-refetch and truncate method to sync hander (#412)
1 parent 68538b4 commit 69a6d2d

File tree

8 files changed

+938
-7
lines changed

8 files changed

+938
-7
lines changed

.changeset/db-sync-methods.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Add a new truncate method to the sync handler to enable a collections state to be reset from a sync transaction.

.changeset/electric-must-refetch.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/electric-db-collection": patch
3+
---
4+
5+
Add must-refetch message handling to clear synced data and re-sync collection data from server.

packages/db/src/collection.ts

Lines changed: 154 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ import type { BaseIndex, IndexResolver } from "./indexes/base-index.js"
6565
interface PendingSyncedTransaction<T extends object = Record<string, unknown>> {
6666
committed: boolean
6767
operations: Array<OptimisticChangeMessage<T>>
68+
truncate?: boolean
6869
}
6970

7071
/**
@@ -559,11 +560,16 @@ export class CollectionImpl<
559560

560561
// Check if an item with this key already exists when inserting
561562
if (messageWithoutKey.type === `insert`) {
563+
const insertingIntoExistingSynced = this.syncedData.has(key)
564+
const hasPendingDeleteForKey = pendingTransaction.operations.some(
565+
(op) => op.key === key && op.type === `delete`
566+
)
567+
const isTruncateTransaction = pendingTransaction.truncate === true
568+
// Allow insert after truncate in the same transaction even if it existed in syncedData
562569
if (
563-
this.syncedData.has(key) &&
564-
!pendingTransaction.operations.some(
565-
(op) => op.key === key && op.type === `delete`
566-
)
570+
insertingIntoExistingSynced &&
571+
!hasPendingDeleteForKey &&
572+
!isTruncateTransaction
567573
) {
568574
throw new DuplicateKeySyncError(key, this.id)
569575
}
@@ -600,6 +606,28 @@ export class CollectionImpl<
600606
markReady: () => {
601607
this.markReady()
602608
},
609+
truncate: () => {
610+
const pendingTransaction =
611+
this.pendingSyncedTransactions[
612+
this.pendingSyncedTransactions.length - 1
613+
]
614+
if (!pendingTransaction) {
615+
throw new NoPendingSyncTransactionWriteError()
616+
}
617+
if (pendingTransaction.committed) {
618+
throw new SyncTransactionAlreadyCommittedWriteError()
619+
}
620+
621+
// Clear all operations from the current transaction
622+
pendingTransaction.operations = []
623+
624+
// Mark the transaction as a truncate operation. During commit, this triggers:
625+
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
626+
// - Clearing of syncedData/syncedMetadata
627+
// - Subsequent synced ops applied on the fresh base
628+
// - Finally, optimistic mutations re-applied on top (single batch)
629+
pendingTransaction.truncate = true
630+
},
603631
})
604632

605633
// Store cleanup function if provided
@@ -1149,7 +1177,11 @@ export class CollectionImpl<
11491177
}
11501178
}
11511179

1152-
if (!hasPersistingTransaction) {
1180+
const hasTruncateSync = this.pendingSyncedTransactions.some(
1181+
(t) => t.truncate === true
1182+
)
1183+
1184+
if (!hasPersistingTransaction || hasTruncateSync) {
11531185
// Set flag to prevent redundant optimistic state recalculations
11541186
this.isCommittingSyncTransactions = true
11551187

@@ -1179,6 +1211,28 @@ export class CollectionImpl<
11791211
const rowUpdateMode = this.config.sync.rowUpdateMode || `partial`
11801212

11811213
for (const transaction of this.pendingSyncedTransactions) {
1214+
// Handle truncate operations first
1215+
if (transaction.truncate) {
1216+
// TRUNCATE PHASE
1217+
// 1) Emit a delete for every currently-synced key so downstream listeners/indexes
1218+
// observe a clear-before-rebuild. We intentionally skip keys already in
1219+
// optimisticDeletes because their delete was previously emitted by the user.
1220+
for (const key of this.syncedData.keys()) {
1221+
if (this.optimisticDeletes.has(key)) continue
1222+
const previousValue =
1223+
this.optimisticUpserts.get(key) || this.syncedData.get(key)
1224+
if (previousValue !== undefined) {
1225+
events.push({ type: `delete`, key, value: previousValue })
1226+
}
1227+
}
1228+
1229+
// 2) Clear the authoritative synced base. Subsequent server ops in this
1230+
// same commit will rebuild the base atomically.
1231+
this.syncedData.clear()
1232+
this.syncedMetadata.clear()
1233+
this.syncedKeys.clear()
1234+
}
1235+
11821236
for (const operation of transaction.operations) {
11831237
const key = operation.key as TKey
11841238
this.syncedKeys.add(key)
@@ -1228,7 +1282,101 @@ export class CollectionImpl<
12281282
}
12291283
}
12301284

1231-
// Clear optimistic state since sync operations will now provide the authoritative data
1285+
// After applying synced operations, if this commit included a truncate,
1286+
// re-apply optimistic mutations on top of the fresh synced base. This ensures
1287+
// the UI preserves local intent while respecting server rebuild semantics.
1288+
// Ordering: deletes (above) -> server ops (just applied) -> optimistic upserts.
1289+
const hadTruncate = this.pendingSyncedTransactions.some(
1290+
(t) => t.truncate === true
1291+
)
1292+
if (hadTruncate) {
1293+
// Avoid duplicating keys that were inserted/updated by synced operations in this commit
1294+
const syncedInsertedOrUpdatedKeys = new Set<TKey>()
1295+
for (const t of this.pendingSyncedTransactions) {
1296+
for (const op of t.operations) {
1297+
if (op.type === `insert` || op.type === `update`) {
1298+
syncedInsertedOrUpdatedKeys.add(op.key as TKey)
1299+
}
1300+
}
1301+
}
1302+
1303+
// Build re-apply sets from ACTIVE optimistic transactions against the new synced base
1304+
// We do not copy maps; we compute intent directly from transactions to avoid drift.
1305+
const reapplyUpserts = new Map<TKey, T>()
1306+
const reapplyDeletes = new Set<TKey>()
1307+
1308+
for (const tx of this.transactions.values()) {
1309+
if ([`completed`, `failed`].includes(tx.state)) continue
1310+
for (const mutation of tx.mutations) {
1311+
if (mutation.collection !== this || !mutation.optimistic) continue
1312+
const key = mutation.key as TKey
1313+
switch (mutation.type) {
1314+
case `insert`:
1315+
reapplyUpserts.set(key, mutation.modified as T)
1316+
reapplyDeletes.delete(key)
1317+
break
1318+
case `update`: {
1319+
const base = this.syncedData.get(key)
1320+
const next = base
1321+
? (Object.assign({}, base, mutation.changes) as T)
1322+
: (mutation.modified as T)
1323+
reapplyUpserts.set(key, next)
1324+
reapplyDeletes.delete(key)
1325+
break
1326+
}
1327+
case `delete`:
1328+
reapplyUpserts.delete(key)
1329+
reapplyDeletes.add(key)
1330+
break
1331+
}
1332+
}
1333+
}
1334+
1335+
// Emit inserts for re-applied upserts, skipping any keys that have an optimistic delete.
1336+
// If the server also inserted/updated the same key in this batch, override that value
1337+
// with the optimistic value to preserve local intent.
1338+
for (const [key, value] of reapplyUpserts) {
1339+
if (reapplyDeletes.has(key)) continue
1340+
if (syncedInsertedOrUpdatedKeys.has(key)) {
1341+
let foundInsert = false
1342+
for (let i = events.length - 1; i >= 0; i--) {
1343+
const evt = events[i]!
1344+
if (evt.key === key && evt.type === `insert`) {
1345+
evt.value = value
1346+
foundInsert = true
1347+
break
1348+
}
1349+
}
1350+
if (!foundInsert) {
1351+
events.push({ type: `insert`, key, value })
1352+
}
1353+
} else {
1354+
events.push({ type: `insert`, key, value })
1355+
}
1356+
}
1357+
1358+
// Finally, ensure we do NOT insert keys that have an outstanding optimistic delete.
1359+
if (events.length > 0 && reapplyDeletes.size > 0) {
1360+
const filtered: Array<ChangeMessage<T, TKey>> = []
1361+
for (const evt of events) {
1362+
if (evt.type === `insert` && reapplyDeletes.has(evt.key)) {
1363+
continue
1364+
}
1365+
filtered.push(evt)
1366+
}
1367+
events.length = 0
1368+
events.push(...filtered)
1369+
}
1370+
1371+
// Ensure listeners are active before emitting this critical batch
1372+
if (!this.isReady()) {
1373+
this.setStatus(`ready`)
1374+
}
1375+
}
1376+
1377+
// Maintain optimistic state appropriately
1378+
// Clear optimistic state since sync operations will now provide the authoritative data.
1379+
// Any still-active user transactions will be re-applied below in recompute.
12321380
this.optimisticUpserts.clear()
12331381
this.optimisticDeletes.clear()
12341382

packages/db/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ export interface SyncConfig<
207207
write: (message: Omit<ChangeMessage<T>, `key`>) => void
208208
commit: () => void
209209
markReady: () => void
210+
truncate: () => void
210211
}) => void
211212

212213
/**

0 commit comments

Comments
 (0)