Skip to content

Commit d6290a4

Browse files
committed
Don't wait for streams by default
1 parent 437e90b commit d6290a4

File tree

3 files changed

+48
-15
lines changed

3 files changed

+48
-15
lines changed

packages/react/src/hooks/streams.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
SyncStreamSubscription
99
} from '@powersync/common';
1010
import { useStatus } from './useStatus.js';
11+
import { QuerySyncStreamOptions } from './watched/watch-types.js';
1112

1213
/**
1314
* A sync stream to subscribe to in {@link useSyncStream}.
@@ -51,12 +52,15 @@ export function useSyncStream(options: UseSyncStreamOptions): SyncStreamStatus |
5152
subscription = sub;
5253
setSubscription(sub);
5354
} else {
55+
// The cleanup function already ran, unsubscribe immediately.
5456
sub.unsubscribe();
5557
}
5658
});
5759

5860
return () => {
5961
active = false;
62+
// If we don't have a subscription yet, it'll still get cleaned up once the promise resolves because we've set
63+
// active to false.
6064
subscription?.unsubscribe();
6165
};
6266
}, [name, parameters]);
@@ -69,12 +73,12 @@ export function useSyncStream(options: UseSyncStreamOptions): SyncStreamStatus |
6973
*/
7074
export function useAllSyncStreamsHaveSynced(
7175
db: AbstractPowerSyncDatabase,
72-
streams: UseSyncStreamOptions[] | undefined
76+
streams: QuerySyncStreamOptions[] | undefined
7377
): boolean {
7478
// Since streams are a user-supplied array, they will likely be different each time this function is called. We don't
7579
// want to update underlying subscriptions each time, though.
7680
const hash = useMemo(() => streams && JSON.stringify(streams), [streams]);
77-
const [synced, setHasSynced] = useState(streams == null || streams.length == 0);
81+
const [synced, setHasSynced] = useState(streams == null || streams.every((e) => e.waitForStream != true));
7882

7983
useEffect(() => {
8084
if (streams) {
@@ -87,9 +91,12 @@ export function useAllSyncStreamsHaveSynced(
8791
}
8892

8993
// First, wait for all subscribe() calls to make all subscriptions active.
90-
Promise.all(promises).then(async (streams) => {
94+
Promise.all(promises).then(async (resolvedStreams) => {
9195
function allHaveSynced(status: SyncStatus) {
92-
return streams.every((s) => status.forStream(s)?.subscription?.hasSynced);
96+
return resolvedStreams.every((s, i) => {
97+
const request = streams[i];
98+
return !request.waitForStream || status.forStream(s)?.subscription?.hasSynced;
99+
});
93100
}
94101

95102
// Wait for the effect to be cancelled or all streams having synced.
@@ -107,7 +114,7 @@ export function useAllSyncStreamsHaveSynced(
107114
}
108115

109116
// Effect was definitely cancelled at this point, so drop the subscriptions.
110-
for (const stream of streams) {
117+
for (const stream of resolvedStreams) {
111118
stream.unsubscribe();
112119
}
113120
});

packages/react/src/hooks/watched/watch-types.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,30 @@ export interface HookWatchOptions extends Omit<SQLOnChangeOptions, 'signal'> {
66
* An optional array of sync streams (with names and parameters) backing the query.
77
*
88
* When set, `useQuery` will subscribe to those streams (and automatically handle unsubscribing from them, too).
9-
* Additionally, `useQuery` will remainin in a loading state until all of the streams are marked as
10-
* {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't been
11-
* downloaded.
9+
*
10+
* If {@link QuerySyncStreamOptions} is set on a stream, `useQuery` will remain in a loading state until that stream
11+
* is marked as {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't
12+
* been downloaded.
1213
* Note however that after an initial sync, the query will not block itself while new rows are downloading. Instead,
1314
* consistent sync snapshots will be made available as they've been processed by PowerSync.
1415
*
1516
* @experimental Sync streams are currently in alpha.
1617
*/
17-
streams?: UseSyncStreamOptions[];
18+
streams?: QuerySyncStreamOptions[];
1819
reportFetching?: boolean;
1920
}
2021

22+
/**
23+
* Additional options to control how `useQuery` behaves when subscribing to a stream.
24+
*/
25+
export interface QuerySyncStreamOptions extends UseSyncStreamOptions {
26+
/**
27+
* When set to `true`, a `useQuery` hook will remain in a loading state as long as the stream is resolving or
28+
* downloading for the first time (in other words, until {@link SyncSubscriptionDescription.hasSynced} is true).
29+
*/
30+
waitForStream?: boolean;
31+
}
32+
2133
export interface AdditionalOptions extends HookWatchOptions {
2234
runQueryOnce?: boolean;
2335
}

packages/react/tests/streams.test.tsx

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { openPowerSync } from './utils';
66
import { PowerSyncContext } from '../src/hooks/PowerSyncContext';
77
import { useSyncStream, UseSyncStreamOptions } from '../src/hooks/streams';
88
import { useQuery } from '../src/hooks/watched/useQuery';
9+
import { QuerySyncStreamOptions } from '../src/hooks/watched/watch-types';
910

1011
describe('stream hooks', () => {
1112
let db: AbstractPowerSyncDatabase;
@@ -51,10 +52,13 @@ describe('stream hooks', () => {
5152
expect(currentStreams()).toStrictEqual([]);
5253
});
5354

54-
it('useQuery', async () => {
55-
const { result } = renderHook(() => useQuery('SELECT 1', [], { streams: [{ name: 'a' }] }), {
56-
wrapper: testWrapper
57-
});
55+
it('useQuery waiting on stream', async () => {
56+
const { result } = renderHook(
57+
() => useQuery('SELECT 1', [], { streams: [{ name: 'a', waitForStream: true }] }),
58+
{
59+
wrapper: testWrapper
60+
}
61+
);
5862
expect(result.current).toMatchObject({ isLoading: true });
5963
// Including the stream should subscribe.
6064
await waitFor(() => expect(currentStreams()).toHaveLength(1), { timeout: 1000, interval: 100 });
@@ -68,6 +72,16 @@ describe('stream hooks', () => {
6872
await waitFor(() => expect(result.current.data).toHaveLength(1), { timeout: 1000, interval: 100 });
6973
});
7074

75+
it('useQuery not waiting on stream', async () => {
76+
// By default, it should still run the query immediately instead of waiting for the stream to resolve.
77+
const { result } = renderHook(() => useQuery('SELECT 1', [], { streams: [{ name: 'a' }] }), {
78+
wrapper: testWrapper
79+
});
80+
81+
// Not resolving the subscription.
82+
await waitFor(() => expect(result.current.data).toHaveLength(1), { timeout: 1000, interval: 100 });
83+
});
84+
7185
it('unsubscribes on unmount', async () => {
7286
const { unmount } = renderHook(() => useQuery('SELECT 1', [], { streams: [{ name: 'a' }, { name: 'b' }] }), {
7387
wrapper: testWrapper
@@ -80,7 +94,7 @@ describe('stream hooks', () => {
8094

8195
it('handles stream parameter changes', async () => {
8296
// Start without streams
83-
let streams: UseSyncStreamOptions[] = [];
97+
let streams: QuerySyncStreamOptions[] = [];
8498
let streamUpdateListeners: (() => void)[] = [];
8599

86100
const { result } = renderHook(
@@ -108,7 +122,7 @@ describe('stream hooks', () => {
108122
await waitFor(() => expect(result.current.data).toHaveLength(1), { timeout: 1000, interval: 100 });
109123

110124
// Adopt streams - this should reset back to loading
111-
streams = [{ name: 'a' }];
125+
streams = [{ name: 'a', waitForStream: true }];
112126
act(() => streamUpdateListeners.forEach((cb) => cb()));
113127
expect(result.current).toMatchObject({ isLoading: true });
114128

0 commit comments

Comments
 (0)