Skip to content

Commit c1247e8

Browse files
KyleAMathewsclaudeautofix-ci[bot]
authored
fix(db): prevent on-demand live query from being marked ready before data loads (#1081)
* fix(react-db): prevent suspense from releasing before data is loaded in on-demand mode When using useLiveSuspenseQuery with on-demand sync mode, the suspense boundary would sometimes release before the query's data was actually loaded. This happened because the live query collection was marked as ready immediately when the source collection was already ready, even though the loadSubset operation for the specific query hadn't completed. This fix ensures that useLiveSuspenseQuery also suspends while isLoadingSubset is true, waiting for the initial subset load to complete before releasing the suspense boundary. * test(react-db): add test for isLoadingSubset suspense behavior This test verifies that useLiveSuspenseQuery holds the suspense boundary when isLoadingSubset is true, even if the collection status is 'ready'. The test confirms: 1. WITHOUT the fix: suspense releases prematurely (test fails) 2. WITH the fix: suspense waits for isLoadingSubset to be false (test passes) * ci: apply automated fixes * fix(db): prevent live query from being marked ready before subset data is loaded In on-demand sync mode, the live query collection was being marked as 'ready' before the subset data finished loading. This caused useLiveQuery to return isReady=true with empty data, and useLiveSuspenseQuery to release suspense prematurely. The fix: 1. Added isLoadingSubset check in updateLiveQueryStatus() to prevent marking ready while subset is loading 2. Added listener for loadingSubset:change events to trigger ready check when subset loading completes 3. Added test case that verifies the correct timing behavior * ci: apply automated fixes * test(db): verify status is 'loading' while isLoadingSubset is true * fix(db): register loadingSubset listener before subscribing to avoid race condition The loadingSubset:change listener was registered after subscribeToAllCollections(), which could cause a race condition where the event fires before the listener is registered. This resulted in the live query never becoming ready. Also adds await in electric test to account for async subset loading. * fix(db): fix race condition in subscription status tracking Register the status:change listener BEFORE checking the current subscription status to avoid missing status transitions. Previously, if loadSubset completed very quickly, the status could change from 'loadingSubset' to 'ready' between checking the status and registering the listener, causing the tracked promise to never resolve and the live query to never become ready. * ci: apply automated fixes * fix(db): check source collections' isLoadingSubset instead of live query's The previous fix incorrectly checked isLoadingSubset on the live query collection itself, but the loadSubset/trackLoadPromise mechanism runs on SOURCE collections during on-demand sync, so the live query's isLoadingSubset was always false. This fix: - Adds anySourceCollectionLoadingSubset() to check if any source collection has isLoadingSubset=true - Listens for loadingSubset:change events on source collections instead of the live query collection * ci: apply automated fixes * fix(db): check live query collection's isLoadingSubset instead of source collections Reverts the change to check source collections' isLoadingSubset, which was causing test timeouts in query-db-collection tests. The live query collection's isLoadingSubset is correctly updated by CollectionSubscriber.trackLoadPromise() which tracks loading on the live query collection itself. Also updates changeset to accurately describe the fix. * ci: apply automated fixes * fix(db): fix race condition where status listener was registered after snapshot trigger The subscription's status:change listener was being registered AFTER the snapshot was triggered (via requestSnapshot/requestLimitedSnapshot). This meant that if the loadSubset promise resolved quickly (or synchronously), the status transition from 'loadingSubset' to 'ready' could be missed entirely. Changes: - Refactored subscribeToChanges() to split subscription creation from snapshot triggering - subscribeToMatchingChanges() and subscribeToOrderedChanges() now return both the subscription AND a triggerSnapshot function - The status listener is registered AFTER getting the subscription but BEFORE calling triggerSnapshot() - Added deferSnapshot option to subscribeChanges() to prevent automatic snapshot request - For non-ordered queries, continue using trackLoadSubsetPromise: false to maintain compatibility with query-db-collection's destroyed observer handling - Updated test for source collection isLoadingSubset independence - Added regression test for the race condition fix * ci: apply automated fixes * refactor: streamline on-demand suspense fix - Remove useLiveSuspenseQuery changes (not needed with core fix) - Remove artificial test that manually sets isLoadingSubset after ready - Update changeset to accurately describe the fix - Remove react-db changeset (no react-db source changes) The core fix (deferSnapshot + ready gating) is sufficient. The suspense hook doesn't need additional isLoadingSubset checks because the live query collection won't be marked ready while isLoadingSubset is true. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * ci: apply automated fixes * test: add sync/instant loadSubset resolution test Tests the tricky case where loadSubset returns Promise.resolve() immediately. This proves the race condition fix works even when the status transition happens synchronously, not just with delayed promises. Addresses reviewer feedback to strengthen test coverage. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor(db): replace deferSnapshot with onStatusChange option Simplify the subscription API by replacing the error-prone 3-step deferSnapshot pattern with a cleaner onStatusChange callback option. The listener is now registered internally BEFORE any snapshot is requested, guaranteeing no status transitions are missed regardless of how quickly loadSubset resolves. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * ci: apply automated fixes --------- Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 14d4cac commit c1247e8

File tree

8 files changed

+426
-50
lines changed

8 files changed

+426
-50
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
---
2+
'@tanstack/db': patch
3+
---
4+
5+
fix(db): prevent live query from being marked ready before subset data is loaded
6+
7+
In on-demand sync mode, the live query collection was being marked as `ready` before
8+
the subset data finished loading. This caused `useLiveQuery` to return `isReady=true`
9+
with empty data, and `useLiveSuspenseQuery` to release suspense prematurely.
10+
11+
The root cause was a race condition: the `status:change` listener in `CollectionSubscriber`
12+
was registered _after_ the snapshot was triggered. If `loadSubset` resolved quickly
13+
(or synchronously), the `loadingSubset` status transition would be missed entirely,
14+
so `trackLoadPromise` was never called on the live query collection.
15+
16+
Changes:
17+
18+
1. **Core fix - `onStatusChange` option**: Added `onStatusChange` callback option to
19+
`subscribeChanges()`. The listener is registered BEFORE any snapshot is requested,
20+
guaranteeing no status transitions are missed. This replaces the error-prone pattern
21+
of manually deferring snapshots and registering listeners in the correct order.
22+
23+
2. **Ready state gating**: `updateLiveQueryStatus()` now checks `isLoadingSubset` on the
24+
live query collection before marking it ready, and listens for `loadingSubset:change`
25+
to trigger the ready check when subset loading completes.

packages/db/src/collection/changes.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@ export class CollectionChangesManager<
127127
},
128128
})
129129

130+
// Register status listener BEFORE requesting snapshot to avoid race condition.
131+
// This ensures the listener catches all status transitions, even if the
132+
// loadSubset promise resolves synchronously or very quickly.
133+
if (options.onStatusChange) {
134+
subscription.on(`status:change`, options.onStatusChange)
135+
}
136+
130137
if (options.includeInitialState) {
131138
subscription.requestSnapshot({ trackLoadSubsetPromise: false })
132139
} else if (options.includeInitialState === false) {

packages/db/src/query/live/collection-config-builder.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,21 @@ export class CollectionConfigBuilder<
566566
},
567567
)
568568

569+
// Listen for loadingSubset changes on the live query collection BEFORE subscribing.
570+
// This ensures we don't miss the event if subset loading completes synchronously.
571+
// When isLoadingSubset becomes false, we may need to mark the collection as ready
572+
// (if all source collections are already ready but we were waiting for subset load to complete)
573+
const loadingSubsetUnsubscribe = config.collection.on(
574+
`loadingSubset:change`,
575+
(event) => {
576+
if (!event.isLoadingSubset) {
577+
// Subset loading finished, check if we can now mark ready
578+
this.updateLiveQueryStatus(config)
579+
}
580+
},
581+
)
582+
syncState.unsubscribeCallbacks.add(loadingSubsetUnsubscribe)
583+
569584
const loadSubsetDataCallbacks = this.subscribeToAllCollections(
570585
config,
571586
fullSyncState,
@@ -793,8 +808,14 @@ export class CollectionConfigBuilder<
793808
return
794809
}
795810

796-
// Mark ready when all source collections are ready
797-
if (this.allCollectionsReady()) {
811+
// Mark ready when all source collections are ready AND
812+
// the live query collection is not loading subset data.
813+
// This prevents marking the live query ready before its data is loaded
814+
// (fixes issue where useLiveQuery returns isReady=true with empty data)
815+
if (
816+
this.allCollectionsReady() &&
817+
!this.liveQueryCollection?.isLoadingSubset
818+
) {
798819
markReady()
799820
}
800821
}

packages/db/src/query/live/collection-subscriber.ts

Lines changed: 60 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import {
55
} from '../compiler/expressions.js'
66
import type { MultiSetArray, RootStreamBuilder } from '@tanstack/db-ivm'
77
import type { Collection } from '../../collection/index.js'
8-
import type { ChangeMessage } from '../../types.js'
8+
import type {
9+
ChangeMessage,
10+
SubscriptionStatusChangeEvent,
11+
} from '../../types.js'
912
import type { Context, GetResult } from '../builder/types.js'
1013
import type { BasicExpression } from '../ir.js'
1114
import type { OrderByOptimizationInfo } from '../compiler/order-by.js'
@@ -53,26 +56,10 @@ export class CollectionSubscriber<
5356
}
5457

5558
private subscribeToChanges(whereExpression?: BasicExpression<boolean>) {
56-
let subscription: CollectionSubscription
5759
const orderByInfo = this.getOrderByInfo()
58-
if (orderByInfo) {
59-
subscription = this.subscribeToOrderedChanges(
60-
whereExpression,
61-
orderByInfo,
62-
)
63-
} else {
64-
// If the source alias is lazy then we should not include the initial state
65-
const includeInitialState = !this.collectionConfigBuilder.isLazyAlias(
66-
this.alias,
67-
)
68-
69-
subscription = this.subscribeToMatchingChanges(
70-
whereExpression,
71-
includeInitialState,
72-
)
73-
}
7460

75-
const trackLoadPromise = () => {
61+
// Track load promises using subscription from the event (avoids circular dependency)
62+
const trackLoadPromise = (subscription: CollectionSubscription) => {
7663
// Guard against duplicate transitions
7764
if (!this.subscriptionLoadingPromises.has(subscription)) {
7865
let resolve: () => void
@@ -89,16 +76,12 @@ export class CollectionSubscriber<
8976
}
9077
}
9178

92-
// It can be that we are not yet subscribed when the first `loadSubset` call happens (i.e. the initial query).
93-
// So we also check the status here and if it's `loadingSubset` then we track the load promise
94-
if (subscription.status === `loadingSubset`) {
95-
trackLoadPromise()
96-
}
97-
98-
// Subscribe to subscription status changes to propagate loading state
99-
const statusUnsubscribe = subscription.on(`status:change`, (event) => {
79+
// Status change handler - passed to subscribeChanges so it's registered
80+
// BEFORE any snapshot is requested, preventing race conditions
81+
const onStatusChange = (event: SubscriptionStatusChangeEvent) => {
82+
const subscription = event.subscription as CollectionSubscription
10083
if (event.status === `loadingSubset`) {
101-
trackLoadPromise()
84+
trackLoadPromise(subscription)
10285
} else {
10386
// status is 'ready'
10487
const deferred = this.subscriptionLoadingPromises.get(subscription)
@@ -108,7 +91,34 @@ export class CollectionSubscriber<
10891
deferred.resolve()
10992
}
11093
}
111-
})
94+
}
95+
96+
// Create subscription with onStatusChange - listener is registered before any async work
97+
let subscription: CollectionSubscription
98+
if (orderByInfo) {
99+
subscription = this.subscribeToOrderedChanges(
100+
whereExpression,
101+
orderByInfo,
102+
onStatusChange,
103+
)
104+
} else {
105+
// If the source alias is lazy then we should not include the initial state
106+
const includeInitialState = !this.collectionConfigBuilder.isLazyAlias(
107+
this.alias,
108+
)
109+
110+
subscription = this.subscribeToMatchingChanges(
111+
whereExpression,
112+
includeInitialState,
113+
onStatusChange,
114+
)
115+
}
116+
117+
// Check current status after subscribing - if status is 'loadingSubset', track it.
118+
// The onStatusChange listener will catch the transition to 'ready'.
119+
if (subscription.status === `loadingSubset`) {
120+
trackLoadPromise(subscription)
121+
}
112122

113123
const unsubscribe = () => {
114124
// If subscription has a pending promise, resolve it before unsubscribing
@@ -119,7 +129,6 @@ export class CollectionSubscriber<
119129
deferred.resolve()
120130
}
121131

122-
statusUnsubscribe()
123132
subscription.unsubscribe()
124133
}
125134
// currentSyncState is always defined when subscribe() is called
@@ -179,22 +188,22 @@ export class CollectionSubscriber<
179188

180189
private subscribeToMatchingChanges(
181190
whereExpression: BasicExpression<boolean> | undefined,
182-
includeInitialState: boolean = false,
183-
) {
191+
includeInitialState: boolean,
192+
onStatusChange: (event: SubscriptionStatusChangeEvent) => void,
193+
): CollectionSubscription {
184194
const sendChanges = (
185195
changes: Array<ChangeMessage<any, string | number>>,
186196
) => {
187197
this.sendChangesToPipeline(changes)
188198
}
189199

190-
// Only pass includeInitialState when true. When it's false, we leave it
191-
// undefined so that user subscriptions with explicit `includeInitialState: false`
192-
// can be distinguished from internal lazy-loading subscriptions.
193-
// If we pass `false`, changes.ts would call markAllStateAsSeen() which
194-
// disables filtering - but internal subscriptions still need filtering.
200+
// Create subscription with onStatusChange - listener is registered before snapshot
201+
// Note: For non-ordered queries (no limit/offset), we use trackLoadSubsetPromise: false
202+
// which is the default behavior in subscribeChanges
195203
const subscription = this.collection.subscribeChanges(sendChanges, {
196204
...(includeInitialState && { includeInitialState }),
197205
whereExpression,
206+
onStatusChange,
198207
})
199208

200209
return subscription
@@ -203,22 +212,31 @@ export class CollectionSubscriber<
203212
private subscribeToOrderedChanges(
204213
whereExpression: BasicExpression<boolean> | undefined,
205214
orderByInfo: OrderByOptimizationInfo,
206-
) {
215+
onStatusChange: (event: SubscriptionStatusChangeEvent) => void,
216+
): CollectionSubscription {
207217
const { orderBy, offset, limit, index } = orderByInfo
208218

219+
// Use a holder to forward-reference subscription in the callback
220+
const subscriptionHolder: { current?: CollectionSubscription } = {}
221+
209222
const sendChangesInRange = (
210223
changes: Iterable<ChangeMessage<any, string | number>>,
211224
) => {
212225
// Split live updates into a delete of the old value and an insert of the new value
213226
const splittedChanges = splitUpdates(changes)
214-
this.sendChangesToPipelineWithTracking(splittedChanges, subscription)
227+
this.sendChangesToPipelineWithTracking(
228+
splittedChanges,
229+
subscriptionHolder.current!,
230+
)
215231
}
216232

217-
// Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far
218-
// values that are bigger don't need to be sent because they can't affect the topK
233+
// Subscribe to changes with onStatusChange - listener is registered before any snapshot
234+
// values bigger than what we've sent don't need to be sent because they can't affect the topK
219235
const subscription = this.collection.subscribeChanges(sendChangesInRange, {
220236
whereExpression,
237+
onStatusChange,
221238
})
239+
subscriptionHolder.current = subscription
222240

223241
// Listen for truncate events to reset cursor tracking state and sentToD2Keys
224242
// This ensures that after a must-refetch/truncate, we don't use stale cursor data
@@ -236,6 +254,7 @@ export class CollectionSubscriber<
236254
// Normalize the orderBy clauses such that the references are relative to the collection
237255
const normalizedOrderBy = normalizeOrderByPaths(orderBy, this.alias)
238256

257+
// Trigger the snapshot request - onStatusChange listener is already registered
239258
if (index) {
240259
// We have an index on the first orderBy column - use lazy loading optimization
241260
// This works for both single-column and multi-column orderBy:

packages/db/src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,12 @@ export interface SubscribeChangesOptions<
798798
where?: (row: SingleRowRefProxy<T>) => any
799799
/** Pre-compiled expression for filtering changes */
800800
whereExpression?: BasicExpression<boolean>
801+
/**
802+
* Listener for subscription status changes.
803+
* Registered BEFORE any snapshot is requested, ensuring no status transitions are missed.
804+
* @internal
805+
*/
806+
onStatusChange?: (event: SubscriptionStatusChangeEvent) => void
801807
}
802808

803809
export interface SubscribeChangesSnapshotOptions<

0 commit comments

Comments
 (0)