Skip to content

Commit f6b3ef4

Browse files
Improve hook implementation
1 parent 861fc2b commit f6b3ef4

File tree

10 files changed

+446
-167
lines changed

10 files changed

+446
-167
lines changed

demos/react-supabase-todolist/src/app/views/sql-console/page.tsx

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,48 @@ export type LoginFormParams = {
1111

1212
const DEFAULT_QUERY = 'SELECT * FROM lists';
1313

14-
export default function SQLConsolePage() {
15-
const inputRef = React.useRef<HTMLInputElement>();
16-
const [query, setQuery] = React.useState(DEFAULT_QUERY);
17-
const { data: querySQLResult } = useQuery(query);
18-
14+
const TableDisplay = ({ data }: { data: any[] }) => {
15+
console.log('Rendering table display', data);
1916
const queryDataGridResult = React.useMemo(() => {
20-
const firstItem = querySQLResult?.[0];
21-
console.log('running a query render');
17+
const firstItem = data?.[0];
2218
return {
2319
columns: firstItem
2420
? Object.keys(firstItem).map((field) => ({
2521
field,
2622
flex: 1
2723
}))
2824
: [],
29-
rows: querySQLResult
25+
rows: data
3026
};
31-
}, [querySQLResult]);
27+
}, [data]);
28+
29+
return (
30+
<S.QueryResultContainer>
31+
<DataGrid
32+
autoHeight={true}
33+
rows={queryDataGridResult.rows.map((r, index) => ({ ...r, id: r.id ?? index })) ?? []}
34+
columns={queryDataGridResult.columns}
35+
initialState={{
36+
pagination: {
37+
paginationModel: {
38+
pageSize: 20
39+
}
40+
}
41+
}}
42+
pageSizeOptions={[20]}
43+
disableRowSelectionOnClick
44+
/>
45+
</S.QueryResultContainer>
46+
);
47+
};
48+
export default function SQLConsolePage() {
49+
const inputRef = React.useRef<HTMLInputElement>();
50+
const [query, setQuery] = React.useState(DEFAULT_QUERY);
51+
const { data } = useQuery(query, [], { reportFetching: false });
52+
53+
React.useEffect(() => {
54+
console.log('Query result changed', data);
55+
}, [data]);
3256

3357
return (
3458
<NavigationPage title="SQL Console">
@@ -62,27 +86,7 @@ export default function SQLConsolePage() {
6286
</Button>
6387
</S.CenteredGrid>
6488
</S.CenteredGrid>
65-
66-
{queryDataGridResult ? (
67-
<S.QueryResultContainer>
68-
{queryDataGridResult.columns ? (
69-
<DataGrid
70-
autoHeight={true}
71-
rows={queryDataGridResult.rows?.map((r, index) => ({ ...r, id: r.id ?? index })) ?? []}
72-
columns={queryDataGridResult.columns}
73-
initialState={{
74-
pagination: {
75-
paginationModel: {
76-
pageSize: 20
77-
}
78-
}
79-
}}
80-
pageSizeOptions={[20]}
81-
disableRowSelectionOnClick
82-
/>
83-
) : null}
84-
</S.QueryResultContainer>
85-
) : null}
89+
<TableDisplay data={data} />
8690
</S.MainContainer>
8791
</NavigationPage>
8892
);

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -866,7 +866,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
866866
}
867867

868868
// TODO names
869-
watch2<T>(options: { sql: string; parameters?: any[]; throttleMs?: number }): WatchedQuery<T> {
869+
incrementalWatch<T>(options: {
870+
sql: string;
871+
parameters?: any[];
872+
throttleMs?: number;
873+
queryExecutor?: () => Promise<T[]>;
874+
reportFetching?: boolean;
875+
}): WatchedQuery<T> {
870876
return new WatchedQueryImpl({
871877
processor: new ComparisonQueryProcessor({
872878
db: this,
@@ -877,8 +883,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
877883
watchedQuery: {
878884
query: options.sql,
879885
parameters: options.parameters,
880-
throttleMs: options.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS
881-
// queryExecutor: todo
886+
throttleMs: options.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS,
887+
queryExecutor: options.queryExecutor,
888+
reportFetching: options.reportFetching
882889
}
883890
})
884891
});

packages/common/src/client/watched/WatchedQuery.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ export interface WatchedQueryOptions<T> {
2626
/** The minimum interval between queries. */
2727
throttleMs?: number;
2828
queryExecutor?: () => Promise<T[]>;
29+
/**
30+
* If true (default) the watched query will update its state to report
31+
* on the fetching state of the query.
32+
* Setting to false reduces the number of state changes if the fetch status
33+
* is not relevant to the consumer.
34+
*/
35+
reportFetching?: boolean;
2936
}
3037

3138
export interface WatchedQuery<T> {

packages/common/src/client/watched/processors/AbstractQueryProcessor.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ export abstract class AbstractQueryProcessor<T>
4040
this._stream = null;
4141
}
4242

43+
protected get reportFetching() {
44+
return this.options.watchedQuery.reportFetching ?? true;
45+
}
46+
4347
/**
4448
* Updates the underlaying query.
4549
*/
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { WatchedQueryResult } from '../WatchedQueryResult.js';
2+
import { AbstractQueryProcessor, LinkQueryStreamOptions } from './AbstractQueryProcessor.js';
3+
4+
/**
5+
* Uses the PowerSync onChange event to trigger watched queries.
6+
* Results are emitted on every change of the relevant tables.
7+
*/
8+
export class OnChangeQueryProcessor<T> extends AbstractQueryProcessor<T> {
9+
/**
10+
* Always returns the result set on every onChange event. Deltas are not supported by this processor.
11+
*/
12+
protected processResultSet(result: T[]): WatchedQueryResult<T> | null {
13+
return {
14+
all: result,
15+
delta: () => {
16+
throw new Error('Delta not implemented for OnChangeQueryProcessor');
17+
}
18+
};
19+
}
20+
21+
protected async linkStream(options: LinkQueryStreamOptions<T>): Promise<void> {
22+
const { db, watchedQuery } = this.options;
23+
const { stream, abortSignal } = options;
24+
25+
const tables = await db.resolveTables(watchedQuery.query, watchedQuery.parameters);
26+
27+
db.onChangeWithCallback(
28+
{
29+
onChange: async () => {
30+
console.log('onChange trigger for', this);
31+
// This fires for each change of the relevant tables
32+
try {
33+
if (this.reportFetching) {
34+
this.state.fetching = true;
35+
stream.enqueueData(this.state);
36+
}
37+
38+
let dirty = false;
39+
40+
// Always run the query if an underlaying table has changed
41+
const result = watchedQuery.queryExecutor
42+
? await watchedQuery.queryExecutor()
43+
: await db.getAll<T>(watchedQuery.query, watchedQuery.parameters);
44+
45+
if (this.reportFetching) {
46+
this.state.fetching = false;
47+
dirty = true;
48+
}
49+
50+
if (this.state.loading) {
51+
this.state.loading = false;
52+
dirty = true;
53+
}
54+
55+
// Check if the result has changed
56+
const watchedQueryResult = this.processResultSet(result);
57+
if (watchedQueryResult) {
58+
this.state.data = watchedQueryResult;
59+
this.state.lastUpdated = new Date();
60+
dirty = true;
61+
}
62+
63+
if (dirty) {
64+
stream.enqueueData(this.state);
65+
}
66+
} catch (error) {
67+
this.state.error = error;
68+
stream.enqueueData(this.state);
69+
// TODO?
70+
//stream.iterateListeners((l) => l.error?.(error));
71+
}
72+
},
73+
onError: (error) => {
74+
stream.close();
75+
stream.iterateListeners((l) => l.error?.(error));
76+
}
77+
},
78+
{
79+
signal: abortSignal,
80+
tables,
81+
throttleMs: watchedQuery.throttleMs,
82+
triggerImmediate: true // used to emit the initial state
83+
}
84+
);
85+
}
86+
}
Lines changed: 21 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,83 +1,34 @@
1-
import { WatchedQueryOptions, WatchedQueryState } from '../../WatchedQuery.js';
2-
import {
3-
AbstractQueryProcessor,
4-
AbstractQueryProcessorOptions,
5-
LinkQueryStreamOptions
6-
} from '../AbstractQueryProcessor.js';
1+
import { WatchedQueryResult } from '../../WatchedQueryResult.js';
2+
import { AbstractQueryProcessorOptions } from '../AbstractQueryProcessor.js';
3+
import { OnChangeQueryProcessor } from '../OnChangeQueryProcessor.js';
74
import { WatchResultComparator } from './WatchComparator.js';
85

96
export interface ComparisonQueryProcessorOptions<T> extends AbstractQueryProcessorOptions<T> {
107
comparator: WatchResultComparator<T>;
11-
watchedQuery: WatchedQueryOptions<T>;
128
}
13-
14-
export class ComparisonQueryProcessor<T> extends AbstractQueryProcessor<T> {
15-
readonly state: WatchedQueryState<T> = {
16-
loading: true,
17-
fetching: true,
18-
error: null,
19-
lastUpdated: null,
20-
data: {
21-
all: [],
22-
delta: () => ({ added: [], removed: [], unchanged: [], updated: [] })
23-
}
24-
};
25-
9+
/**
10+
* TODO:
11+
* This currently checks if the entire result set has changed.
12+
* In some cases a deep comparison of the result might be required.
13+
* For example if result[1] is unchanged, it might be useful to keep the same object reference.
14+
*/
15+
export class ComparisonQueryProcessor<T> extends OnChangeQueryProcessor<T> {
2616
constructor(protected options: ComparisonQueryProcessorOptions<T>) {
2717
super(options);
2818
}
2919

30-
protected async linkStream(options: LinkQueryStreamOptions<T>): Promise<void> {
31-
const { db, watchedQuery } = this.options;
32-
const { stream, abortSignal } = options;
20+
protected processResultSet(result: T[]): WatchedQueryResult<T> | null {
21+
const { comparator } = this.options;
22+
const previous = this.state.data.all;
23+
const delta = comparator.compare(previous, result);
3324

34-
const tables = await db.resolveTables(watchedQuery.query, watchedQuery.parameters);
35-
36-
db.onChangeWithCallback(
37-
{
38-
onChange: async () => {
39-
console.log('onChange trigger for', this);
40-
// This fires for each change of the relevant tables
41-
try {
42-
this.state.fetching = true;
43-
stream.enqueueData(this.state);
44-
45-
// Always run the query if an underlaying table has changed
46-
const result = watchedQuery.queryExecutor
47-
? await watchedQuery.queryExecutor()
48-
: await db.getAll<T>(watchedQuery.query, watchedQuery.parameters);
49-
this.state.fetching = false;
50-
this.state.loading = false;
25+
if (delta.isEqual()) {
26+
return null; // the stream will not emit a change of data
27+
}
5128

52-
// Check if the result has changed
53-
const comparison = this.options.comparator.compare(this.state.data.all, result);
54-
if (!comparison.isEqual()) {
55-
this.state.data = {
56-
all: result,
57-
delta: () => comparison.delta() // lazy evaluation
58-
};
59-
this.state.lastUpdated = new Date();
60-
}
61-
// This is here to cancel any fetching state. Need to verify this does not cause excessive re-renders
62-
stream.enqueueData(this.state);
63-
} catch (error) {
64-
this.state.error = error;
65-
stream.enqueueData(this.state);
66-
// TODO;
67-
//stream.iterateListeners((l) => l.error?.(error));
68-
}
69-
},
70-
onError: (error) => {
71-
stream.close();
72-
stream.iterateListeners((l) => l.error?.(error));
73-
}
74-
},
75-
{
76-
signal: abortSignal,
77-
tables,
78-
throttleMs: watchedQuery.throttleMs,
79-
triggerImmediate: true
80-
}
81-
);
29+
return {
30+
all: result,
31+
delta: () => delta.delta() // lazy evaluation
32+
};
8233
}
8334
}

packages/common/src/client/watched/processors/comparison/WatchComparator.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ export abstract class AbstractWatchComparator<T> implements WatchResultComparato
3838
): void {
3939
const { validateEquality } = options;
4040

41+
if (state.currentItems.length == 0 && state.previousHashes.size == 0) {
42+
state.isEqual = true;
43+
return;
44+
}
45+
4146
if (state.resumeIndex >= state.currentItems.length) {
4247
// No more items to compare, we are done
4348
return;

packages/common/src/index.ts

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,37 @@
11
export * from './client/AbstractPowerSyncDatabase.js';
22
export * from './client/AbstractPowerSyncOpenFactory.js';
3-
export * from './client/SQLOpenFactory.js';
3+
export { compilableQueryWatch, CompilableQueryWatchHandler } from './client/compilableQueryWatch.js';
44
export * from './client/connection/PowerSyncBackendConnector.js';
55
export * from './client/connection/PowerSyncCredentials.js';
6-
export * from './client/sync/bucket/BucketStorageAdapter.js';
6+
export { MAX_OP_ID } from './client/constants.js';
77
export { runOnSchemaChange } from './client/runOnSchemaChange.js';
8-
export { CompilableQueryWatchHandler, compilableQueryWatch } from './client/compilableQueryWatch.js';
9-
export { UpdateType, CrudEntry, OpId } from './client/sync/bucket/CrudEntry.js';
10-
export * from './client/sync/bucket/SqliteBucketStorage.js';
8+
export * from './client/SQLOpenFactory.js';
9+
export * from './client/sync/bucket/BucketStorageAdapter.js';
1110
export * from './client/sync/bucket/CrudBatch.js';
11+
export { CrudEntry, OpId, UpdateType } from './client/sync/bucket/CrudEntry.js';
1212
export * from './client/sync/bucket/CrudTransaction.js';
13+
export * from './client/sync/bucket/OplogEntry.js';
14+
export * from './client/sync/bucket/OpType.js';
15+
export * from './client/sync/bucket/SqliteBucketStorage.js';
1316
export * from './client/sync/bucket/SyncDataBatch.js';
1417
export * from './client/sync/bucket/SyncDataBucket.js';
15-
export * from './client/sync/bucket/OpType.js';
16-
export * from './client/sync/bucket/OplogEntry.js';
1718
export * from './client/sync/stream/AbstractRemote.js';
1819
export * from './client/sync/stream/AbstractStreamingSyncImplementation.js';
1920
export * from './client/sync/stream/streaming-sync-types.js';
20-
export { MAX_OP_ID } from './client/constants.js';
2121

2222
export { ProgressWithOperations, SyncProgress } from './db/crud/SyncProgress.js';
2323
export * from './db/crud/SyncStatus.js';
2424
export * from './db/crud/UploadQueueStatus.js';
25-
export * from './db/schema/Schema.js';
26-
export * from './db/schema/Table.js';
25+
export * from './db/DBAdapter.js';
26+
export * from './db/schema/Column.js';
2727
export * from './db/schema/Index.js';
2828
export * from './db/schema/IndexedColumn.js';
29-
export * from './db/schema/Column.js';
29+
export * from './db/schema/Schema.js';
30+
export * from './db/schema/Table.js';
3031
export * from './db/schema/TableV2.js';
31-
export * from './db/crud/SyncStatus.js';
32-
export * from './db/crud/UploadQueueStatus.js';
33-
export * from './db/DBAdapter.js';
32+
33+
// TODO other exports
34+
export * from './client/watched/WatchedQuery.js';
3435

3536
export * from './utils/AbortOperation.js';
3637
export * from './utils/BaseObserver.js';

0 commit comments

Comments
 (0)