Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/fix-live-query-join-duplicates.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@tanstack/db': patch
---

Fix `isReady()` returning `true` while `toArray()` returns empty results. The status now correctly waits until data has been processed through the graph before marking ready.

Also fix duplicate key errors when live queries use joins with custom `getKey` functions. D2's incremental join can produce multiple outputs for the same key during a single graph run; this change batches all outputs into a single transaction to prevent conflicts.
4 changes: 2 additions & 2 deletions packages/db/src/collection/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ export class CollectionSyncManager<
!isTruncateTransaction
) {
const existingValue = this.state.syncedData.get(key)
if (
const valuesEqual =
existingValue !== undefined &&
deepEquals(existingValue, messageWithOptionalKey.value)
) {
if (valuesEqual) {
// The "insert" is an echo of a value we already have locally.
// Treat it as an update so we preserve optimistic intent without
// throwing a duplicate-key error during reconciliation.
Expand Down
107 changes: 63 additions & 44 deletions packages/db/src/query/live/collection-config-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ export class CollectionConfigBuilder<
if (syncState.subscribedToAllCollections) {
while (syncState.graph.pendingWork()) {
syncState.graph.run()
// Flush accumulated changes after each graph step to commit them as one transaction.
// This ensures intermediate join states (like null on one side) don't cause
// duplicate key errors when the full join result arrives in the same step.
syncState.flushPendingChanges?.()
callback?.()
}

Expand All @@ -345,10 +349,14 @@ export class CollectionConfigBuilder<
if (syncState.messagesCount === 0) {
begin()
commit()
// After initial commit, check if we should mark ready
// (in case all sources were already ready before we subscribed)
this.updateLiveQueryStatus(this.currentSyncConfig)
}

// After graph processing completes, check if we should mark ready.
// This is the canonical place to transition to ready state because:
// 1. All data has been processed through the graph
// 2. All source collections have had a chance to send their initial data
// This prevents marking ready before data is processed (fixes isReady=true with empty data)
this.updateLiveQueryStatus(this.currentSyncConfig)
}
} finally {
this.isGraphRunning = false
Expand Down Expand Up @@ -687,22 +695,56 @@ export class CollectionConfigBuilder<
const { begin, commit } = config
const { graph, inputs, pipeline } = this.maybeCompileBasePipeline()

// Accumulator for changes across all output callbacks within a single graph run.
// This allows us to batch all changes from intermediate join states into a single
// transaction, avoiding duplicate key errors when joins produce multiple outputs
// for the same key (e.g., first output with null, then output with joined data).
let pendingChanges: Map<unknown, Changes<TResult>> = new Map()

pipeline.pipe(
output((data) => {
const messages = data.getInner()
syncState.messagesCount += messages.length

begin()
messages
.reduce(
accumulateChanges<TResult>,
new Map<unknown, Changes<TResult>>(),
)
.forEach(this.applyChanges.bind(this, config))
commit()
// Accumulate changes from this output callback into the pending changes map.
// Changes for the same key are merged (inserts/deletes are added together).
for (const [[key, tupleData], multiplicity] of messages as Array<
[[unknown, [TResult, string | undefined]], number]
>) {
const [value, orderByIndex] = tupleData
const existing = pendingChanges.get(key) || {
deletes: 0,
inserts: 0,
value,
orderByIndex,
}
if (multiplicity < 0) {
existing.deletes += Math.abs(multiplicity)
} else if (multiplicity > 0) {
existing.inserts += multiplicity
// Update value to the latest version for this key
existing.value = value
if (orderByIndex !== undefined) {
existing.orderByIndex = orderByIndex
}
}
pendingChanges.set(key, existing)
}
}),
)

// Flush pending changes and reset the accumulator.
// Called at the end of each graph run to commit all accumulated changes.
syncState.flushPendingChanges = () => {
if (pendingChanges.size === 0) {
return
}
begin()
pendingChanges.forEach(this.applyChanges.bind(this, config))
commit()
pendingChanges = new Map()
}

graph.finalize()

// Extend the sync state with the graph, inputs, and pipeline
Expand Down Expand Up @@ -808,11 +850,14 @@ export class CollectionConfigBuilder<
return
}

// Mark ready when all source collections are ready AND
// the live query collection is not loading subset data.
// This prevents marking the live query ready before its data is loaded
// Mark ready when:
// 1. All subscriptions are set up (subscribedToAllCollections)
// 2. All source collections are ready
// 3. The live query collection is not loading subset data
// This prevents marking the live query ready before its data is processed
// (fixes issue where useLiveQuery returns isReady=true with empty data)
if (
this.currentSyncState?.subscribedToAllCollections &&
this.allCollectionsReady() &&
!this.liveQueryCollection?.isLoadingSubset
) {
Expand Down Expand Up @@ -913,8 +958,10 @@ export class CollectionConfigBuilder<
// (graph only runs when all collections are subscribed)
syncState.subscribedToAllCollections = true

// Initial status check after all subscriptions are set up
this.updateLiveQueryStatus(config)
// Note: We intentionally don't call updateLiveQueryStatus() here.
// The graph hasn't run yet, so marking ready would be premature.
// The canonical place to mark ready is after the graph processes data
// in maybeRunGraph(), which ensures data has been processed first.

return loadSubsetDataCallbacks
}
Expand Down Expand Up @@ -1074,31 +1121,3 @@ function extractCollectionAliases(query: QueryIR): Map<string, Set<string>> {

return aliasesById
}

function accumulateChanges<T>(
acc: Map<unknown, Changes<T>>,
[[key, tupleData], multiplicity]: [
[unknown, [any, string | undefined]],
number,
],
) {
// All queries now consistently return [value, orderByIndex] format
// where orderByIndex is undefined for queries without ORDER BY
const [value, orderByIndex] = tupleData as [T, string | undefined]

const changes = acc.get(key) || {
deletes: 0,
inserts: 0,
value,
orderByIndex,
}
if (multiplicity < 0) {
changes.deletes += Math.abs(multiplicity)
} else if (multiplicity > 0) {
changes.inserts += multiplicity
changes.value = value
changes.orderByIndex = orderByIndex
}
acc.set(key, changes)
return acc
}
4 changes: 3 additions & 1 deletion packages/db/src/query/live/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ export type SyncState = {
graph?: D2
inputs?: Record<string, RootStreamBuilder<unknown>>
pipeline?: ResultStream
flushPendingChanges?: () => void
}

export type FullSyncState = Required<SyncState>
export type FullSyncState = Required<Omit<SyncState, `flushPendingChanges`>> &
Pick<SyncState, `flushPendingChanges`>

/**
* Configuration interface for live query collection options
Expand Down
14 changes: 10 additions & 4 deletions packages/db/tests/query/live-query-collection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2293,10 +2293,11 @@ describe(`createLiveQueryCollection`, () => {
.limit(10),
)

// Trigger sync which will call loadSubset
await liveQueryCollection.preload()
// Start preload (don't await yet - it won't resolve until loadSubset completes)
const preloadPromise = liveQueryCollection.preload()
await flushPromises()

// Verify loadSubset was called with the correct options
expect(capturedOptions.length).toBeGreaterThan(0)

// Find the call that has orderBy (the limited snapshot request)
Expand All @@ -2308,8 +2309,10 @@ describe(`createLiveQueryCollection`, () => {
expect(callWithOrderBy?.orderBy?.[0]?.expression.type).toBe(`ref`)
expect(callWithOrderBy?.limit).toBe(10)

// Resolve the loadSubset promise so preload can complete
resolveLoadSubset!()
await flushPromises()
await preloadPromise
})

it(`passes multiple orderBy columns to loadSubset when using limit`, async () => {
Expand Down Expand Up @@ -2350,10 +2353,11 @@ describe(`createLiveQueryCollection`, () => {
.limit(10),
)

// Trigger sync which will call loadSubset
await liveQueryCollection.preload()
// Start preload (don't await yet - it won't resolve until loadSubset completes)
const preloadPromise = liveQueryCollection.preload()
await flushPromises()

// Verify loadSubset was called with the correct options
expect(capturedOptions.length).toBeGreaterThan(0)

// Find the call that has orderBy with multiple columns
Expand All @@ -2369,8 +2373,10 @@ describe(`createLiveQueryCollection`, () => {
expect(callWithMultiOrderBy?.orderBy?.[1]?.expression.type).toBe(`ref`)
expect(callWithMultiOrderBy?.limit).toBe(10)

// Resolve the loadSubset promise so preload can complete
resolveLoadSubset!()
await flushPromises()
await preloadPromise
})
})
})
Loading