Skip to content

Commit c06cafe

Browse files
committed
React hooks: Support sync streams
1 parent eff8cbf commit c06cafe

File tree

13 files changed

+353
-36
lines changed

13 files changed

+353
-36
lines changed

.changeset/angry-ducks-sneeze.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
'@powersync/react-native': minor
33
'@powersync/common': minor
44
'@powersync/web': minor
5+
'@powersync/node': minor
56
---
67

78
Add alpha support for sync streams, allowing different sets of data to be synced dynamically.

.changeset/mighty-colts-rule.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/react': minor
3+
---
4+
5+
Add hooks for sync streams

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
355355
return this.waitForStatus(statusMatches, signal);
356356
}
357357

358-
private async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
358+
/**
359+
* Waits for the first sync status for which the `status` callback returns a truthy value.
360+
*/
361+
async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
359362
if (predicate(this.currentStatus)) {
360363
return;
361364
}
@@ -364,16 +367,21 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
364367
const dispose = this.registerListener({
365368
statusChanged: (status) => {
366369
if (predicate(status)) {
367-
dispose();
368-
resolve();
370+
abort();
369371
}
370372
}
371373
});
372374

373-
signal?.addEventListener('abort', () => {
375+
function abort() {
374376
dispose();
375377
resolve();
376-
});
378+
}
379+
380+
if (signal?.aborted) {
381+
abort();
382+
} else {
383+
signal?.addEventListener('abort', abort);
384+
}
377385
});
378386
}
379387

packages/common/src/client/ConnectionManager.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,14 +324,15 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
324324
};
325325
}
326326

327-
private get activeStreams() {
327+
/**
328+
* @internal exposed for testing
329+
*/
330+
get activeStreams() {
328331
return [...this.locallyActiveSubscriptions.values()].map((a) => ({ name: a.name, params: a.parameters }));
329332
}
330333

331334
private subscriptionsMayHaveChanged() {
332-
if (this.syncStreamImplementation) {
333-
this.syncStreamImplementation.updateSubscriptions(this.activeStreams);
334-
}
335+
this.syncStreamImplementation?.updateSubscriptions(this.activeStreams);
335336
}
336337
}
337338

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import { useEffect, useMemo, useState } from 'react';
2+
import { usePowerSync } from './PowerSyncContext.js';
3+
import {
4+
AbstractPowerSyncDatabase,
5+
SyncStatus,
6+
SyncStreamStatus,
7+
SyncStreamSubscribeOptions,
8+
SyncStreamSubscription
9+
} from '@powersync/common';
10+
import { useStatus } from './useStatus.js';
11+
12+
/**
13+
* A sync stream to subscribe to in {@link useSyncStream}.
14+
*
15+
* For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams).
16+
*/
17+
export interface UseSyncStreamOptions extends SyncStreamSubscribeOptions {
18+
/**
19+
* The name of the stream to subscribe to.
20+
*/
21+
name: string;
22+
/**
23+
* Parameters for the stream subscription. A single stream can have multiple subscriptions with different parameter
24+
* sets.
25+
*/
26+
parameters?: Record<string, any>;
27+
}
28+
29+
/**
30+
* Creates a PowerSync stream subscription. The subscription is kept alive as long as the React component calling this
31+
* function. When it unmounts, {@link SyncStreamSubscription.unsubscribe} is called
32+
*
33+
* For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams).
34+
*
35+
* @returns The status for that stream, or `null` if the stream is currently being resolved.
36+
*/
37+
export function useSyncStream(options: UseSyncStreamOptions): SyncStreamStatus | null {
38+
const { name, parameters } = options;
39+
const db = usePowerSync();
40+
const status = useStatus();
41+
const [subscription, setSubscription] = useState<SyncStreamSubscription | null>(null);
42+
43+
useEffect(() => {
44+
let active = true;
45+
let subscription: SyncStreamSubscription | null = null;
46+
47+
db.syncStream(name, parameters)
48+
.subscribe(options)
49+
.then((sub) => {
50+
if (active) {
51+
subscription = sub;
52+
setSubscription(sub);
53+
} else {
54+
sub.unsubscribe();
55+
}
56+
});
57+
58+
return () => {
59+
active = false;
60+
subscription?.unsubscribe();
61+
};
62+
}, [name, parameters]);
63+
64+
return subscription && status.forStream(subscription);
65+
}
66+
67+
/**
68+
* @internal
69+
*/
70+
export function useAllSyncStreamsHaveSynced(
71+
db: AbstractPowerSyncDatabase,
72+
streams: UseSyncStreamOptions[] | undefined
73+
): boolean {
74+
// Since streams are a user-supplied array, they will likely be different each time this function is called. We don't
75+
// want to update underlying subscriptions each time, though.
76+
const hash = useMemo(() => streams && JSON.stringify(streams), [streams]);
77+
const [synced, setHasSynced] = useState(streams == null || streams.length == 0);
78+
79+
useEffect(() => {
80+
if (streams) {
81+
setHasSynced(false);
82+
83+
const promises: Promise<SyncStreamSubscription>[] = [];
84+
const abort = new AbortController();
85+
for (const stream of streams) {
86+
promises.push(db.syncStream(stream.name, stream.parameters).subscribe(stream));
87+
}
88+
89+
// First, wait for all subscribe() calls to make all subscriptions active.
90+
Promise.all(promises).then(async (streams) => {
91+
function allHaveSynced(status: SyncStatus) {
92+
return streams.every((s) => status.forStream(s)?.subscription?.hasSynced);
93+
}
94+
95+
// Wait for the effect to be cancelled or all streams having synced.
96+
await db.waitForStatus(allHaveSynced, abort.signal);
97+
if (abort.signal.aborted) {
98+
// Was cancelled
99+
} else {
100+
// Has synced, update public state.
101+
setHasSynced(true);
102+
103+
// Wait for cancellation before clearing subscriptions.
104+
await new Promise<void>((resolve) => {
105+
abort.signal.addEventListener('abort', () => resolve());
106+
});
107+
}
108+
109+
// Effect was definitely cancelled at this point, so drop the subscriptions.
110+
for (const stream of streams) {
111+
stream.unsubscribe();
112+
}
113+
});
114+
115+
return () => abort.abort();
116+
} else {
117+
// There are no streams, so all of them have synced.
118+
setHasSynced(true);
119+
return undefined;
120+
}
121+
}, [hash]);
122+
123+
return synced;
124+
}

packages/react/src/hooks/watched/useQuery.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { useSingleQuery } from './useSingleQuery.js';
44
import { useWatchedQuery } from './useWatchedQuery.js';
55
import { AdditionalOptions, DifferentialHookOptions, QueryResult, ReadonlyQueryResult } from './watch-types.js';
66
import { constructCompatibleQuery } from './watch-utils.js';
7+
import { useAllSyncStreamsHaveSynced } from '../streams.js';
78

89
/**
910
* A hook to access the results of a watched query.
@@ -58,15 +59,20 @@ export function useQuery<RowType = any>(
5859
) {
5960
const powerSync = usePowerSync();
6061
if (!powerSync) {
61-
return { isLoading: false, isFetching: false, data: [], error: new Error('PowerSync not configured.') };
62+
return {
63+
..._loadingState,
64+
isLoading: false,
65+
error: new Error('PowerSync not configured.')
66+
};
6267
}
6368
const { parsedQuery, queryChanged } = constructCompatibleQuery(query, parameters, options);
69+
const streamsHaveSynced = useAllSyncStreamsHaveSynced(powerSync, options?.streams);
6470
const runOnce = options?.runQueryOnce == true;
6571
const single = useSingleQuery<RowType>({
6672
query: parsedQuery,
6773
powerSync,
6874
queryChanged,
69-
active: runOnce
75+
active: runOnce && streamsHaveSynced
7076
});
7177
const watched = useWatchedQuery<RowType>({
7278
query: parsedQuery,
@@ -79,8 +85,14 @@ export function useQuery<RowType = any>(
7985
// We emit new data for each table change by default.
8086
rowComparator: options.rowComparator
8187
},
82-
active: !runOnce
88+
active: !runOnce && streamsHaveSynced
8389
});
8490

85-
return runOnce ? single : watched;
91+
if (!streamsHaveSynced) {
92+
return { ..._loadingState };
93+
}
94+
95+
return (runOnce ? single : watched) ?? _loadingState;
8696
}
97+
98+
const _loadingState = { isLoading: true, isFetching: false, data: [], error: undefined };

packages/react/src/hooks/watched/watch-types.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,20 @@
1-
import { DifferentialWatchedQueryComparator, SQLOnChangeOptions } from '@powersync/common';
1+
import { DifferentialWatchedQueryComparator, SQLOnChangeOptions, SyncSubscriptionDescription } from '@powersync/common';
2+
import { UseSyncStreamOptions } from '../streams.js';
23

34
export interface HookWatchOptions extends Omit<SQLOnChangeOptions, 'signal'> {
5+
/**
6+
* An optional array of sync streams (with names and parameters) backing the query.
7+
*
8+
* When set, `useQuery` will subscribe to those streams (and automatically handle unsubscribing from them, too).
9+
* Additionally, `useQuery` will remainin in a loading state until all of the streams are marked as
10+
* {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't been
11+
* downloaded.
12+
* Note however that after an initial sync, the query will not block itself while new rows are downloading. Instead,
13+
* consistent sync snapshots will be made available as they've been processed by PowerSync.
14+
*
15+
* @experimental Sync streams are currently in alpha.
16+
*/
17+
streams?: UseSyncStreamOptions[];
418
reportFetching?: boolean;
519
}
620

packages/react/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export * from './hooks/PowerSyncContext.js';
55
export { SuspenseQueryResult } from './hooks/suspense/SuspenseQueryResult.js';
66
export { useSuspenseQuery } from './hooks/suspense/useSuspenseQuery.js';
77
export { useWatchedQuerySuspenseSubscription } from './hooks/suspense/useWatchedQuerySuspenseSubscription.js';
8+
export { useSyncStream, UseSyncStreamOptions } from './hooks/streams.js';
89
export { useStatus } from './hooks/useStatus.js';
910
export { useQuery } from './hooks/watched/useQuery.js';
1011
export { useWatchedQuerySubscription } from './hooks/watched/useWatchedQuerySubscription.js';

packages/react/tests/QueryStore.test.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { AbstractPowerSyncDatabase, SQLWatchOptions } from '@powersync/common';
22
import { beforeEach, describe, expect, it } from 'vitest';
33
import { generateQueryKey, getQueryStore, QueryStore } from '../src/QueryStore';
4-
import { openPowerSync } from './useQuery.test';
4+
import { openPowerSync } from './utils';
55

66
describe('QueryStore', () => {
77
describe('generateQueryKey', () => {

0 commit comments

Comments
 (0)