Skip to content

Commit 56b870b

Browse files
authored
Fix live query optimistic updates during long sync commits (#631)
* fix: keep live queries in sync during long optimistic commits * docs: note live-query optimistic fix in changeset * chore: document optimistic commit behaviour * test: consolidate live-query optimistic scenarios * chore: clarify forced emit batching comment * test: handle live query optimistic helpers safely
1 parent d27d32a commit 56b870b

File tree

5 files changed

+337
-8
lines changed

5 files changed

+337
-8
lines changed

.changeset/busy-olives-cover.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Fix live queries getting stuck during long-running sync commits by always
6+
clearing the batching flag on forced emits, tolerating duplicate insert echoes,
7+
and allowing optimistic recomputes to run while commits are still applying. Adds
8+
regression coverage for concurrent optimistic inserts, queued updates, and the
9+
offline-transactions example to ensure everything stays in sync.

packages/db/src/collection/changes.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,20 @@ export class CollectionChangesManager<
6868
// Either we're not batching, or we're forcing emission (user action or ending batch cycle)
6969
let eventsToEmit = changes
7070

71-
// If we have batched events and this is a forced emit, combine them
72-
if (this.batchedEvents.length > 0 && forceEmit) {
73-
eventsToEmit = [...this.batchedEvents, ...changes]
71+
if (forceEmit) {
72+
// Force emit is used to end a batch (e.g. after a sync commit). Combine any
73+
// buffered optimistic events with the final changes so subscribers see the
74+
// whole picture, even if the sync diff is empty.
75+
if (this.batchedEvents.length > 0) {
76+
eventsToEmit = [...this.batchedEvents, ...changes]
77+
}
7478
this.batchedEvents = []
7579
this.shouldBatchEvents = false
7680
}
7781

78-
if (eventsToEmit.length === 0) return
82+
if (eventsToEmit.length === 0) {
83+
return
84+
}
7985

8086
// Emit to all listeners
8187
for (const subscription of this.changeSubscriptions) {

packages/db/src/collection/state.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,11 @@ export class CollectionStateManager<
217217
triggeredByUserAction: boolean = false
218218
): void {
219219
// Skip redundant recalculations when we're in the middle of committing sync transactions
220-
if (this.isCommittingSyncTransactions) {
220+
// While the sync pipeline is replaying a large batch we still want to honour
221+
// fresh optimistic mutations from the UI. Only skip recompute for the
222+
// internal sync-driven redraws; user-triggered work (triggeredByUserAction)
223+
// must run so live queries stay responsive during long commits.
224+
if (this.isCommittingSyncTransactions && !triggeredByUserAction) {
221225
return
222226
}
223227

packages/db/src/collection/sync.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
SyncTransactionAlreadyCommittedError,
88
SyncTransactionAlreadyCommittedWriteError,
99
} from "../errors"
10+
import { deepEquals } from "../utils"
1011
import type { StandardSchemaV1 } from "@standard-schema/spec"
1112
import type { ChangeMessage, CollectionConfig } from "../types"
1213
import type { CollectionImpl } from "./index.js"
@@ -84,6 +85,8 @@ export class CollectionSyncManager<
8485
}
8586
const key = this.config.getKey(messageWithoutKey.value)
8687

88+
let messageType = messageWithoutKey.type
89+
8790
// Check if an item with this key already exists when inserting
8891
if (messageWithoutKey.type === `insert`) {
8992
const insertingIntoExistingSynced = state.syncedData.has(key)
@@ -96,17 +99,29 @@ export class CollectionSyncManager<
9699
!hasPendingDeleteForKey &&
97100
!isTruncateTransaction
98101
) {
99-
throw new DuplicateKeySyncError(key, this.id)
102+
const existingValue = state.syncedData.get(key)
103+
if (
104+
existingValue !== undefined &&
105+
deepEquals(existingValue, messageWithoutKey.value)
106+
) {
107+
// The "insert" is an echo of a value we already have locally.
108+
// Treat it as an update so we preserve optimistic intent without
109+
// throwing a duplicate-key error during reconciliation.
110+
messageType = `update`
111+
} else {
112+
throw new DuplicateKeySyncError(key, this.id)
113+
}
100114
}
101115
}
102116

103117
const message: ChangeMessage<TOutput> = {
104118
...messageWithoutKey,
119+
type: messageType,
105120
key,
106121
}
107122
pendingTransaction.operations.push(message)
108123

109-
if (messageWithoutKey.type === `delete`) {
124+
if (messageType === `delete`) {
110125
pendingTransaction.deletedKeys.add(key)
111126
}
112127
},

0 commit comments

Comments
 (0)