Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b3deae3
fix(react-db): prevent suspense from releasing before data is loaded …
claude Dec 30, 2025
b9e8eeb
test(react-db): add test for isLoadingSubset suspense behavior
claude Dec 30, 2025
4ac4e03
ci: apply automated fixes
autofix-ci[bot] Dec 30, 2025
e630acd
fix(db): prevent live query from being marked ready before subset dat…
claude Dec 30, 2025
c9b928a
ci: apply automated fixes
autofix-ci[bot] Dec 30, 2025
e66bc50
test(db): verify status is 'loading' while isLoadingSubset is true
claude Dec 30, 2025
cbba6e4
fix(db): register loadingSubset listener before subscribing to avoid …
claude Dec 31, 2025
6b09e01
fix(db): fix race condition in subscription status tracking
claude Dec 31, 2025
732acd3
ci: apply automated fixes
autofix-ci[bot] Dec 31, 2025
67acfa6
fix(db): check source collections' isLoadingSubset instead of live qu…
claude Dec 31, 2025
6c0975c
ci: apply automated fixes
autofix-ci[bot] Dec 31, 2025
6b03b56
fix(db): check live query collection's isLoadingSubset instead of sou…
claude Dec 31, 2025
d68d585
ci: apply automated fixes
autofix-ci[bot] Dec 31, 2025
cc9a8a4
fix(db): fix race condition where status listener was registered afte…
claude Dec 31, 2025
2ae80a2
ci: apply automated fixes
autofix-ci[bot] Dec 31, 2025
84b6569
refactor: streamline on-demand suspense fix
KyleAMathews Jan 7, 2026
cdd706d
ci: apply automated fixes
autofix-ci[bot] Jan 7, 2026
e5c0f6b
test: add sync/instant loadSubset resolution test
KyleAMathews Jan 7, 2026
62f0bd8
refactor(db): replace deferSnapshot with onStatusChange option
KyleAMathews Jan 8, 2026
5c3ab10
ci: apply automated fixes
autofix-ci[bot] Jan 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .changeset/fix-on-demand-ready-timing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
'@tanstack/db': patch
---

fix(db): prevent live query from being marked ready before subset data is loaded

In on-demand sync mode, the live query collection was being marked as `ready` before
the subset data finished loading. This caused `useLiveQuery` to return `isReady=true`
with empty data, and `useLiveSuspenseQuery` to release suspense prematurely.

The root cause was a race condition: the `status:change` listener in `CollectionSubscriber`
was registered _after_ the snapshot was triggered. If `loadSubset` resolved quickly
(or synchronously), the `loadingSubset` status transition would be missed entirely,
so `trackLoadPromise` was never called on the live query collection.

Changes:

1. **Core fix - `onStatusChange` option**: Added `onStatusChange` callback option to
`subscribeChanges()`. The listener is registered BEFORE any snapshot is requested,
guaranteeing no status transitions are missed. This replaces the error-prone pattern
of manually deferring snapshots and registering listeners in the correct order.

2. **Ready state gating**: `updateLiveQueryStatus()` now checks `isLoadingSubset` on the
live query collection before marking it ready, and listens for `loadingSubset:change`
to trigger the ready check when subset loading completes.
7 changes: 7 additions & 0 deletions packages/db/src/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ export class CollectionChangesManager<
},
})

// Register status listener BEFORE requesting snapshot to avoid race condition.
// This ensures the listener catches all status transitions, even if the
// loadSubset promise resolves synchronously or very quickly.
if (options.onStatusChange) {
subscription.on(`status:change`, options.onStatusChange)
}

if (options.includeInitialState) {
subscription.requestSnapshot({ trackLoadSubsetPromise: false })
} else if (options.includeInitialState === false) {
Expand Down
25 changes: 23 additions & 2 deletions packages/db/src/query/live/collection-config-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,21 @@ export class CollectionConfigBuilder<
},
)

// Listen for loadingSubset changes on the live query collection BEFORE subscribing.
// This ensures we don't miss the event if subset loading completes synchronously.
// When isLoadingSubset becomes false, we may need to mark the collection as ready
// (if all source collections are already ready but we were waiting for subset load to complete)
const loadingSubsetUnsubscribe = config.collection.on(
`loadingSubset:change`,
(event) => {
if (!event.isLoadingSubset) {
// Subset loading finished, check if we can now mark ready
this.updateLiveQueryStatus(config)
}
},
)
syncState.unsubscribeCallbacks.add(loadingSubsetUnsubscribe)

const loadSubsetDataCallbacks = this.subscribeToAllCollections(
config,
fullSyncState,
Expand Down Expand Up @@ -793,8 +808,14 @@ export class CollectionConfigBuilder<
return
}

// Mark ready when all source collections are ready
if (this.allCollectionsReady()) {
// Mark ready when all source collections are ready AND
// the live query collection is not loading subset data.
// This prevents marking the live query ready before its data is loaded
// (fixes issue where useLiveQuery returns isReady=true with empty data)
if (
this.allCollectionsReady() &&
!this.liveQueryCollection?.isLoadingSubset
) {
markReady()
}
}
Expand Down
101 changes: 60 additions & 41 deletions packages/db/src/query/live/collection-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import {
} from '../compiler/expressions.js'
import type { MultiSetArray, RootStreamBuilder } from '@tanstack/db-ivm'
import type { Collection } from '../../collection/index.js'
import type { ChangeMessage } from '../../types.js'
import type {
ChangeMessage,
SubscriptionStatusChangeEvent,
} from '../../types.js'
import type { Context, GetResult } from '../builder/types.js'
import type { BasicExpression } from '../ir.js'
import type { OrderByOptimizationInfo } from '../compiler/order-by.js'
Expand Down Expand Up @@ -53,26 +56,10 @@ export class CollectionSubscriber<
}

private subscribeToChanges(whereExpression?: BasicExpression<boolean>) {
let subscription: CollectionSubscription
const orderByInfo = this.getOrderByInfo()
if (orderByInfo) {
subscription = this.subscribeToOrderedChanges(
whereExpression,
orderByInfo,
)
} else {
// If the source alias is lazy then we should not include the initial state
const includeInitialState = !this.collectionConfigBuilder.isLazyAlias(
this.alias,
)

subscription = this.subscribeToMatchingChanges(
whereExpression,
includeInitialState,
)
}

const trackLoadPromise = () => {
// Track load promises using subscription from the event (avoids circular dependency)
const trackLoadPromise = (subscription: CollectionSubscription) => {
// Guard against duplicate transitions
if (!this.subscriptionLoadingPromises.has(subscription)) {
let resolve: () => void
Expand All @@ -89,16 +76,12 @@ export class CollectionSubscriber<
}
}

// It can be that we are not yet subscribed when the first `loadSubset` call happens (i.e. the initial query).
// So we also check the status here and if it's `loadingSubset` then we track the load promise
if (subscription.status === `loadingSubset`) {
trackLoadPromise()
}

// Subscribe to subscription status changes to propagate loading state
const statusUnsubscribe = subscription.on(`status:change`, (event) => {
// Status change handler - passed to subscribeChanges so it's registered
// BEFORE any snapshot is requested, preventing race conditions
const onStatusChange = (event: SubscriptionStatusChangeEvent) => {
const subscription = event.subscription as CollectionSubscription
if (event.status === `loadingSubset`) {
trackLoadPromise()
trackLoadPromise(subscription)
} else {
// status is 'ready'
const deferred = this.subscriptionLoadingPromises.get(subscription)
Expand All @@ -108,7 +91,34 @@ export class CollectionSubscriber<
deferred.resolve()
}
}
})
}

// Create subscription with onStatusChange - listener is registered before any async work
let subscription: CollectionSubscription
if (orderByInfo) {
subscription = this.subscribeToOrderedChanges(
whereExpression,
orderByInfo,
onStatusChange,
)
} else {
// If the source alias is lazy then we should not include the initial state
const includeInitialState = !this.collectionConfigBuilder.isLazyAlias(
this.alias,
)

subscription = this.subscribeToMatchingChanges(
whereExpression,
includeInitialState,
onStatusChange,
)
}

// Check current status after subscribing - if status is 'loadingSubset', track it.
// The onStatusChange listener will catch the transition to 'ready'.
if (subscription.status === `loadingSubset`) {
trackLoadPromise(subscription)
}

const unsubscribe = () => {
// If subscription has a pending promise, resolve it before unsubscribing
Expand All @@ -119,7 +129,6 @@ export class CollectionSubscriber<
deferred.resolve()
}

statusUnsubscribe()
subscription.unsubscribe()
}
// currentSyncState is always defined when subscribe() is called
Expand Down Expand Up @@ -179,22 +188,22 @@ export class CollectionSubscriber<

private subscribeToMatchingChanges(
whereExpression: BasicExpression<boolean> | undefined,
includeInitialState: boolean = false,
) {
includeInitialState: boolean,
onStatusChange: (event: SubscriptionStatusChangeEvent) => void,
): CollectionSubscription {
const sendChanges = (
changes: Array<ChangeMessage<any, string | number>>,
) => {
this.sendChangesToPipeline(changes)
}

// Only pass includeInitialState when true. When it's false, we leave it
// undefined so that user subscriptions with explicit `includeInitialState: false`
// can be distinguished from internal lazy-loading subscriptions.
// If we pass `false`, changes.ts would call markAllStateAsSeen() which
// disables filtering - but internal subscriptions still need filtering.
// Create subscription with onStatusChange - listener is registered before snapshot
// Note: For non-ordered queries (no limit/offset), we use trackLoadSubsetPromise: false
// which is the default behavior in subscribeChanges
const subscription = this.collection.subscribeChanges(sendChanges, {
...(includeInitialState && { includeInitialState }),
whereExpression,
onStatusChange,
})

return subscription
Expand All @@ -203,22 +212,31 @@ export class CollectionSubscriber<
private subscribeToOrderedChanges(
whereExpression: BasicExpression<boolean> | undefined,
orderByInfo: OrderByOptimizationInfo,
) {
onStatusChange: (event: SubscriptionStatusChangeEvent) => void,
): CollectionSubscription {
const { orderBy, offset, limit, index } = orderByInfo

// Use a holder to forward-reference subscription in the callback
const subscriptionHolder: { current?: CollectionSubscription } = {}

const sendChangesInRange = (
changes: Iterable<ChangeMessage<any, string | number>>,
) => {
// Split live updates into a delete of the old value and an insert of the new value
const splittedChanges = splitUpdates(changes)
this.sendChangesToPipelineWithTracking(splittedChanges, subscription)
this.sendChangesToPipelineWithTracking(
splittedChanges,
subscriptionHolder.current!,
)
}

// Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far
// values that are bigger don't need to be sent because they can't affect the topK
// Subscribe to changes with onStatusChange - listener is registered before any snapshot
// values bigger than what we've sent don't need to be sent because they can't affect the topK
const subscription = this.collection.subscribeChanges(sendChangesInRange, {
whereExpression,
onStatusChange,
})
subscriptionHolder.current = subscription

// Listen for truncate events to reset cursor tracking state and sentToD2Keys
// This ensures that after a must-refetch/truncate, we don't use stale cursor data
Expand All @@ -236,6 +254,7 @@ export class CollectionSubscriber<
// Normalize the orderBy clauses such that the references are relative to the collection
const normalizedOrderBy = normalizeOrderByPaths(orderBy, this.alias)

// Trigger the snapshot request - onStatusChange listener is already registered
if (index) {
// We have an index on the first orderBy column - use lazy loading optimization
// This works for both single-column and multi-column orderBy:
Expand Down
6 changes: 6 additions & 0 deletions packages/db/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,12 @@ export interface SubscribeChangesOptions {
includeInitialState?: boolean
/** Pre-compiled expression for filtering changes */
whereExpression?: BasicExpression<boolean>
/**
* Listener for subscription status changes.
* Registered BEFORE any snapshot is requested, ensuring no status transitions are missed.
* @internal
*/
onStatusChange?: (event: SubscriptionStatusChangeEvent) => void
}

export interface SubscribeChangesSnapshotOptions extends Omit<
Expand Down
Loading
Loading