Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chilly-penguins-learn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Fixed potential race conditions in WatchedQueries when updateSettings is called frequently.
6 changes: 6 additions & 0 deletions .changeset/empty-glasses-camp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/react': patch
---

- Fixed bug where the `useQuery` reported `error` state would not clear after updating the query to a valid query.
- Fixed bug where `useQuery` `isFetching` status would not immediately be reported as true when the query has changed.
3 changes: 2 additions & 1 deletion .github/workflows/test-simulators.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ jobs:
- name: Set up XCode
uses: maxim-lobanov/setup-xcode@v1
with:
xcode-version: latest-stable
# TODO: Update to latest-stable once GH installs iOS 26 simulators
xcode-version: '^16.4.0'

- name: CocoaPods Cache
uses: actions/cache@v3
Expand Down
2 changes: 2 additions & 0 deletions packages/common/src/client/watched/WatchedQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ export enum WatchedQueryListenerEvent {
ON_DATA = 'onData',
ON_ERROR = 'onError',
ON_STATE_CHANGE = 'onStateChange',
SETTINGS_WILL_UPDATE = 'settingsWillUpdate',
CLOSED = 'closed'
}

export interface WatchedQueryListener<Data> extends BaseListener {
[WatchedQueryListenerEvent.ON_DATA]?: (data: Data) => void | Promise<void>;
[WatchedQueryListenerEvent.ON_ERROR]?: (error: Error) => void | Promise<void>;
[WatchedQueryListenerEvent.ON_STATE_CHANGE]?: (state: WatchedQueryState<Data>) => void | Promise<void>;
[WatchedQueryListenerEvent.SETTINGS_WILL_UPDATE]?: () => void;
[WatchedQueryListenerEvent.CLOSED]?: () => void | Promise<void>;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { AbstractPowerSyncDatabase } from '../../../client/AbstractPowerSyncDatabase.js';
import { MetaBaseObserver } from '../../../utils/MetaBaseObserver.js';
import { WatchedQuery, WatchedQueryListener, WatchedQueryOptions, WatchedQueryState } from '../WatchedQuery.js';
import {
WatchedQuery,
WatchedQueryListener,
WatchedQueryListenerEvent,
WatchedQueryOptions,
WatchedQueryState
} from '../WatchedQuery.js';

/**
* @internal
Expand Down Expand Up @@ -62,7 +68,7 @@ export abstract class AbstractQueryProcessor<
this._closed = false;
this.state = this.constructInitialState();
this.disposeListeners = null;
this.initialized = this.init();
this.initialized = this.init(this.abortController.signal);
}

protected constructInitialState(): WatchedQueryState<Data> {
Expand All @@ -79,37 +85,57 @@ export abstract class AbstractQueryProcessor<
return this.options.watchOptions.reportFetching ?? true;
}

/**
* Updates the underlying query.
*/
async updateSettings(settings: Settings) {
this.abortController.abort();
await this.initialized;
protected async updateSettingsInternal(settings: Settings, signal: AbortSignal) {
// This may have been aborted while awaiting or if multiple calls to `updateSettings` were made
if (this._closed || signal.aborted) {
return;
}

this.options.watchOptions = settings;

this.iterateListeners((l) => l[WatchedQueryListenerEvent.SETTINGS_WILL_UPDATE]?.());

if (!this.state.isFetching && this.reportFetching) {
await this.updateState({
isFetching: true
});
}

this.options.watchOptions = settings;

this.abortController = new AbortController();
await this.runWithReporting(() =>
this.linkQuery({
abortSignal: this.abortController.signal,
abortSignal: signal,
settings
})
);
}

/**
* Updates the underlying query.
*/
async updateSettings(settings: Settings) {
// Abort the previous request
this.abortController.abort();

// Keep track of this controller's abort status
const abortController = new AbortController();
// Allow this to be aborted externally
this.abortController = abortController;

await this.initialized;
return this.updateSettingsInternal(settings, abortController.signal);
}

/**
* This method is used to link a query to the subscribers of this listener class.
* This method should perform actual query watching and report results via {@link updateState} method.
*/
protected abstract linkQuery(options: LinkQueryOptions<Data>): Promise<void>;

protected async updateState(update: Partial<MutableWatchedQueryState<Data>>) {
if (this._closed) {
return;
}

if (typeof update.error !== 'undefined') {
await this.iterateAsyncListenersWithError(async (l) => l.onError?.(update.error!));
// An error always stops for the current fetching state
Expand All @@ -128,7 +154,7 @@ export abstract class AbstractQueryProcessor<
/**
* Configures base DB listeners and links the query to listeners.
*/
protected async init() {
protected async init(signal: AbortSignal) {
const { db } = this.options;

const disposeCloseListener = db.registerListener({
Expand All @@ -153,17 +179,16 @@ export abstract class AbstractQueryProcessor<
};

// Initial setup
this.runWithReporting(async () => {
await this.updateSettings(this.options.watchOptions);
await this.runWithReporting(async () => {
await this.updateSettingsInternal(this.options.watchOptions, signal);
});
}

async close() {
await this.initialized;
this._closed = true;
this.abortController.abort();
this.disposeListeners?.();
this.disposeListeners = null;
this._closed = true;
this.iterateListeners((l) => l.closed?.());
this.listeners.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ export class DifferentialQueryProcessor<RowType>
});
}

if (this.state.error) {
partialStateUpdate.error = null;
}

if (Object.keys(partialStateUpdate).length > 0) {
await this.updateState(partialStateUpdate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ export class OnChangeQueryProcessor<Data> extends AbstractQueryProcessor<Data, W
});
}

if (this.state.error) {
partialStateUpdate.error = null;
}

if (Object.keys(partialStateUpdate).length > 0) {
await this.updateState(partialStateUpdate);
}
Expand Down
66 changes: 45 additions & 21 deletions packages/react/src/hooks/watched/useWatchedQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ export const useWatchedQuery = <RowType = unknown>(
): QueryResult<RowType> | ReadonlyQueryResult<RowType> => {
const { query, powerSync, queryChanged, options: hookOptions, active } = options;

// This ref is used to protect against cases where `queryChanged` changes multiple times too quickly to be
// picked up by the useEffect below. This typically happens when React.StrictMode is enabled.
const queryChangeRef = React.useRef(false);
if (queryChanged && !queryChangeRef.current) {
queryChangeRef.current = true;
}

function createWatchedQuery() {
if (!active) {
return null;
Expand All @@ -42,24 +35,55 @@ export const useWatchedQuery = <RowType = unknown>(
}

const [watchedQuery, setWatchedQuery] = React.useState(createWatchedQuery);
const disposePendingUpdateListener = React.useRef<() => void | null>(null);

React.useEffect(() => {
watchedQuery?.close();
setWatchedQuery(createWatchedQuery);
const newQuery = createWatchedQuery();
setWatchedQuery(newQuery);

return () => {
disposePendingUpdateListener.current?.();
newQuery?.close();
};
}, [powerSync, active]);

// Indicates that the query will be re-fetched due to a change in the query.
// Used when `isFetching` hasn't been set to true yet due to React execution.
React.useEffect(() => {
if (queryChangeRef.current) {
watchedQuery?.updateSettings({
query,
throttleMs: hookOptions.throttleMs,
reportFetching: hookOptions.reportFetching
});
queryChangeRef.current = false;
}
}, [queryChangeRef.current]);
/**
* Indicates that the query will be re-fetched due to a change in the query.
* We execute this in-line (not using an effect) since effects are delayed till after the hook returns.
* The `queryChanged` value should only be true for a single render.
* The `updateSettings` method is asynchronous, thus it will update the state asynchronously.
* In the React hooks we'd like to report that we are fetching the data for an updated query
* as soon as the query has been updated. This prevents a result flow where e.g. the hook:
* - already returned a result: isLoading, isFetching are both false
* - the query is updated, but the state is still isFetching=false from the previous state
* We override the isFetching status while the `updateSettings` method is running (if we report `isFetching`),
* we override this value just until the `updateSettings` method itself will update the `isFetching` status.
* We achieve this by registering a `settingsWillUpdate` listener on the `WatchedQuery`. This will fire
* just before the `isFetching` status is updated.
*/
if (queryChanged) {
// Keep track of this pending operation
watchedQuery?.updateSettings({
query,
throttleMs: hookOptions.throttleMs,
reportFetching: hookOptions.reportFetching
});
// This could have been called multiple times, clear any old listeners.
disposePendingUpdateListener.current?.();
disposePendingUpdateListener.current = watchedQuery?.registerListener({
settingsWillUpdate: () => {
// We'll use the fact that we have a listener at all as an indication
disposePendingUpdateListener.current?.();
disposePendingUpdateListener.current = null;
}
});
}

return useNullableWatchedQuerySubscription(watchedQuery);
const shouldReportCurrentlyFetching = (hookOptions.reportFetching ?? true) && !!disposePendingUpdateListener.current;
const result = useNullableWatchedQuerySubscription(watchedQuery);
return {
...result,
isFetching: result?.isFetching || shouldReportCurrentlyFetching
};
};
Loading