From b810516eac937870689fd1092725354334bc61f8 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 23 Sep 2025 10:32:47 +0200 Subject: [PATCH 01/11] Add onLoadMore callback and call it from the requestSnapshot methods --- packages/db/src/collection/sync.ts | 71 ++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 1a26038d..da9ac7f0 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -11,8 +11,13 @@ import { deepEquals } from "../utils" import type { StandardSchemaV1 } from "@standard-schema/spec" import type { ChangeMessage, +<<<<<<< HEAD CleanupFn, CollectionConfig, +======= + CollectionConfig, + CleanupFn, +>>>>>>> 684c70ec (Add onLoadMore callback and call it from the requestSnapshot methods) OnLoadMoreOptions, SyncConfigRes, } from "../types" @@ -94,6 +99,7 @@ export class CollectionSyncManager< } const key = this.config.getKey(messageWithoutKey.value) +<<<<<<< HEAD let messageType = messageWithoutKey.type // Check if an item with this key already exists when inserting @@ -171,6 +177,71 @@ export class CollectionSyncManager< throw new SyncTransactionAlreadyCommittedWriteError() } +======= + // Check if an item with this key already exists when inserting + if (messageWithoutKey.type === `insert`) { + const insertingIntoExistingSynced = this.state.syncedData.has(key) + const hasPendingDeleteForKey = + pendingTransaction.deletedKeys.has(key) + const isTruncateTransaction = pendingTransaction.truncate === true + // Allow insert after truncate in the same transaction even if it existed in syncedData + if ( + insertingIntoExistingSynced && + !hasPendingDeleteForKey && + !isTruncateTransaction + ) { + throw new DuplicateKeySyncError(key, this.id) + } + } + + const message: ChangeMessage = { + ...messageWithoutKey, + key, + } + pendingTransaction.operations.push(message) + + if (messageWithoutKey.type === `delete`) { + pendingTransaction.deletedKeys.add(key) + } + }, + commit: () => { + const pendingTransaction = + this.state.pendingSyncedTransactions[ + this.state.pendingSyncedTransactions.length - 1 + ] + if (!pendingTransaction) { + throw new NoPendingSyncTransactionCommitError() + } + if (pendingTransaction.committed) { + throw new SyncTransactionAlreadyCommittedError() + } + + pendingTransaction.committed = true + + // Update status to initialCommit when transitioning from loading + // This indicates we're in the process of committing the first transaction + if (this.lifecycle.status === `loading`) { + this.lifecycle.setStatus(`initialCommit`) + } + + this.state.commitPendingTransactions() + }, + markReady: () => { + this.lifecycle.markReady() + }, + truncate: () => { + const pendingTransaction = + this.state.pendingSyncedTransactions[ + this.state.pendingSyncedTransactions.length - 1 + ] + if (!pendingTransaction) { + throw new NoPendingSyncTransactionWriteError() + } + if (pendingTransaction.committed) { + throw new SyncTransactionAlreadyCommittedWriteError() + } + +>>>>>>> 684c70ec (Add onLoadMore callback and call it from the requestSnapshot methods) // Clear all operations from the current transaction pendingTransaction.operations = [] pendingTransaction.deletedKeys.clear() From e3f49794c87f1cbed10c1d4e73d12aaf2ba2d051 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 1 Oct 2025 09:59:46 +0200 Subject: [PATCH 02/11] Fixes to subscription --- packages/db/src/collection/subscription.ts | 4 +- packages/db/src/collection/sync.ts | 71 ---------------------- 2 files changed, 2 insertions(+), 73 deletions(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index f369971c..f806c53e 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -1,12 +1,11 @@ import { ensureIndexForExpression } from "../indexes/auto-index.js" import { and, gt, lt } from "../query/builder/functions.js" -import { Value } from "../query/ir.js" import { createFilterFunctionFromExpression, createFilteredCallback, } from "./change-events.js" -import type { BasicExpression, OrderBy } from "../query/ir.js" import type { IndexInterface } from "../indexes/base-index.js" +import { Value, type BasicExpression, type OrderBy } from "../query/ir.js" import type { ChangeMessage } from "../types.js" import type { CollectionImpl } from "./index.js" @@ -154,6 +153,7 @@ export class CollectionSubscription { limit, minValue, }: RequestLimitedSnapshotOptions) { + console.log("in requestLimitedSnapshot") if (!limit) throw new Error(`limit is required`) if (!this.orderByIndex) { diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index da9ac7f0..1a26038d 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -11,13 +11,8 @@ import { deepEquals } from "../utils" import type { StandardSchemaV1 } from "@standard-schema/spec" import type { ChangeMessage, -<<<<<<< HEAD CleanupFn, CollectionConfig, -======= - CollectionConfig, - CleanupFn, ->>>>>>> 684c70ec (Add onLoadMore callback and call it from the requestSnapshot methods) OnLoadMoreOptions, SyncConfigRes, } from "../types" @@ -99,7 +94,6 @@ export class CollectionSyncManager< } const key = this.config.getKey(messageWithoutKey.value) -<<<<<<< HEAD let messageType = messageWithoutKey.type // Check if an item with this key already exists when inserting @@ -177,71 +171,6 @@ export class CollectionSyncManager< throw new SyncTransactionAlreadyCommittedWriteError() } -======= - // Check if an item with this key already exists when inserting - if (messageWithoutKey.type === `insert`) { - const insertingIntoExistingSynced = this.state.syncedData.has(key) - const hasPendingDeleteForKey = - pendingTransaction.deletedKeys.has(key) - const isTruncateTransaction = pendingTransaction.truncate === true - // Allow insert after truncate in the same transaction even if it existed in syncedData - if ( - insertingIntoExistingSynced && - !hasPendingDeleteForKey && - !isTruncateTransaction - ) { - throw new DuplicateKeySyncError(key, this.id) - } - } - - const message: ChangeMessage = { - ...messageWithoutKey, - key, - } - pendingTransaction.operations.push(message) - - if (messageWithoutKey.type === `delete`) { - pendingTransaction.deletedKeys.add(key) - } - }, - commit: () => { - const pendingTransaction = - this.state.pendingSyncedTransactions[ - this.state.pendingSyncedTransactions.length - 1 - ] - if (!pendingTransaction) { - throw new NoPendingSyncTransactionCommitError() - } - if (pendingTransaction.committed) { - throw new SyncTransactionAlreadyCommittedError() - } - - pendingTransaction.committed = true - - // Update status to initialCommit when transitioning from loading - // This indicates we're in the process of committing the first transaction - if (this.lifecycle.status === `loading`) { - this.lifecycle.setStatus(`initialCommit`) - } - - this.state.commitPendingTransactions() - }, - markReady: () => { - this.lifecycle.markReady() - }, - truncate: () => { - const pendingTransaction = - this.state.pendingSyncedTransactions[ - this.state.pendingSyncedTransactions.length - 1 - ] - if (!pendingTransaction) { - throw new NoPendingSyncTransactionWriteError() - } - if (pendingTransaction.committed) { - throw new SyncTransactionAlreadyCommittedWriteError() - } - ->>>>>>> 684c70ec (Add onLoadMore callback and call it from the requestSnapshot methods) // Clear all operations from the current transaction pendingTransaction.operations = [] pendingTransaction.deletedKeys.clear() From 4f4a139b950b326385e05027e9c6b522ac44be48 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 1 Oct 2025 10:02:43 +0200 Subject: [PATCH 03/11] Remove debug logging in ts DB --- packages/db/src/collection/subscription.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index f806c53e..51a08368 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -153,7 +153,6 @@ export class CollectionSubscription { limit, minValue, }: RequestLimitedSnapshotOptions) { - console.log("in requestLimitedSnapshot") if (!limit) throw new Error(`limit is required`) if (!this.orderByIndex) { From 4978ef17c0d000ffc40956259fb18b96a76be341 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 1 Oct 2025 14:43:57 +0200 Subject: [PATCH 04/11] Forward pushed down predicates to the queryKey and queryFn --- packages/query-db-collection/src/query.ts | 387 ++++++++++++++-------- 1 file changed, 247 insertions(+), 140 deletions(-) diff --git a/packages/query-db-collection/src/query.ts b/packages/query-db-collection/src/query.ts index 2d53105c..0332730f 100644 --- a/packages/query-db-collection/src/query.ts +++ b/packages/query-db-collection/src/query.ts @@ -1,4 +1,4 @@ -import { QueryObserver } from "@tanstack/query-core" +import { QueryObserver, hashKey } from "@tanstack/query-core" import { GetKeyRequiredError, QueryClientRequiredError, @@ -18,6 +18,7 @@ import type { CollectionConfig, DeleteMutationFnParams, InsertMutationFnParams, + OnLoadMoreOptions, SyncConfig, UpdateMutationFnParams, UtilsRecord, @@ -41,6 +42,8 @@ type InferSchemaInput = T extends StandardSchemaV1 : Record : Record +type TQueryKeyBuilder = (opts: OnLoadMoreOptions) => TQueryKey + /** * Configuration options for creating a Query Collection * @template T - The explicit type of items stored in the collection @@ -62,7 +65,7 @@ export interface QueryCollectionConfig< TQueryData = Awaited>, > extends BaseCollectionConfig { /** The query key used by TanStack Query to identify this query */ - queryKey: TQueryKey + queryKey: TQueryKey | TQueryKeyBuilder /** Function that fetches data from the server. Must return the complete collection state */ queryFn: TQueryFn extends ( context: QueryFunctionContext @@ -421,174 +424,273 @@ export function queryCollectionOptions( /** The timestamp for when the query most recently returned the status as "error" */ let lastErrorUpdatedAt = 0 + // hashedQueryKey → queryKey + const hashToQueryKey = new Map() + + // queryKey → Set + const queryToRows = new Map>() + + // RowKey → Set + const rowToQueries = new Map>() + + // queryKey → QueryObserver - map of query observers that we did not yet susbcribe to + const observers = new Map< + string, + QueryObserver, any, Array, Array, any> + >() + + // queryKey → QueryObserver's unsubscribe function + const unsubscribes = new Map void>() + + // Helper function to add a row to the internal state + const addRow = (rowKey: string | number, hashedQueryKey: string) => { + const rowToQueriesSet = rowToQueries.get(rowKey) || new Set() + rowToQueriesSet.add(hashedQueryKey) + rowToQueries.set(rowKey, rowToQueriesSet) + + const queryToRowsSet = queryToRows.get(hashedQueryKey) || new Set() + queryToRowsSet.add(rowKey) + queryToRows.set(hashedQueryKey, queryToRowsSet) + } + + // Helper function to remove a row from the internal state + const removeRow = (rowKey: string | number, hashedQuerKey: string) => { + const rowToQueriesSet = rowToQueries.get(rowKey) || new Set() + rowToQueriesSet.delete(hashedQuerKey) + rowToQueries.set(rowKey, rowToQueriesSet) + + const queryToRowsSet = queryToRows.get(hashedQuerKey) || new Set() + queryToRowsSet.delete(rowKey) + queryToRows.set(hashedQuerKey, queryToRowsSet) + } + const internalSync: SyncConfig[`sync`] = (params) => { const { begin, write, commit, markReady, collection } = params - const observerOptions: QueryObserverOptions< - Array, - any, - Array, - Array, - any - > = { - queryKey: queryKey, - queryFn: queryFn, - meta: meta, - enabled: enabled, - refetchInterval: refetchInterval, - retry: retry, - retryDelay: retryDelay, - staleTime: staleTime, - structuralSharing: true, - notifyOnChangeProps: `all`, - } + const createQueryFromOpts = (opts: OnLoadMoreOptions) => { + // Push the predicates down to the queryKey and queryFn + const key = typeof queryKey === `function` ? queryKey(opts) : queryKey + const hashedQueryKey = hashKey(key) + const extendedMeta = { ...meta, tsDb: opts } - const localObserver = new QueryObserver< - Array, - any, - Array, - Array, - any - >(queryClient, observerOptions) - - let isSubscribed = false - let actualUnsubscribeFn: (() => void) | null = null - - type UpdateHandler = Parameters[0] - const handleQueryResult: UpdateHandler = (result) => { - if (result.isSuccess) { - // Clear error state - lastError = undefined - errorCount = 0 + if (observers.has(hashedQueryKey)) { + // We already have a query for this queryKey, return early + return + } - const rawData = result.data - const newItemsArray = select ? select(rawData) : rawData + const observerOptions: QueryObserverOptions< + Array, + any, + Array, + Array, + any + > = { + queryKey: key, + queryFn: queryFn, + meta: extendedMeta, + enabled: enabled, + refetchInterval: refetchInterval, + retry: retry, + retryDelay: retryDelay, + staleTime: staleTime, + structuralSharing: true, + notifyOnChangeProps: `all`, + } - if ( - !Array.isArray(newItemsArray) || - newItemsArray.some((item) => typeof item !== `object`) - ) { - const errorMessage = select - ? `@tanstack/query-db-collection: select() must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` - : `@tanstack/query-db-collection: queryFn must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` + const localObserver = new QueryObserver< + Array, + any, + Array, + Array, + any + >(queryClient, observerOptions) - console.error(errorMessage) - return - } + hashToQueryKey.set(hashedQueryKey, key) + observers.set(hashedQueryKey, localObserver) - const currentSyncedItems: Map = new Map( - collection._state.syncedData.entries() - ) - const newItemsMap = new Map() - newItemsArray.forEach((item) => { - const key = getKey(item) - newItemsMap.set(key, item) - }) + // If startSync=true or there are subscribers to the collection, subscribe to the query straight away + if (config.startSync || collection.subscriberCount > 0) { + subscribeToQuery(localObserver, hashedQueryKey) + } + } - begin() - - // Helper function for shallow equality check of objects - const shallowEqual = ( - obj1: Record, - obj2: Record - ): boolean => { - // Get all keys from both objects - const keys1 = Object.keys(obj1) - const keys2 = Object.keys(obj2) - - // If number of keys is different, objects are not equal - if (keys1.length !== keys2.length) return false - - // Check if all keys in obj1 have the same values in obj2 - return keys1.every((key) => { - // Skip comparing functions and complex objects deeply - if (typeof obj1[key] === `function`) return true - return obj1[key] === obj2[key] - }) - } + type UpdateHandler = Parameters[0] + + const makeQueryResultHandler = (queryKey: QueryKey) => { + const hashedQueryKey = hashKey(queryKey) + const handleQueryResult: UpdateHandler = (result) => { + if (result.isSuccess) { + // Clear error state + lastError = undefined + errorCount = 0 - currentSyncedItems.forEach((oldItem, key) => { - const newItem = newItemsMap.get(key) - if (!newItem) { - write({ type: `delete`, value: oldItem }) - } else if ( - !shallowEqual( - oldItem as Record, - newItem as Record - ) + const rawData = result.data + const newItemsArray = select ? select(rawData) : rawData + + if ( + !Array.isArray(newItemsArray) || + newItemsArray.some((item) => typeof item !== `object`) ) { - // Only update if there are actual differences in the properties - write({ type: `update`, value: newItem }) + const errorMessage = select + ? `@tanstack/query-db-collection: select() must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` + : `@tanstack/query-db-collection: queryFn must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` + + console.error(errorMessage) + return } - }) - newItemsMap.forEach((newItem, key) => { - if (!currentSyncedItems.has(key)) { - write({ type: `insert`, value: newItem }) + const currentSyncedItems: Map = new Map( + collection._state.syncedData.entries() + ) + const newItemsMap = new Map() + newItemsArray.forEach((item) => { + const key = getKey(item) + newItemsMap.set(key, item) + }) + + begin() + + // Helper function for shallow equality check of objects + const shallowEqual = ( + obj1: Record, + obj2: Record + ): boolean => { + // Get all keys from both objects + const keys1 = Object.keys(obj1) + const keys2 = Object.keys(obj2) + + // If number of keys is different, objects are not equal + if (keys1.length !== keys2.length) return false + + // Check if all keys in obj1 have the same values in obj2 + return keys1.every((key) => { + // Skip comparing functions and complex objects deeply + if (typeof obj1[key] === `function`) return true + return obj1[key] === obj2[key] + }) } - }) - commit() + currentSyncedItems.forEach((oldItem, key) => { + const newItem = newItemsMap.get(key) + if (!newItem) { + write({ type: `delete`, value: oldItem }) + removeRow(key, hashedQueryKey) + } else if ( + !shallowEqual( + oldItem as Record, + newItem as Record + ) + ) { + // Only update if there are actual differences in the properties + write({ type: `update`, value: newItem }) + } + }) - // Mark collection as ready after first successful query result - markReady() - } else if (result.isError) { - if (result.errorUpdatedAt !== lastErrorUpdatedAt) { - lastError = result.error - errorCount++ - lastErrorUpdatedAt = result.errorUpdatedAt - } + newItemsMap.forEach((newItem, key) => { + addRow(key, hashedQueryKey) + if (!currentSyncedItems.has(key)) { + write({ type: `insert`, value: newItem }) + } + }) - console.error( - `[QueryCollection] Error observing query ${String(queryKey)}:`, - result.error - ) + commit() - // Mark collection as ready even on error to avoid blocking apps - markReady() + // Mark collection as ready after first successful query result + markReady() + } else if (result.isError) { + if (result.errorUpdatedAt !== lastErrorUpdatedAt) { + lastError = result.error + errorCount++ + lastErrorUpdatedAt = result.errorUpdatedAt + } + + console.error( + `[QueryCollection] Error observing query ${String(queryKey)}:`, + result.error + ) + + // Mark collection as ready even on error to avoid blocking apps + markReady() + } } + return handleQueryResult } - const subscribeToQuery = () => { - if (!isSubscribed) { - actualUnsubscribeFn = localObserver.subscribe(handleQueryResult) - isSubscribed = true - } + const isSubscribed = (hashedQueryKey: string) => { + return unsubscribes.has(hashedQueryKey) } - const unsubscribeFromQuery = () => { - if (isSubscribed && actualUnsubscribeFn) { - actualUnsubscribeFn() - actualUnsubscribeFn = null - isSubscribed = false + const subscribeToQuery = ( + observer: QueryObserver, any, Array, Array, any>, + hashedQueryKey: string + ) => { + if (!isSubscribed(hashedQueryKey)) { + const queryKey = hashToQueryKey.get(hashedQueryKey)! + const handleQueryResult = makeQueryResultHandler(queryKey) + const unsubscribeFn = observer.subscribe(handleQueryResult) + unsubscribes.set(hashedQueryKey, unsubscribeFn) } } - // Always subscribe when sync starts (this could be from preload(), startSync config, or first subscriber) - // We'll dynamically unsubscribe/resubscribe based on subscriber count to maintain staleTime behavior - subscribeToQuery() + const subscribeToQueries = () => { + observers.forEach(subscribeToQuery) + } + + const unsubscribeFromQueries = () => { + unsubscribes.forEach((unsubscribeFn) => { + unsubscribeFn() + }) + unsubscribes.clear() + } // Set up event listener for subscriber changes const unsubscribeFromCollectionEvents = collection.on( `subscribers:change`, ({ subscriberCount }) => { if (subscriberCount > 0) { - subscribeToQuery() + subscribeToQueries() } else if (subscriberCount === 0) { - unsubscribeFromQuery() + unsubscribeFromQueries() } } ) - // Ensure we process any existing query data (QueryObserver doesn't invoke its callback automatically with initial - // state) - handleQueryResult(localObserver.getCurrentResult()) + // Create the initial query without any predicates + createQueryFromOpts({}) - return async () => { + // Always subscribe when sync starts (this could be from preload(), startSync config, or first subscriber) + // We'll dynamically unsubscribe/resubscribe based on subscriber count to maintain staleTime behavior + subscribeToQueries() + + // Ensure we process any existing query data (QueryObserver doesn't invoke its callback automatically with initial state) + observers.forEach((observer, hashedQueryKey) => { + const queryKey = hashToQueryKey.get(hashedQueryKey)! + const handleQueryResult = makeQueryResultHandler(queryKey) + handleQueryResult(observer.getCurrentResult()) + }) + + const cleanup = async () => { unsubscribeFromCollectionEvents() - unsubscribeFromQuery() - await queryClient.cancelQueries({ queryKey }) - queryClient.removeQueries({ queryKey }) + unsubscribeFromQueries() + + const queryKeys = [...hashToQueryKey.values()] + + hashToQueryKey.clear() + queryToRows.clear() + rowToQueries.clear() + observers.clear() + + await Promise.all( + queryKeys.map(async (queryKey) => { + await queryClient.cancelQueries({ queryKey }) + queryClient.removeQueries({ queryKey }) + }) + ) + } + + return { + onLoadMore: createQueryFromOpts, + cleanup, } } @@ -596,15 +698,20 @@ export function queryCollectionOptions( * Refetch the query data * @returns Promise that resolves when the refetch is complete */ - const refetch: RefetchFn = (opts) => { - return queryClient.refetchQueries( - { - queryKey: queryKey, - }, - { - throwOnError: opts?.throwOnError, - } - ) + const refetch: RefetchFn = async (opts) => { + const queryKeys = [...hashToQueryKey.values()] + const refetchPromises = queryKeys.map((queryKey) => { + return queryClient.refetchQueries( + { + queryKey: queryKey, + }, + { + throwOnError: opts?.throwOnError, + } + ) + }) + + await Promise.all(refetchPromises) } // Create write context for manual write operations From 8dec9815c491139209052fe513374d70f954b02a Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 1 Oct 2025 15:10:36 +0200 Subject: [PATCH 05/11] Fix existing tests --- .../query-db-collection/tests/query.test.ts | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/packages/query-db-collection/tests/query.test.ts b/packages/query-db-collection/tests/query.test.ts index b87caf67..aa21bb78 100644 --- a/packages/query-db-collection/tests/query.test.ts +++ b/packages/query-db-collection/tests/query.test.ts @@ -431,7 +431,9 @@ describe(`QueryCollection`, () => { }) // Verify queryFn was called with the correct context, including the meta object - expect(queryFn).toHaveBeenCalledWith(expect.objectContaining({ meta })) + expect(queryFn).toHaveBeenCalledWith( + expect.objectContaining({ meta: { ...meta, tsDb: {} } }) + ) }) describe(`Select method testing`, () => { @@ -696,6 +698,20 @@ describe(`QueryCollection`, () => { // Test case 1: Default behavior (undefined return) should trigger refetch const optionsDefault = queryCollectionOptions(configDefault) + + // Call sync to force the query to be created + optionsDefault.sync.sync({ + begin: () => {}, + write: () => {}, + commit: () => {}, + collection: { + subscriberCount: 1, + on: () => { + return () => {} + }, + }, + } as any) + await optionsDefault.onInsert!(insertMockParams) // Verify handler was called and refetch was triggered From 009864977d70d8698bea130cfef34d95fbee26a6 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 1 Oct 2025 15:22:34 +0200 Subject: [PATCH 06/11] Handle garbage collection of the underlying tanstack queries --- packages/query-db-collection/src/query.ts | 43 +++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/packages/query-db-collection/src/query.ts b/packages/query-db-collection/src/query.ts index 0332730f..a6eb9db8 100644 --- a/packages/query-db-collection/src/query.ts +++ b/packages/query-db-collection/src/query.ts @@ -669,6 +669,48 @@ export function queryCollectionOptions( handleQueryResult(observer.getCurrentResult()) }) + // Subscribe to the query client's cache to handle queries that are GCed by tanstack query + const unsubscribeQueryCache = queryClient + .getQueryCache() + .subscribe((event) => { + const hashedKey = event.query.queryHash + if (event.type === `removed`) { + cleanupQuery(hashedKey) + } + }) + + function cleanupQuery(hashedQueryKey: string) { + // Unsubscribe from the query's observer + unsubscribes.get(hashedQueryKey)?.() + + // Get all the rows that are in the result of this query + const rowKeys = queryToRows.get(hashedQueryKey) ?? new Set() + + // Remove the query from these rows + rowKeys.forEach((rowKey) => { + const queries = rowToQueries.get(rowKey) // set of queries that reference this row + if (queries && queries.size > 0) { + queries.delete(hashedQueryKey) + if (queries.size === 0) { + // Reference count dropped to 0, we can GC the row + rowToQueries.delete(rowKey) + + if (collection.has(rowKey)) { + begin() + write({ type: `delete`, value: collection.get(rowKey) }) + commit() + } + } + } + }) + + // Remove the query from the internal state + unsubscribes.delete(hashedQueryKey) + observers.delete(hashedQueryKey) + queryToRows.delete(hashedQueryKey) + hashToQueryKey.delete(hashedQueryKey) + } + const cleanup = async () => { unsubscribeFromCollectionEvents() unsubscribeFromQueries() @@ -679,6 +721,7 @@ export function queryCollectionOptions( queryToRows.clear() rowToQueries.clear() observers.clear() + unsubscribeQueryCache() await Promise.all( queryKeys.map(async (queryKey) => { From 6d6c6042b666a7ba34ff69f2bae662bf84e56710 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 6 Oct 2025 12:19:05 +0200 Subject: [PATCH 07/11] Remove row from collection only when its reference count drops to 0 --- packages/query-db-collection/src/query.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/query-db-collection/src/query.ts b/packages/query-db-collection/src/query.ts index a6eb9db8..fca3c573 100644 --- a/packages/query-db-collection/src/query.ts +++ b/packages/query-db-collection/src/query.ts @@ -462,6 +462,8 @@ export function queryCollectionOptions( const queryToRowsSet = queryToRows.get(hashedQuerKey) || new Set() queryToRowsSet.delete(rowKey) queryToRows.set(hashedQuerKey, queryToRowsSet) + + return rowToQueriesSet.size === 0 } const internalSync: SyncConfig[`sync`] = (params) => { @@ -573,8 +575,10 @@ export function queryCollectionOptions( currentSyncedItems.forEach((oldItem, key) => { const newItem = newItemsMap.get(key) if (!newItem) { - write({ type: `delete`, value: oldItem }) - removeRow(key, hashedQueryKey) + const needToRemove = removeRow(key, hashedQueryKey) // returns true if the row is no longer referenced by any queries + if (needToRemove) { + write({ type: `delete`, value: oldItem }) + } } else if ( !shallowEqual( oldItem as Record, From 25b84edbb67a333543d9e091bcb32c27ffb5fce1 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 6 Oct 2025 12:20:34 +0200 Subject: [PATCH 08/11] Unit tests for row deletion when queries are GCed --- .../query-db-collection/tests/query.test.ts | 473 ++++++++++++++++++ 1 file changed, 473 insertions(+) diff --git a/packages/query-db-collection/tests/query.test.ts b/packages/query-db-collection/tests/query.test.ts index aa21bb78..d9f7a555 100644 --- a/packages/query-db-collection/tests/query.test.ts +++ b/packages/query-db-collection/tests/query.test.ts @@ -2356,4 +2356,477 @@ describe(`QueryCollection`, () => { expect(collection.size).toBe(items.length) }) }) + + describe(`Query Garbage Collection`, () => { + it(`should delete all rows when a single query is garbage collected`, async () => { + const queryKey = [`single-query-gc-test`] + const items: Array = [ + { id: `1`, name: `Item 1` }, + { id: `2`, name: `Item 2` }, + { id: `3`, name: `Item 3` }, + ] + + const queryFn = vi.fn().mockResolvedValue(items) + + const config: QueryCollectionConfig = { + id: `single-query-gc-test`, + queryClient, + queryKey, + queryFn, + getKey, + startSync: true, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Wait for initial data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) + expect(collection.get(`1`)).toEqual(items[0]) + expect(collection.get(`2`)).toEqual(items[1]) + expect(collection.get(`3`)).toEqual(items[2]) + }) + + // Verify all items are in the collection + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + + // Simulate query garbage collection by removing the query from the cache + queryClient.removeQueries({ queryKey }) + + // Wait for GC to process + await vi.waitFor(() => { + expect(collection.size).toBe(0) + }) + + // Verify all items are removed + expect(collection.has(`1`)).toBe(false) + expect(collection.has(`2`)).toBe(false) + expect(collection.has(`3`)).toBe(false) + }) + + it(`should only delete non-shared rows when one of multiple overlapping queries is GCed`, async () => { + const baseQueryKey = [`overlapping-query-test`] + + // Mock queryFn to return different data based on predicates + const queryFn = vi.fn().mockImplementation((context) => { + const { meta } = context + const tsDb = meta?.tsDb ?? {} + const { where, orderBy, limit } = tsDb + + // Query 1: items 1, 2, 3 (no predicates) + if (!where && !orderBy && !limit) { + return Promise.resolve([ + { id: `1`, name: `Item 1` }, + { id: `2`, name: `Item 2` }, + { id: `3`, name: `Item 3` }, + ]) + } + + // Query 2: items 2, 3, 4 (where: { category: 'B' }) + if (where?.category === `B`) { + return Promise.resolve([ + { id: `2`, name: `Item 2` }, + { id: `3`, name: `Item 3` }, + { id: `4`, name: `Item 4` }, + ]) + } + + // Query 3: items 3, 4, 5 (where: { category: 'C' }) + if (where?.category === `C`) { + return Promise.resolve([ + { id: `3`, name: `Item 3` }, + { id: `4`, name: `Item 4` }, + { id: `5`, name: `Item 5` }, + ]) + } + return Promise.resolve([]) + }) + + const queryKey = (ctx: any) => { + if (ctx.where) { + return [...baseQueryKey, ctx.where] + } + return baseQueryKey + } + + const config: QueryCollectionConfig = { + id: `overlapping-test`, + queryClient, + queryKey, + queryFn, + getKey, + startSync: true, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Wait for initial data to load (query 1) + await vi.waitFor(() => { + expect(collection.size).toBe(3) + }) + + // Add query 2 with different predicates + // We abuse the `where` clause being typed as `any` to pass a category + // but in real usage this would be some Intermediate Representation of the where clause + await collection.syncMore({ where: { category: `B` } } as any) + + // Wait for query 2 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(4) // Should have items 1, 2, 3, 4 + }) + + // Add query 3 with different predicates + await collection.syncMore({ where: { category: `C` } } as any) + + // Wait for query 3 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(5) // Should have items 1, 2, 3, 4, 5 + }) + + // Verify all items are present + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + expect(collection.has(`4`)).toBe(true) + expect(collection.has(`5`)).toBe(true) + + // GC query 1 (no predicates) - should only remove item 1 (unique to query 1) + // Items 2 and 3 should remain because they're shared with other queries + queryClient.removeQueries({ queryKey: queryKey({}), exact: true }) + + await vi.waitFor(() => { + expect(collection.size).toBe(4) // Should have items 2, 3, 4, 5 + }) + + // Verify item 1 is removed (it was only in query 1) + expect(collection.has(`1`)).toBe(false) + + // Verify shared items are still present + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + expect(collection.has(`4`)).toBe(true) + expect(collection.has(`5`)).toBe(true) + + // GC query 2 (where: { category: 'B' }) - should remove item 2 + // Items 3 and 4 should remain because they are shared with query 3 + queryClient.removeQueries({ + queryKey: queryKey({ where: { category: `B` } }), + exact: true, + }) + + await vi.waitFor(() => { + expect(collection.size).toBe(3) // Should have items 3, 4, 5 + }) + + // Verify item 2 is removed (it was only in query 2) + expect(collection.has(`2`)).toBe(false) + + // Verify items 3 and 4 are still present (shared with query 3) + expect(collection.has(`3`)).toBe(true) + expect(collection.has(`4`)).toBe(true) + expect(collection.has(`5`)).toBe(true) + + // GC query 3 (where: { category: 'C' }) - should remove all remaining items + queryClient.removeQueries({ + queryKey: queryKey({ where: { category: `C` } }), + exact: true, + }) + + await vi.waitFor(() => { + expect(collection.size).toBe(0) + }) + + // Verify all items are now removed + expect(collection.has(`3`)).toBe(false) + expect(collection.has(`4`)).toBe(false) + expect(collection.has(`5`)).toBe(false) + }) + + it(`should handle GC of queries with identical data`, async () => { + const baseQueryKey = [`identical-query-test`] + + // Mock queryFn to return the same data for all queries + const queryFn = vi.fn().mockImplementation(() => { + // All queries return the same data regardless of predicates + return Promise.resolve([ + { id: `1`, name: `Item 1` }, + { id: `2`, name: `Item 2` }, + { id: `3`, name: `Item 3` }, + ]) + }) + + const config: QueryCollectionConfig = { + id: `identical-test`, + queryClient, + queryKey: (ctx) => { + if (ctx.where) { + return [...baseQueryKey, ctx.where] + } + return baseQueryKey + }, + queryFn, + getKey, + startSync: true, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Wait for initial data to load (query 1) + await vi.waitFor(() => { + expect(collection.size).toBe(3) + }) + + // Add query 2 with different predicates (but returns same data) + await collection.syncMore({ where: { category: `A` } } as any) + + // Wait for query 2 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) // Same data, no new items + }) + + // Add query 3 with different predicates (but returns same data) + await collection.syncMore({ where: { category: `B` } } as any) + + // Wait for query 3 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) // Same data, no new items + }) + + // GC query 1 - should not remove any items (all items are shared with other queries) + queryClient.removeQueries({ + queryKey: (config.queryKey as any)({}), + exact: true, + }) + + await vi.waitFor(() => { + expect(collection.size).toBe(3) // Items still present due to other queries + }) + + // All items should still be present + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + + // GC query 2 - should still not remove any items (all items are shared with query 3) + queryClient.removeQueries({ + queryKey: (config.queryKey as any)({ where: { category: `A` } }), + exact: true, + }) + + await vi.waitFor(() => { + expect(collection.size).toBe(3) // Items still present due to query 3 + }) + + // All items should still be present + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + + // GC query 3 - should remove all items (no more queries reference them) + queryClient.removeQueries({ + queryKey: (config.queryKey as any)({ where: { category: `B` } }), + exact: true, + }) + + await vi.waitFor(() => { + expect(collection.size).toBe(0) + }) + + // All items should now be removed + expect(collection.has(`1`)).toBe(false) + expect(collection.has(`2`)).toBe(false) + expect(collection.has(`3`)).toBe(false) + }) + + it(`should handle GC of empty queries gracefully`, async () => { + const baseQueryKey = [`empty-query-test`] + + // Mock queryFn to return different data based on predicates + const queryFn = vi.fn().mockImplementation((context) => { + const { meta } = context + const tsDb = meta?.tsDb || {} + const { where, orderBy, limit } = tsDb + + // Query 1: empty array (no predicates) + if (!where && !orderBy && !limit) { + return Promise.resolve([]) + } + + // Query 2: some items (where: { category: 'A' }) + if (where?.category === `A`) { + return Promise.resolve([ + { id: `1`, name: `Item 1` }, + { id: `2`, name: `Item 2` }, + ]) + } + + return Promise.resolve([]) + }) + + const config: QueryCollectionConfig = { + id: `empty-test`, + queryClient, + queryKey: (ctx) => { + if (ctx.where) { + return [...baseQueryKey, ctx.where] + } + return baseQueryKey + }, + queryFn, + getKey, + startSync: true, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Wait for initial data to load (query 1 - empty) + await vi.waitFor(() => { + expect(collection.size).toBe(0) // Empty query + }) + + // Add query 2 with different predicates + await collection.syncMore({ where: { category: `A` } } as any) + + // Wait for query 2 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(2) // Should have items 1, 2 + }) + + // Verify items are present + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + + // GC empty query 1 - should not affect the collection + queryClient.removeQueries({ + queryKey: (config.queryKey as any)({}), + exact: true, + }) + + await flushPromises() + + // Collection should still have items from query 2 + expect(collection.size).toBe(2) + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + + // GC non-empty query 2 - should remove its items + queryClient.removeQueries({ + queryKey: (config.queryKey as any)({ where: { category: `A` } }), + exact: true, + }) + + await vi.waitFor(() => { + expect(collection.size).toBe(0) + }) + + expect(collection.has(`1`)).toBe(false) + expect(collection.has(`2`)).toBe(false) + }) + + it(`should handle concurrent GC of multiple queries`, async () => { + const baseQueryKey = [`concurrent-query-test`] + + // Mock queryFn to return different data based on predicates + const queryFn = vi.fn().mockImplementation((context) => { + const { meta } = context + const tsDb = meta?.tsDb || {} + const { where, orderBy, limit } = tsDb + + // Query 1: items 1, 2 (no predicates) + if (!where && !orderBy && !limit) { + return Promise.resolve([ + { id: `1`, name: `Item 1` }, + { id: `2`, name: `Item 2` }, + ]) + } + + // Query 2: items 2, 3 (where: { type: 'A' }) + if (where?.type === `A`) { + return Promise.resolve([ + { id: `2`, name: `Item 2` }, + { id: `3`, name: `Item 3` }, + ]) + } + + // Query 3: items 3, 4 (where: { type: 'B' }) + if (where?.type === `B`) { + return Promise.resolve([ + { id: `3`, name: `Item 3` }, + { id: `4`, name: `Item 4` }, + ]) + } + + return Promise.resolve([]) + }) + + const config: QueryCollectionConfig = { + id: `concurrent-test`, + queryClient, + queryKey: (ctx) => { + if (ctx.where) { + return [...baseQueryKey, ctx.where] + } + return baseQueryKey + }, + queryFn, + getKey, + startSync: true, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Wait for initial data to load (query 1) + await vi.waitFor(() => { + expect(collection.size).toBe(2) + }) + + // Add query 2 with different predicates + await collection.syncMore({ where: { type: `A` } } as any) + + // Wait for query 2 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) // Should have items 1, 2, 3 + }) + + // Add query 3 with different predicates + await collection.syncMore({ where: { type: `B` } } as any) + + // Wait for query 3 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(4) // Should have items 1, 2, 3, 4 + }) + + // GC all queries concurrently + queryClient.removeQueries({ + queryKey: (config.queryKey as any)({}), + exact: true, + }) + queryClient.removeQueries({ + queryKey: (config.queryKey as any)({ where: { type: `A` } }), + exact: true, + }) + queryClient.removeQueries({ + queryKey: (config.queryKey as any)({ where: { type: `B` } }), + exact: true, + }) + + // Wait for all GC operations to complete + await vi.waitFor(() => { + expect(collection.size).toBe(0) + }) + + // Verify all items are removed + expect(collection.has(`1`)).toBe(false) + expect(collection.has(`2`)).toBe(false) + expect(collection.has(`3`)).toBe(false) + expect(collection.has(`4`)).toBe(false) + }) + }) }) From 0221a42db83cdcbfbbca0e1a2088bd24374be0de Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 6 Oct 2025 13:59:04 +0200 Subject: [PATCH 09/11] Changeset --- .changeset/silent-trains-tell.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/silent-trains-tell.md diff --git a/.changeset/silent-trains-tell.md b/.changeset/silent-trains-tell.md new file mode 100644 index 00000000..414dd8c8 --- /dev/null +++ b/.changeset/silent-trains-tell.md @@ -0,0 +1,5 @@ +--- +"@tanstack/query-db-collection": patch +--- + +Handle pushed-down predicates From 77e8bdebeeaf42be93328b06a80ac4b4ce6af7ea Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 6 Oct 2025 15:42:46 +0200 Subject: [PATCH 10/11] Update lockfile --- pnpm-lock.yaml | 36 ++++-------------------------------- 1 file changed, 4 insertions(+), 32 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 88dc1cb8..003339de 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -6424,10 +6424,6 @@ packages: resolution: {integrity: sha512-bAxsR8BVfj60DWXHE3u30oHzfl4G7khkSuPW+qvpd7jFRHm7dLxOjUk1EHACJ/hxLY8phGJ0YhYHZo7jil7Qdg==} engines: {node: '>= 8'} - minizlib@3.0.2: - resolution: {integrity: sha512-oG62iEk+CYt5Xj2YqI5Xi9xWUeZhDI8jjQmC5oThVH5JGCTgIjr7ciJDzC7MBzYd//WvR1OTmP5Q38Q8ShQtVA==} - engines: {node: '>= 18'} - minizlib@3.1.0: resolution: {integrity: sha512-KZxYo1BUkWD2TVFLr0MQoM8vUUigWD3LlD83a/75BqC+4qE0Hb1Vo5v1FgcfaNXvfXzr+5EhQ6ing/CaBijTlw==} engines: {node: '>= 18'} @@ -6444,11 +6440,6 @@ packages: engines: {node: '>=10'} hasBin: true - mkdirp@3.0.1: - resolution: {integrity: sha512-+NsyUUAZDmo6YVHzL/stxSu3t9YS1iljliy3BSDrXJ/dkn1KYdmtZODGGjLcc9XLgVVpH4KshHB8XmZgMhaBXg==} - engines: {node: '>=10'} - hasBin: true - mlly@1.8.0: resolution: {integrity: sha512-l8D9ODSRWLe2KHJSifWGwBqpTZXIXTeo8mlKjY+E2HAakaTeNpqAyBZ8GSqLzHgw4XmHmC8whvpjJNMbFZN7/g==} @@ -7707,10 +7698,6 @@ packages: resolution: {integrity: sha512-DZ4yORTwrbTj/7MZYq2w+/ZFdI6OZ/f9SFHR+71gIVUZhOQPHzVCLpvRnPgyaMpfWxxk/4ONva3GQSyNIKRv6A==} engines: {node: '>=10'} - tar@7.4.3: - resolution: {integrity: sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw==} - engines: {node: '>=18'} - tar@7.5.1: resolution: {integrity: sha512-nlGpxf+hv0v7GkWBK2V9spgactGOp0qvfWRxUMjqHyzrt3SgwE48DIv/FhqPHJYLHpgW1opq3nERbz5Anq7n1g==} engines: {node: '>=18'} @@ -12807,7 +12794,7 @@ snapshots: minipass-pipeline: 1.2.4 p-map: 7.0.3 ssri: 12.0.0 - tar: 7.4.3 + tar: 7.5.1 unique-filename: 4.0.0 call-bind-apply-helpers@1.0.2: @@ -15072,7 +15059,7 @@ snapshots: dependencies: minipass: 7.1.2 minipass-sized: 1.0.3 - minizlib: 3.0.2 + minizlib: 3.1.0 optionalDependencies: encoding: 0.1.13 @@ -15101,10 +15088,6 @@ snapshots: minipass: 3.3.6 yallist: 4.0.0 - minizlib@3.0.2: - dependencies: - minipass: 7.1.2 - minizlib@3.1.0: dependencies: minipass: 7.1.2 @@ -15117,8 +15100,6 @@ snapshots: mkdirp@1.0.4: {} - mkdirp@3.0.1: {} - mlly@1.8.0: dependencies: acorn: 8.15.0 @@ -15227,7 +15208,7 @@ snapshots: nopt: 8.1.0 proc-log: 5.0.0 semver: 7.7.2 - tar: 7.4.3 + tar: 7.5.1 tinyglobby: 0.2.15 which: 5.0.0 transitivePeerDependencies: @@ -15285,7 +15266,7 @@ snapshots: make-fetch-happen: 14.0.3 minipass: 7.1.2 minipass-fetch: 4.0.1 - minizlib: 3.0.2 + minizlib: 3.1.0 npm-package-arg: 12.0.2 proc-log: 5.0.0 transitivePeerDependencies: @@ -16627,15 +16608,6 @@ snapshots: mkdirp: 1.0.4 yallist: 4.0.0 - tar@7.4.3: - dependencies: - '@isaacs/fs-minipass': 4.0.1 - chownr: 3.0.0 - minipass: 7.1.2 - minizlib: 3.0.2 - mkdirp: 3.0.1 - yallist: 5.0.0 - tar@7.5.1: dependencies: '@isaacs/fs-minipass': 4.0.1 From 8c72581853999c314d1ae954fc2c608b9fbea13e Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 6 Oct 2025 15:44:50 +0200 Subject: [PATCH 11/11] Update imports --- packages/db/src/collection/subscription.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 51a08368..f369971c 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -1,11 +1,12 @@ import { ensureIndexForExpression } from "../indexes/auto-index.js" import { and, gt, lt } from "../query/builder/functions.js" +import { Value } from "../query/ir.js" import { createFilterFunctionFromExpression, createFilteredCallback, } from "./change-events.js" +import type { BasicExpression, OrderBy } from "../query/ir.js" import type { IndexInterface } from "../indexes/base-index.js" -import { Value, type BasicExpression, type OrderBy } from "../query/ir.js" import type { ChangeMessage } from "../types.js" import type { CollectionImpl } from "./index.js"