Skip to content

Commit 7556fb6

Browse files
authored
fix race condition that resulted in live queries becoming stuck (#650)
* add failing test * fix * changeset
1 parent a8624c8 commit 7556fb6

File tree

3 files changed

+136
-6
lines changed

3 files changed

+136
-6
lines changed

.changeset/nasty-stars-take.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Fixed race condition which could result in a live query throwing and becoming stuck after multiple mutations complete asynchronously.

packages/db/src/collection/state.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -825,9 +825,6 @@ export class CollectionStateManager<
825825
public capturePreSyncVisibleState(): void {
826826
if (this.pendingSyncedTransactions.length === 0) return
827827

828-
// Clear any previous capture
829-
this.preSyncVisibleState.clear()
830-
831828
// Get all keys that will be affected by sync operations
832829
const syncedKeys = new Set<TKey>()
833830
for (const transaction of this.pendingSyncedTransactions) {
@@ -843,10 +840,13 @@ export class CollectionStateManager<
843840

844841
// Only capture current visible state for keys that will be affected by sync operations
845842
// This is much more efficient than capturing the entire collection state
843+
// Only capture keys that haven't been captured yet to preserve earlier captures
846844
for (const key of syncedKeys) {
847-
const currentValue = this.get(key)
848-
if (currentValue !== undefined) {
849-
this.preSyncVisibleState.set(key, currentValue)
845+
if (!this.preSyncVisibleState.has(key)) {
846+
const currentValue = this.get(key)
847+
if (currentValue !== undefined) {
848+
this.preSyncVisibleState.set(key, currentValue)
849+
}
850850
}
851851
}
852852
}

packages/db/tests/collection-subscribe-changes.test.ts

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type {
99
ChangesPayload,
1010
MutationFn,
1111
PendingMutation,
12+
SyncConfig,
1213
} from "../src/types"
1314

1415
// Helper function to wait for changes to be processed
@@ -1516,4 +1517,128 @@ describe(`Collection.subscribeChanges`, () => {
15161517

15171518
subscription.unsubscribe()
15181519
})
1520+
1521+
it(`should not emit duplicate insert events when onInsert delays sync write`, async () => {
1522+
vi.useFakeTimers()
1523+
1524+
try {
1525+
const changeEvents: Array<any> = []
1526+
let syncOps:
1527+
| Parameters<
1528+
SyncConfig<{ id: string; n: number; foo?: string }, string>[`sync`]
1529+
>[0]
1530+
| undefined
1531+
1532+
const collection = createCollection<
1533+
{ id: string; n: number; foo?: string },
1534+
string
1535+
>({
1536+
id: `async-oninsert-race-test`,
1537+
getKey: (item) => item.id,
1538+
sync: {
1539+
sync: (cfg) => {
1540+
syncOps = cfg
1541+
cfg.markReady()
1542+
},
1543+
},
1544+
onInsert: async ({ transaction }) => {
1545+
// Simulate async operation (e.g., server round-trip)
1546+
await vi.advanceTimersByTimeAsync(100)
1547+
1548+
// Write modified data back via sync
1549+
const modifiedValues = transaction.mutations.map((m) => m.modified)
1550+
syncOps!.begin()
1551+
for (const value of modifiedValues) {
1552+
const existing = collection._state.syncedData.get(value.id)
1553+
syncOps!.write({
1554+
type: existing ? `update` : `insert`,
1555+
value: { ...value, foo: `abc` },
1556+
})
1557+
}
1558+
syncOps!.commit()
1559+
},
1560+
startSync: true,
1561+
})
1562+
1563+
collection.subscribeChanges((changes) => changeEvents.push(...changes))
1564+
1565+
// Insert two items rapidly - this triggers the race condition
1566+
collection.insert({ id: `0`, n: 1 })
1567+
collection.insert({ id: `1`, n: 1 })
1568+
1569+
await vi.runAllTimersAsync()
1570+
1571+
// Filter events by type
1572+
const insertEvents = changeEvents.filter((e) => e.type === `insert`)
1573+
const updateEvents = changeEvents.filter((e) => e.type === `update`)
1574+
1575+
// Expected: 2 optimistic inserts + 2 sync updates = 4 events
1576+
expect(insertEvents.length).toBe(2)
1577+
expect(updateEvents.length).toBe(2)
1578+
} finally {
1579+
vi.restoreAllMocks()
1580+
}
1581+
})
1582+
1583+
it(`should handle single insert with delayed sync correctly`, async () => {
1584+
vi.useFakeTimers()
1585+
1586+
try {
1587+
const changeEvents: Array<any> = []
1588+
let syncOps:
1589+
| Parameters<
1590+
SyncConfig<{ id: string; n: number; foo?: string }, string>[`sync`]
1591+
>[0]
1592+
| undefined
1593+
1594+
const collection = createCollection<
1595+
{ id: string; n: number; foo?: string },
1596+
string
1597+
>({
1598+
id: `single-insert-delayed-sync-test`,
1599+
getKey: (item) => item.id,
1600+
sync: {
1601+
sync: (cfg) => {
1602+
syncOps = cfg
1603+
cfg.markReady()
1604+
},
1605+
},
1606+
onInsert: async ({ transaction }) => {
1607+
await vi.advanceTimersByTimeAsync(50)
1608+
1609+
const modifiedValues = transaction.mutations.map((m) => m.modified)
1610+
syncOps!.begin()
1611+
for (const value of modifiedValues) {
1612+
const existing = collection._state.syncedData.get(value.id)
1613+
syncOps!.write({
1614+
type: existing ? `update` : `insert`,
1615+
value: { ...value, foo: `abc` },
1616+
})
1617+
}
1618+
syncOps!.commit()
1619+
},
1620+
startSync: true,
1621+
})
1622+
1623+
collection.subscribeChanges((changes) => changeEvents.push(...changes))
1624+
1625+
collection.insert({ id: `x`, n: 1 })
1626+
await vi.runAllTimersAsync()
1627+
1628+
// Should have optimistic insert + sync update
1629+
expect(changeEvents).toHaveLength(2)
1630+
expect(changeEvents[0]).toMatchObject({
1631+
type: `insert`,
1632+
key: `x`,
1633+
value: { id: `x`, n: 1 },
1634+
})
1635+
expect(changeEvents[1]).toMatchObject({
1636+
type: `update`,
1637+
key: `x`,
1638+
value: { id: `x`, n: 1, foo: `abc` },
1639+
})
1640+
} finally {
1641+
vi.restoreAllMocks()
1642+
}
1643+
})
15191644
})

0 commit comments

Comments
 (0)