diff --git a/.changeset/chilly-penguins-learn.md b/.changeset/chilly-penguins-learn.md new file mode 100644 index 000000000..ca6bbec6c --- /dev/null +++ b/.changeset/chilly-penguins-learn.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Fixed potential race conditions in WatchedQueries when updateSettings is called frequently. diff --git a/.changeset/empty-glasses-camp.md b/.changeset/empty-glasses-camp.md new file mode 100644 index 000000000..f1dd59ed6 --- /dev/null +++ b/.changeset/empty-glasses-camp.md @@ -0,0 +1,6 @@ +--- +'@powersync/react': patch +--- + +- Fixed bug where the `useQuery` reported `error` state would not clear after updating the query to a valid query. +- Fixed bug where `useQuery` `isFetching` status would not immediately be reported as true when the query has changed. diff --git a/.github/workflows/test-simulators.yaml b/.github/workflows/test-simulators.yaml index bf838b828..a3077a756 100644 --- a/.github/workflows/test-simulators.yaml +++ b/.github/workflows/test-simulators.yaml @@ -141,7 +141,8 @@ jobs: - name: Set up XCode uses: maxim-lobanov/setup-xcode@v1 with: - xcode-version: latest-stable + # TODO: Update to latest-stable once GH installs iOS 26 simulators + xcode-version: '^16.4.0' - name: CocoaPods Cache uses: actions/cache@v3 diff --git a/packages/common/src/client/watched/WatchedQuery.ts b/packages/common/src/client/watched/WatchedQuery.ts index ad0f0506d..08d38986c 100644 --- a/packages/common/src/client/watched/WatchedQuery.ts +++ b/packages/common/src/client/watched/WatchedQuery.ts @@ -71,6 +71,7 @@ export enum WatchedQueryListenerEvent { ON_DATA = 'onData', ON_ERROR = 'onError', ON_STATE_CHANGE = 'onStateChange', + SETTINGS_WILL_UPDATE = 'settingsWillUpdate', CLOSED = 'closed' } @@ -78,6 +79,7 @@ export interface WatchedQueryListener extends BaseListener { [WatchedQueryListenerEvent.ON_DATA]?: (data: Data) => void | Promise; [WatchedQueryListenerEvent.ON_ERROR]?: (error: Error) => void | Promise; [WatchedQueryListenerEvent.ON_STATE_CHANGE]?: (state: WatchedQueryState) => void | Promise; + [WatchedQueryListenerEvent.SETTINGS_WILL_UPDATE]?: () => void; [WatchedQueryListenerEvent.CLOSED]?: () => void | Promise; } diff --git a/packages/common/src/client/watched/processors/AbstractQueryProcessor.ts b/packages/common/src/client/watched/processors/AbstractQueryProcessor.ts index 3f089a3bf..22dbcf874 100644 --- a/packages/common/src/client/watched/processors/AbstractQueryProcessor.ts +++ b/packages/common/src/client/watched/processors/AbstractQueryProcessor.ts @@ -1,6 +1,12 @@ import { AbstractPowerSyncDatabase } from '../../../client/AbstractPowerSyncDatabase.js'; import { MetaBaseObserver } from '../../../utils/MetaBaseObserver.js'; -import { WatchedQuery, WatchedQueryListener, WatchedQueryOptions, WatchedQueryState } from '../WatchedQuery.js'; +import { + WatchedQuery, + WatchedQueryListener, + WatchedQueryListenerEvent, + WatchedQueryOptions, + WatchedQueryState +} from '../WatchedQuery.js'; /** * @internal @@ -62,7 +68,7 @@ export abstract class AbstractQueryProcessor< this._closed = false; this.state = this.constructInitialState(); this.disposeListeners = null; - this.initialized = this.init(); + this.initialized = this.init(this.abortController.signal); } protected constructInitialState(): WatchedQueryState { @@ -79,12 +85,15 @@ export abstract class AbstractQueryProcessor< return this.options.watchOptions.reportFetching ?? true; } - /** - * Updates the underlying query. - */ - async updateSettings(settings: Settings) { - this.abortController.abort(); - await this.initialized; + protected async updateSettingsInternal(settings: Settings, signal: AbortSignal) { + // This may have been aborted while awaiting or if multiple calls to `updateSettings` were made + if (this._closed || signal.aborted) { + return; + } + + this.options.watchOptions = settings; + + this.iterateListeners((l) => l[WatchedQueryListenerEvent.SETTINGS_WILL_UPDATE]?.()); if (!this.state.isFetching && this.reportFetching) { await this.updateState({ @@ -92,17 +101,30 @@ export abstract class AbstractQueryProcessor< }); } - this.options.watchOptions = settings; - - this.abortController = new AbortController(); await this.runWithReporting(() => this.linkQuery({ - abortSignal: this.abortController.signal, + abortSignal: signal, settings }) ); } + /** + * Updates the underlying query. + */ + async updateSettings(settings: Settings) { + // Abort the previous request + this.abortController.abort(); + + // Keep track of this controller's abort status + const abortController = new AbortController(); + // Allow this to be aborted externally + this.abortController = abortController; + + await this.initialized; + return this.updateSettingsInternal(settings, abortController.signal); + } + /** * This method is used to link a query to the subscribers of this listener class. * This method should perform actual query watching and report results via {@link updateState} method. @@ -110,6 +132,10 @@ export abstract class AbstractQueryProcessor< protected abstract linkQuery(options: LinkQueryOptions): Promise; protected async updateState(update: Partial>) { + if (this._closed) { + return; + } + if (typeof update.error !== 'undefined') { await this.iterateAsyncListenersWithError(async (l) => l.onError?.(update.error!)); // An error always stops for the current fetching state @@ -128,7 +154,7 @@ export abstract class AbstractQueryProcessor< /** * Configures base DB listeners and links the query to listeners. */ - protected async init() { + protected async init(signal: AbortSignal) { const { db } = this.options; const disposeCloseListener = db.registerListener({ @@ -153,17 +179,16 @@ export abstract class AbstractQueryProcessor< }; // Initial setup - this.runWithReporting(async () => { - await this.updateSettings(this.options.watchOptions); + await this.runWithReporting(async () => { + await this.updateSettingsInternal(this.options.watchOptions, signal); }); } async close() { - await this.initialized; + this._closed = true; this.abortController.abort(); this.disposeListeners?.(); this.disposeListeners = null; - this._closed = true; this.iterateListeners((l) => l.closed?.()); this.listeners.clear(); } diff --git a/packages/common/src/client/watched/processors/DifferentialQueryProcessor.ts b/packages/common/src/client/watched/processors/DifferentialQueryProcessor.ts index 9d7701bf0..33fbe9a25 100644 --- a/packages/common/src/client/watched/processors/DifferentialQueryProcessor.ts +++ b/packages/common/src/client/watched/processors/DifferentialQueryProcessor.ts @@ -279,6 +279,10 @@ export class DifferentialQueryProcessor }); } + if (this.state.error) { + partialStateUpdate.error = null; + } + if (Object.keys(partialStateUpdate).length > 0) { await this.updateState(partialStateUpdate); } diff --git a/packages/common/src/client/watched/processors/OnChangeQueryProcessor.ts b/packages/common/src/client/watched/processors/OnChangeQueryProcessor.ts index bdf46a96a..1d2c8372a 100644 --- a/packages/common/src/client/watched/processors/OnChangeQueryProcessor.ts +++ b/packages/common/src/client/watched/processors/OnChangeQueryProcessor.ts @@ -96,6 +96,10 @@ export class OnChangeQueryProcessor extends AbstractQueryProcessor 0) { await this.updateState(partialStateUpdate); } diff --git a/packages/react/src/hooks/watched/useWatchedQuery.ts b/packages/react/src/hooks/watched/useWatchedQuery.ts index d8331a710..1485cbbbe 100644 --- a/packages/react/src/hooks/watched/useWatchedQuery.ts +++ b/packages/react/src/hooks/watched/useWatchedQuery.ts @@ -16,13 +16,6 @@ export const useWatchedQuery = ( ): QueryResult | ReadonlyQueryResult => { const { query, powerSync, queryChanged, options: hookOptions, active } = options; - // This ref is used to protect against cases where `queryChanged` changes multiple times too quickly to be - // picked up by the useEffect below. This typically happens when React.StrictMode is enabled. - const queryChangeRef = React.useRef(false); - if (queryChanged && !queryChangeRef.current) { - queryChangeRef.current = true; - } - function createWatchedQuery() { if (!active) { return null; @@ -42,24 +35,55 @@ export const useWatchedQuery = ( } const [watchedQuery, setWatchedQuery] = React.useState(createWatchedQuery); + const disposePendingUpdateListener = React.useRef<() => void | null>(null); React.useEffect(() => { watchedQuery?.close(); - setWatchedQuery(createWatchedQuery); + const newQuery = createWatchedQuery(); + setWatchedQuery(newQuery); + + return () => { + disposePendingUpdateListener.current?.(); + newQuery?.close(); + }; }, [powerSync, active]); - // Indicates that the query will be re-fetched due to a change in the query. - // Used when `isFetching` hasn't been set to true yet due to React execution. - React.useEffect(() => { - if (queryChangeRef.current) { - watchedQuery?.updateSettings({ - query, - throttleMs: hookOptions.throttleMs, - reportFetching: hookOptions.reportFetching - }); - queryChangeRef.current = false; - } - }, [queryChangeRef.current]); + /** + * Indicates that the query will be re-fetched due to a change in the query. + * We execute this in-line (not using an effect) since effects are delayed till after the hook returns. + * The `queryChanged` value should only be true for a single render. + * The `updateSettings` method is asynchronous, thus it will update the state asynchronously. + * In the React hooks we'd like to report that we are fetching the data for an updated query + * as soon as the query has been updated. This prevents a result flow where e.g. the hook: + * - already returned a result: isLoading, isFetching are both false + * - the query is updated, but the state is still isFetching=false from the previous state + * We override the isFetching status while the `updateSettings` method is running (if we report `isFetching`), + * we override this value just until the `updateSettings` method itself will update the `isFetching` status. + * We achieve this by registering a `settingsWillUpdate` listener on the `WatchedQuery`. This will fire + * just before the `isFetching` status is updated. + */ + if (queryChanged) { + // Keep track of this pending operation + watchedQuery?.updateSettings({ + query, + throttleMs: hookOptions.throttleMs, + reportFetching: hookOptions.reportFetching + }); + // This could have been called multiple times, clear any old listeners. + disposePendingUpdateListener.current?.(); + disposePendingUpdateListener.current = watchedQuery?.registerListener({ + settingsWillUpdate: () => { + // We'll use the fact that we have a listener at all as an indication + disposePendingUpdateListener.current?.(); + disposePendingUpdateListener.current = null; + } + }); + } - return useNullableWatchedQuerySubscription(watchedQuery); + const shouldReportCurrentlyFetching = (hookOptions.reportFetching ?? true) && !!disposePendingUpdateListener.current; + const result = useNullableWatchedQuerySubscription(watchedQuery); + return { + ...result, + isFetching: result?.isFetching || shouldReportCurrentlyFetching + }; }; diff --git a/packages/react/tests/useQuery.test.tsx b/packages/react/tests/useQuery.test.tsx index d5619e224..dfc9ef810 100644 --- a/packages/react/tests/useQuery.test.tsx +++ b/packages/react/tests/useQuery.test.tsx @@ -10,6 +10,7 @@ import { beforeEach, describe, expect, it, onTestFinished, vi } from 'vitest'; import { PowerSyncContext } from '../src/hooks/PowerSyncContext'; import { useQuery } from '../src/hooks/watched/useQuery'; import { useWatchedQuerySubscription } from '../src/hooks/watched/useWatchedQuerySubscription'; +import { QueryResult } from '../src/hooks/watched/watch-types'; export const openPowerSync = () => { const db = new PowerSyncDatabase({ @@ -147,6 +148,13 @@ describe('useQuery', () => { (uuid (), 'second') `); + type TestEvent = { + parameters: string[]; + hookResults: QueryResult; + }; + + const hookEvents: TestEvent[] = []; + const query = () => { const [parameters, setParameters] = React.useState(['first']); @@ -155,7 +163,12 @@ describe('useQuery', () => { newParametersPromise.then((params) => setParameters(params)); }, []); - return useQuery('SELECT * FROM lists WHERE name = ?', parameters); + const result = useQuery('SELECT * FROM lists WHERE name = ?', parameters); + hookEvents.push({ + parameters, + hookResults: result + }); + return result; }; const { result } = renderHook(query, { wrapper: ({ children }) => testWrapper({ children, db }) }); @@ -168,6 +181,12 @@ describe('useQuery', () => { { timeout: 500, interval: 100 } ); + // Verify that the fetching status was correlated to the parameters + const firstResultEvent = hookEvents.find((event) => event.hookResults.data.length == 1); + expect(firstResultEvent).toBeDefined(); + // Fetching should be false as soon as the results were made available + expect(firstResultEvent?.hookResults.isFetching).false; + // Now update the parameter updateParameters(['second']); @@ -178,6 +197,268 @@ describe('useQuery', () => { }, { timeout: 500, interval: 100 } ); + + // finds the first result where the parameters have changed + const secondFetchingEvent = hookEvents.find((event) => event.parameters[0] == 'second'); + expect(secondFetchingEvent).toBeDefined(); + // We should immediately report that we are fetching once we detect new params + expect(secondFetchingEvent?.hookResults.isFetching).true; + }); + + it('should react to updated queries (many updates)', async () => { + const db = openPowerSync(); + + await db.execute(/* sql */ ` + INSERT INTO + lists (id, name) + VALUES + (uuid (), 'first'), + (uuid (), 'second') + `); + + type TestEvent = { + parameters: string[]; + hookResults: QueryResult; + }; + + const hookEvents: TestEvent[] = []; + + const queryObserver = new commonSdk.BaseObserver(); + const baseQuery = 'SELECT * FROM lists WHERE name = ?'; + const query = () => { + const [query, setQuery] = React.useState({ + sql: baseQuery, + params: [''] + }); + + useEffect(() => { + // allow updating the parameters externally + queryObserver.registerListener({ + queryUpdated: (query) => setQuery(query) + }); + }, []); + + const result = useQuery(query.sql, query.params); + hookEvents.push({ + parameters: query.params, + hookResults: result + }); + return result; + }; + + const { result } = renderHook(query, { wrapper: ({ children }) => testWrapper({ children, db }) }); + // let the hook render once, and immediately update the query + queryObserver.iterateListeners((l) => + l.queryUpdated?.({ + sql: baseQuery, + params: ['first'] + }) + ); + + // Wait for the first result to be emitted + await vi.waitFor( + () => { + expect(result.current.data[0]?.name).toEqual('first'); + expect(result.current.isFetching).false; + expect(result.current.isLoading).false; + }, + { timeout: 500, interval: 100 } + ); + + // We changed the params before the initial query could execute (we changed the params immediately) + // We should not see isLoading=false for the first set of params + expect( + hookEvents.find((event) => event.parameters[0] == '' && event.hookResults.isLoading == false) + ).toBeUndefined(); + // We should have an event where this was both loading and fetching + expect( + hookEvents.find( + (event) => + event.parameters[0] == 'first' && + event.hookResults.isLoading == true && + event.hookResults.isFetching == true + ) + ).toBeDefined(); + // We should not have any event where isLoading or isFetching is false without data being presented + expect( + hookEvents.find( + (event) => + event.parameters[0] == 'first' && + (event.hookResults.isLoading || event.hookResults.isFetching) == false && + event.hookResults.data[0]?.name != 'first' + ) + ).toBeUndefined(); + + // Verify that the fetching status was correlated to the parameters + const firstResultEvent = hookEvents.find((event) => event.hookResults.data.length == 1); + expect(firstResultEvent).toBeDefined(); + // Fetching should be false as soon as the results were made available + expect(firstResultEvent?.hookResults.isFetching).false; + + // Now update the parameter with something which will cause an error + queryObserver.iterateListeners((l) => + l.queryUpdated?.({ + sql: 'select this is a broken query', + params: ['error'] + }) + ); + + // wait for the error to have been found + await vi.waitFor( + () => { + expect(result.current.error).not.equal(null); + expect(result.current.isFetching).false; + }, + { timeout: 500, interval: 100 } + ); + + // The error should not be present before isFetching is false + expect( + hookEvents.find((event) => event.hookResults.error != null && event.hookResults.isFetching == true) + ).toBeUndefined(); + // there should not be any results where the fetching status is false, but the error is not presented + expect( + hookEvents.find( + (event) => + event.parameters[0] == 'error' && + event.hookResults.error == null && + (event.hookResults.isFetching || event.hookResults.isLoading) == false + ) + ).toBeUndefined(); + + queryObserver.iterateListeners((l) => + l.queryUpdated?.({ + sql: baseQuery, + params: ['second'] + }) + ); + + // We should now only receive the second list due to the WHERE clause and updated parameter + await vi.waitFor( + () => { + expect(result.current.data[0]?.name).toEqual('second'); + expect(result.current.error).null; + expect(result.current.isFetching).false; + expect(result.current.isLoading).false; + }, + { timeout: 500, interval: 100 } + ); + + const secondFetchingEvent = hookEvents.find((event) => event.parameters[0] == 'second'); + expect(secondFetchingEvent).toBeDefined(); + // We should immediately report that we are fetching once we detect new params + expect(secondFetchingEvent?.hookResults.isFetching).true; + // We should never have reported that fetching was false before the results were present + expect( + hookEvents.find( + (event) => + event.parameters[0] == 'second' && + event.hookResults.data[0]?.name != 'second' && + (event.hookResults.isFetching == false || event.hookResults.isLoading == false) + ) + ); + }); + + it('should react to updated queries (immediate updates)', async () => { + const db = openPowerSync(); + + await db.execute(/* sql */ ` + INSERT INTO + lists (id, name) + VALUES + (uuid (), 'first'), + (uuid (), 'second'), + (uuid (), 'third') + `); + + type TestEvent = { + parameters: number[]; + hookResults: QueryResult; + }; + + const hookEvents: TestEvent[] = []; + + const baseQuery = 'SELECT * FROM lists LIMIT ?'; + + const query = () => { + const [query, setQuery] = React.useState({ + sql: baseQuery, + params: [1] + }); + + // Change the params after the first render + useEffect(() => { + setQuery({ + sql: baseQuery, + params: [2] + }); + }, []); + + const result = useQuery(query.sql, query.params); + hookEvents.push({ + parameters: query.params, + hookResults: result + }); + return result; + }; + + const { result } = renderHook(query, { wrapper: ({ children }) => testWrapper({ children, db }) }); + // Wait for the first result to be emitted + await vi.waitFor( + () => { + expect(result.current.data.length).toEqual(2); + expect(result.current.isFetching).false; + expect(result.current.isLoading).false; + }, + { timeout: 500, interval: 100 } + ); + + console.log(JSON.stringify(hookEvents)); + + // We changed the params before the initial query could execute (we changed the params immediately) + // We should not see isLoading=false for the first set of params + expect( + hookEvents.find( + (event) => + event.parameters[0] == 1 && + (event.hookResults.isLoading == false || event.hookResults.isFetching == false) + ) + ).toBeUndefined(); + // We should have an event where this was both loading and fetching + expect( + hookEvents.find( + (event) => + event.parameters[0] == 1 && event.hookResults.isLoading == true && event.hookResults.isFetching == true + ) + ).toBeDefined(); + + // We should not have any event where isLoading or isFetching is false without data being presented + expect( + hookEvents.find( + (event) => + event.parameters[0] == 1 && + (event.hookResults.isLoading || event.hookResults.isFetching) == false && + event.hookResults.data.length != 1 + ) + ).toBeUndefined(); + + expect( + hookEvents.find( + (event) => + event.parameters[0] == 2 && + (event.hookResults.isLoading || event.hookResults.isFetching) == false && + event.hookResults.data.length != 2 + ) + ).toBeUndefined(); + + expect( + hookEvents.find( + (event) => + event.parameters[0] == 2 && + (event.hookResults.isLoading && event.hookResults.isFetching) == false && + event.hookResults.data.length == 2 + ) + ).toBeDefined(); }); it('should execute compatible queries', async () => { diff --git a/packages/web/tests/watch.test.ts b/packages/web/tests/watch.test.ts index b9511e277..e01c5d05e 100644 --- a/packages/web/tests/watch.test.ts +++ b/packages/web/tests/watch.test.ts @@ -733,6 +733,48 @@ describe('Watch Tests', { sequential: true }, () => { expect(watch.state.data[0].make).equals('nottest'); }); + it('should allow updating queries', async () => { + const watch = powersync + .query<{ make: string }>({ + sql: 'SELECT ? as result', + parameters: [0] + }) + .watch({ + reportFetching: false + }); + + let states: WatchedQueryState[] = []; + + // Keep track of all the states which have been updated + const dispose = watch.registerListener({ + onStateChange: (state) => { + states.push(state); + } + }); + + // Spam the updateSettings + let updatePromises = Array.from({ length: 100 }).map(async (_, index) => + watch.updateSettings({ + query: new GetAllQuery({ + sql: 'SELECT ? as result', + parameters: [index + 1] + }) + }) + ); + + await Promise.all(updatePromises); + + await vi.waitFor( + () => { + console.log(JSON.stringify(states)); + expect(states[states.length - 1].isFetching).false; + expect(states[states.length - 1].data[0].result).eq(100); + }, + { timeout: 3000 } + ); + dispose(); + }); + it('should report differential query results', async () => { const watch = powersync .query({