diff --git a/.changeset/fix-live-query-join-duplicates.md b/.changeset/fix-live-query-join-duplicates.md new file mode 100644 index 000000000..77a95d163 --- /dev/null +++ b/.changeset/fix-live-query-join-duplicates.md @@ -0,0 +1,7 @@ +--- +'@tanstack/db': patch +--- + +Fix `isReady()` returning `true` while `toArray()` returns empty results. The status now correctly waits until data has been processed through the graph before marking ready. + +Also fix duplicate key errors when live queries use joins with custom `getKey` functions. D2's incremental join can produce multiple outputs for the same key during a single graph run; this change batches all outputs into a single transaction to prevent conflicts. diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 841e76c1f..376ab58df 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -134,10 +134,10 @@ export class CollectionSyncManager< !isTruncateTransaction ) { const existingValue = this.state.syncedData.get(key) - if ( + const valuesEqual = existingValue !== undefined && deepEquals(existingValue, messageWithOptionalKey.value) - ) { + if (valuesEqual) { // The "insert" is an echo of a value we already have locally. // Treat it as an update so we preserve optimistic intent without // throwing a duplicate-key error during reconciliation. diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 59efff818..4309ad829 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -337,6 +337,10 @@ export class CollectionConfigBuilder< if (syncState.subscribedToAllCollections) { while (syncState.graph.pendingWork()) { syncState.graph.run() + // Flush accumulated changes after each graph step to commit them as one transaction. + // This ensures intermediate join states (like null on one side) don't cause + // duplicate key errors when the full join result arrives in the same step. + syncState.flushPendingChanges?.() callback?.() } @@ -345,10 +349,14 @@ export class CollectionConfigBuilder< if (syncState.messagesCount === 0) { begin() commit() - // After initial commit, check if we should mark ready - // (in case all sources were already ready before we subscribed) - this.updateLiveQueryStatus(this.currentSyncConfig) } + + // After graph processing completes, check if we should mark ready. + // This is the canonical place to transition to ready state because: + // 1. All data has been processed through the graph + // 2. All source collections have had a chance to send their initial data + // This prevents marking ready before data is processed (fixes isReady=true with empty data) + this.updateLiveQueryStatus(this.currentSyncConfig) } } finally { this.isGraphRunning = false @@ -687,22 +695,56 @@ export class CollectionConfigBuilder< const { begin, commit } = config const { graph, inputs, pipeline } = this.maybeCompileBasePipeline() + // Accumulator for changes across all output callbacks within a single graph run. + // This allows us to batch all changes from intermediate join states into a single + // transaction, avoiding duplicate key errors when joins produce multiple outputs + // for the same key (e.g., first output with null, then output with joined data). + let pendingChanges: Map> = new Map() + pipeline.pipe( output((data) => { const messages = data.getInner() syncState.messagesCount += messages.length - begin() - messages - .reduce( - accumulateChanges, - new Map>(), - ) - .forEach(this.applyChanges.bind(this, config)) - commit() + // Accumulate changes from this output callback into the pending changes map. + // Changes for the same key are merged (inserts/deletes are added together). + for (const [[key, tupleData], multiplicity] of messages as Array< + [[unknown, [TResult, string | undefined]], number] + >) { + const [value, orderByIndex] = tupleData + const existing = pendingChanges.get(key) || { + deletes: 0, + inserts: 0, + value, + orderByIndex, + } + if (multiplicity < 0) { + existing.deletes += Math.abs(multiplicity) + } else if (multiplicity > 0) { + existing.inserts += multiplicity + // Update value to the latest version for this key + existing.value = value + if (orderByIndex !== undefined) { + existing.orderByIndex = orderByIndex + } + } + pendingChanges.set(key, existing) + } }), ) + // Flush pending changes and reset the accumulator. + // Called at the end of each graph run to commit all accumulated changes. + syncState.flushPendingChanges = () => { + if (pendingChanges.size === 0) { + return + } + begin() + pendingChanges.forEach(this.applyChanges.bind(this, config)) + commit() + pendingChanges = new Map() + } + graph.finalize() // Extend the sync state with the graph, inputs, and pipeline @@ -808,11 +850,14 @@ export class CollectionConfigBuilder< return } - // Mark ready when all source collections are ready AND - // the live query collection is not loading subset data. - // This prevents marking the live query ready before its data is loaded + // Mark ready when: + // 1. All subscriptions are set up (subscribedToAllCollections) + // 2. All source collections are ready + // 3. The live query collection is not loading subset data + // This prevents marking the live query ready before its data is processed // (fixes issue where useLiveQuery returns isReady=true with empty data) if ( + this.currentSyncState?.subscribedToAllCollections && this.allCollectionsReady() && !this.liveQueryCollection?.isLoadingSubset ) { @@ -913,8 +958,10 @@ export class CollectionConfigBuilder< // (graph only runs when all collections are subscribed) syncState.subscribedToAllCollections = true - // Initial status check after all subscriptions are set up - this.updateLiveQueryStatus(config) + // Note: We intentionally don't call updateLiveQueryStatus() here. + // The graph hasn't run yet, so marking ready would be premature. + // The canonical place to mark ready is after the graph processes data + // in maybeRunGraph(), which ensures data has been processed first. return loadSubsetDataCallbacks } @@ -1074,31 +1121,3 @@ function extractCollectionAliases(query: QueryIR): Map> { return aliasesById } - -function accumulateChanges( - acc: Map>, - [[key, tupleData], multiplicity]: [ - [unknown, [any, string | undefined]], - number, - ], -) { - // All queries now consistently return [value, orderByIndex] format - // where orderByIndex is undefined for queries without ORDER BY - const [value, orderByIndex] = tupleData as [T, string | undefined] - - const changes = acc.get(key) || { - deletes: 0, - inserts: 0, - value, - orderByIndex, - } - if (multiplicity < 0) { - changes.deletes += Math.abs(multiplicity) - } else if (multiplicity > 0) { - changes.inserts += multiplicity - changes.value = value - changes.orderByIndex = orderByIndex - } - acc.set(key, changes) - return acc -} diff --git a/packages/db/src/query/live/types.ts b/packages/db/src/query/live/types.ts index 64a51d7ab..04d0b389f 100644 --- a/packages/db/src/query/live/types.ts +++ b/packages/db/src/query/live/types.ts @@ -22,9 +22,11 @@ export type SyncState = { graph?: D2 inputs?: Record> pipeline?: ResultStream + flushPendingChanges?: () => void } -export type FullSyncState = Required +export type FullSyncState = Required> & + Pick /** * Configuration interface for live query collection options diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 85866945c..58ed050ea 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -2293,10 +2293,11 @@ describe(`createLiveQueryCollection`, () => { .limit(10), ) - // Trigger sync which will call loadSubset - await liveQueryCollection.preload() + // Start preload (don't await yet - it won't resolve until loadSubset completes) + const preloadPromise = liveQueryCollection.preload() await flushPromises() + // Verify loadSubset was called with the correct options expect(capturedOptions.length).toBeGreaterThan(0) // Find the call that has orderBy (the limited snapshot request) @@ -2308,8 +2309,10 @@ describe(`createLiveQueryCollection`, () => { expect(callWithOrderBy?.orderBy?.[0]?.expression.type).toBe(`ref`) expect(callWithOrderBy?.limit).toBe(10) + // Resolve the loadSubset promise so preload can complete resolveLoadSubset!() await flushPromises() + await preloadPromise }) it(`passes multiple orderBy columns to loadSubset when using limit`, async () => { @@ -2350,10 +2353,11 @@ describe(`createLiveQueryCollection`, () => { .limit(10), ) - // Trigger sync which will call loadSubset - await liveQueryCollection.preload() + // Start preload (don't await yet - it won't resolve until loadSubset completes) + const preloadPromise = liveQueryCollection.preload() await flushPromises() + // Verify loadSubset was called with the correct options expect(capturedOptions.length).toBeGreaterThan(0) // Find the call that has orderBy with multiple columns @@ -2369,8 +2373,10 @@ describe(`createLiveQueryCollection`, () => { expect(callWithMultiOrderBy?.orderBy?.[1]?.expression.type).toBe(`ref`) expect(callWithMultiOrderBy?.limit).toBe(10) + // Resolve the loadSubset promise so preload can complete resolveLoadSubset!() await flushPromises() + await preloadPromise }) }) })