Skip to content

Commit 5c7c76e

Browse files
Invert Watched Query creation API. Update hook packages to provide differentiator implementation.
1 parent 3a6bef6 commit 5c7c76e

28 files changed

+597
-639
lines changed

.changeset/little-bananas-fetch.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
---
44

55
- Added additional listeners for `closing` and `closed` events in `AbstractPowerSyncDatabase`.
6-
- Added `incrementalWatch` API for enhanced watched queries.
6+
- Added `query` and `customQuery` APIs for enhanced watched queries.
77
- Added `triggerImmediate` option to the `onChange` API. This allows emitting an initial event which can be useful for downstream use cases.

demos/yjs-react-supabase-text-collab/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ docker run \
4444
-p 8080:8080 \
4545
-e POWERSYNC_CONFIG_B64=$(base64 -i ./powersync.yaml) \
4646
-e POWERSYNC_SYNC_RULES_B64=$(base64 -i ./sync-rules.yaml) \
47-
--env-file ./.env \
47+
--env-file ./.env.local \
4848
--network supabase_network_yjs-react-supabase-text-collab \
4949
--name my-powersync journeyapps/powersync-service:latest
5050
```

demos/yjs-react-supabase-text-collab/src/library/powersync/PowerSyncYjsProvider.ts

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import * as Y from 'yjs';
22

33
import { b64ToUint8Array, Uint8ArrayTob64 } from '@/library/binary-utils';
4-
import { AbstractPowerSyncDatabase, GetAllQuery, IncrementalWatchMode } from '@powersync/web';
4+
import { AbstractPowerSyncDatabase } from '@powersync/web';
55
import { ObservableV2 } from 'lib0/observable';
66
import { v4 as uuidv4 } from 'uuid';
77
import { DocumentUpdates } from './AppSchema';
@@ -41,22 +41,20 @@ export class PowerSyncYjsProvider extends ObservableV2<PowerSyncYjsEvents> {
4141
* This will be used to apply updates from other editors.
4242
* When we received an added item we apply the update to the Yjs document.
4343
*/
44-
const updateQuery = db.incrementalWatch({ mode: IncrementalWatchMode.DIFFERENTIAL }).build({
45-
watch: {
46-
query: new GetAllQuery<DocumentUpdates>({
47-
sql: /* sql */ `
48-
SELECT
49-
*
50-
FROM
51-
document_updates
52-
WHERE
53-
document_id = ?
54-
AND editor_id != ?
55-
`,
56-
parameters: [documentId, this.id]
57-
})
58-
}
59-
});
44+
const updateQuery = db
45+
.query<DocumentUpdates>({
46+
sql: /* sql */ `
47+
SELECT
48+
*
49+
FROM
50+
document_updates
51+
WHERE
52+
document_id = ?
53+
AND editor_id != ?
54+
`,
55+
parameters: [documentId, this.id]
56+
})
57+
.differentialWatch();
6058

6159
this.abortController.signal.addEventListener(
6260
'abort',
@@ -73,8 +71,8 @@ export class PowerSyncYjsProvider extends ObservableV2<PowerSyncYjsEvents> {
7371
let synced = false;
7472

7573
updateQuery.registerListener({
76-
onData: async (diff) => {
77-
for (const added of diff.added) {
74+
onStateChange: async () => {
75+
for (const added of updateQuery.state.diff.added) {
7876
Y.applyUpdateV2(doc, b64ToUint8Array(added.update_b64));
7977
}
8078
if (!synced) {

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import { ControlledExecutor } from '../utils/ControlledExecutor.js';
1818
import { throttleTrailing } from '../utils/async.js';
1919
import { mutexRunExclusive } from '../utils/mutex.js';
2020
import { ConnectionManager } from './ConnectionManager.js';
21+
import { CustomQuery } from './CustomQuery.js';
22+
import { ArrayQueryDefinition, Query } from './Query.js';
2123
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
2224
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
2325
import { BucketStorageAdapter, PSInternalTable } from './sync/bucket/BucketStorageAdapter.js';
@@ -33,9 +35,9 @@ import {
3335
type PowerSyncConnectionOptions,
3436
type RequiredAdditionalConnectionOptions
3537
} from './sync/stream/AbstractStreamingSyncImplementation.js';
36-
import { IncrementalWatchMode } from './watched/WatchedQueryBuilder.js';
37-
import { WatchedQueryBuilderMap } from './watched/WatchedQueryBuilderMap.js';
38-
import { FalsyComparator, WatchedQueryComparator } from './watched/processors/comparators.js';
38+
import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js';
39+
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
40+
import { WatchedQueryComparator } from './watched/processors/comparators.js';
3941

4042
export interface DisconnectAndClearOptions {
4143
/** When set to false, data in local-only tables is preserved. */
@@ -139,8 +141,6 @@ export const DEFAULT_POWERSYNC_CLOSE_OPTIONS: PowerSyncCloseOptions = {
139141
disconnect: true
140142
};
141143

142-
export const DEFAULT_WATCH_THROTTLE_MS = 30;
143-
144144
export const DEFAULT_POWERSYNC_DB_OPTIONS = {
145145
retryDelayMs: 5000,
146146
logger: Logger.get('PowerSyncDatabase'),
@@ -890,41 +890,59 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
890890
}
891891

892892
/**
893-
* Watch a SQL query which incrementally emits updates for result sets.
894-
* This is useful for only getting updates when the result set changes, or viewing the change in the result set over time.
895-
* @returns A {@link WatchedQueryBuilder} for the specified watch mode.
893+
* Allows defining a query which can be used to build a {@link WatchedQuery}.
894+
* The defined query will be executed with {@link AbstractPowerSyncDatabase.getAll}.
895+
* An optional mapper function can be provided to transform the results.
896896
*
897-
* For a comparison based watch, use {@link IncrementalWatchMode.COMPARISON}.
898-
* See {@link ComparisonWatchedQueryBuilder} for more details.
899897
* @example
900898
* ```javascript
901-
* const watchedQuery = powerSync
902-
* .incrementalWatch({ mode: IncrementalWatchMode.COMPARISON })
903-
* .build({
904-
* // ... Options
905-
* })
899+
* const watchedTodos = powersync.query({
900+
* sql: `SELECT photo_id as id FROM todos WHERE photo_id IS NOT NULL`,
901+
* parameters: [],
902+
* mapper: (row) => ({
903+
* ...row,
904+
* created_at: new Date(row.created_at as string)
905+
* })
906+
* })
907+
* .watch()
908+
* // OR use .differentialWatch() for fine-grained watches.
906909
* ```
910+
*/
911+
query<RowType>(query: ArrayQueryDefinition<RowType>): Query<RowType> {
912+
const { sql, parameters = [], mapper } = query;
913+
const compatibleQuery: WatchCompatibleQuery<RowType[]> = {
914+
compile: () => ({
915+
sql,
916+
parameters
917+
}),
918+
execute: async ({ sql, parameters }) => {
919+
const result = await this.getAll(sql, parameters);
920+
return mapper ? result.map(mapper) : (result as RowType[]);
921+
}
922+
};
923+
return this.customQuery(compatibleQuery);
924+
}
925+
926+
/**
927+
* Allows building a {@link WatchedQuery} using an existing {@link WatchCompatibleQuery}.
928+
* The watched query will use the provided {@link WatchCompatibleQuery.execute} method to query results.
907929
*
908-
* For a differential based watch , use {@link IncrementalWatchMode.DIFFERENTIAL}.
909-
* See {@link DifferentialWatchedQueryBuilder} for more details.
910930
* @example
911931
* ```javascript
912-
* const watchedQuery = powerSync
913-
* .incrementalWatch({ mode: IncrementalWatchMode.DIFFERENTIAL })
914-
* .build({
915-
* // ... Options
916-
* })
932+
*
933+
* // Potentially a query from an ORM like Drizzle
934+
* const query = db.select().from(lists);
935+
*
936+
* const watchedTodos = powersync.customQuery(query)
937+
* .watch()
938+
* // OR use .differentialWatch() for fine-grained watches.
917939
* ```
918940
*/
919-
incrementalWatch<Mode extends IncrementalWatchMode>(options: { mode: Mode }): WatchedQueryBuilderMap[Mode] {
920-
const { mode } = options;
921-
const builderFactory = WatchedQueryBuilderMap[mode];
922-
if (!builderFactory) {
923-
throw new Error(
924-
`Unsupported watch mode: ${mode}. Please specify one of [${Object.values(IncrementalWatchMode).join(', ')}]`
925-
);
926-
}
927-
return builderFactory(this) as WatchedQueryBuilderMap[Mode];
941+
customQuery<RowType>(query: WatchCompatibleQuery<RowType[]>): Query<RowType> {
942+
return new CustomQuery({
943+
db: this,
944+
query
945+
});
928946
}
929947

930948
/**
@@ -944,21 +962,22 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
944962
if (!onResult) {
945963
throw new Error('onResult is required');
946964
}
965+
const { comparator } = options ?? {};
947966

948-
const { comparator = FalsyComparator } = options ?? {};
949-
// This watch method which provides static SQL is currently only compatible with the comparison mode.
950-
// Uses shared incremental watch logic under the hood, but maintains the same external API as the old watch method.
951-
const watchedQuery = this.incrementalWatch({ mode: IncrementalWatchMode.COMPARISON }).build<QueryResult | null>({
967+
// This API yields a QueryResult type.
968+
// This is not a standard Array result, which makes it incompatible with the .query API.
969+
const watchedQuery = new OnChangeQueryProcessor({
970+
db: this,
952971
comparator,
953-
watch: {
972+
placeholderData: null,
973+
watchOptions: {
954974
query: {
955975
compile: () => ({
956976
sql: sql,
957977
parameters: parameters ?? []
958978
}),
959979
execute: () => this.executeReadOnly(sql, parameters)
960980
},
961-
placeholderData: null,
962981
reportFetching: false,
963982
throttleMs: options?.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS
964983
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { AbstractPowerSyncDatabase } from './AbstractPowerSyncDatabase.js';
2+
import { Query, StandardWatchedQueryOptions } from './Query.js';
3+
import { FalsyComparator } from './watched/processors/comparators.js';
4+
import {
5+
DifferentialQueryProcessor,
6+
DifferentialWatchedQueryOptions
7+
} from './watched/processors/DifferentialQueryProcessor.js';
8+
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
9+
import { DEFAULT_WATCH_QUERY_OPTIONS, WatchCompatibleQuery, WatchedQueryOptions } from './watched/WatchedQuery.js';
10+
11+
/**
12+
* @internal
13+
*/
14+
export interface CustomQueryOptions<RowType> {
15+
db: AbstractPowerSyncDatabase;
16+
query: WatchCompatibleQuery<RowType[]>;
17+
}
18+
19+
/**
20+
* @internal
21+
*/
22+
export class CustomQuery<RowType> implements Query<RowType> {
23+
constructor(protected options: CustomQueryOptions<RowType>) {}
24+
25+
protected resolveOptions(options: WatchedQueryOptions) {
26+
return {
27+
reportFetching: options?.reportFetching ?? DEFAULT_WATCH_QUERY_OPTIONS.reportFetching,
28+
throttleMs: options?.throttleMs ?? DEFAULT_WATCH_QUERY_OPTIONS.throttleMs
29+
};
30+
}
31+
32+
watch(watchOptions: StandardWatchedQueryOptions<RowType>) {
33+
return new OnChangeQueryProcessor<RowType[]>({
34+
db: this.options.db,
35+
comparator: watchOptions?.comparator ?? FalsyComparator,
36+
placeholderData: watchOptions?.placeholderData ?? [],
37+
watchOptions: {
38+
...this.resolveOptions(watchOptions),
39+
query: this.options.query
40+
}
41+
});
42+
}
43+
44+
differentialWatch(differentialWatchOptions: DifferentialWatchedQueryOptions<RowType>) {
45+
return new DifferentialQueryProcessor<RowType>({
46+
db: this.options.db,
47+
differentiator: differentialWatchOptions?.differentiator,
48+
placeholderData: differentialWatchOptions?.placeholderData ?? [],
49+
watchOptions: {
50+
...this.resolveOptions(differentialWatchOptions),
51+
query: this.options.query
52+
}
53+
});
54+
}
55+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { ArrayComparator } from './watched/processors/comparators.js';
2+
import {
3+
DifferentialWatchedQuery,
4+
DifferentialWatchedQueryOptions
5+
} from './watched/processors/DifferentialQueryProcessor.js';
6+
import { ComparisonWatchedQuery } from './watched/processors/OnChangeQueryProcessor.js';
7+
import { WatchedQueryOptions } from './watched/WatchedQuery.js';
8+
9+
/**
10+
* Query parameters for {@link ArrayQueryDefinition.parameters}
11+
*/
12+
export type QueryParam = string | number | boolean | null | undefined | bigint | Uint8Array;
13+
14+
/**
15+
* Options for building a query with {@link AbstractPowerSyncDatabase.query}.
16+
* This query will be executed with {@link AbstractPowerSyncDatabase.getAll}.
17+
*/
18+
export interface ArrayQueryDefinition<RowType = unknown> {
19+
sql: string;
20+
parameters?: ReadonlyArray<QueryParam>;
21+
/**
22+
* Maps the raw SQLite row to a custom typed object.
23+
* @example
24+
* ```javascript
25+
* mapper: (row) => ({
26+
* ...row,
27+
* created_at: new Date(row.created_at as string),
28+
* })
29+
* ```
30+
*/
31+
mapper?: (row: Record<string, unknown>) => RowType;
32+
}
33+
34+
/**
35+
* Options for {@link Query.watch}.
36+
*/
37+
export interface StandardWatchedQueryOptions<RowType> extends WatchedQueryOptions {
38+
/**
39+
* Optional comparator which processes the items of an array of rows.
40+
* The comparator compares the result set rows by index using the {@link ArrayComparatorOptions.compareBy} function.
41+
* The comparator reports a changed result set as soon as a row does not match the previous result set.
42+
*
43+
* @example
44+
* ```javascript
45+
* comparator: new ArrayComparator({
46+
* compareBy: (item) => JSON.stringify(item)
47+
* })
48+
* ```
49+
*/
50+
comparator?: ArrayComparator<RowType>;
51+
52+
/**
53+
* The initial data state reported while the query is loading for the first time.
54+
* @default []
55+
*/
56+
placeholderData?: RowType[];
57+
}
58+
59+
export interface Query<RowType> {
60+
/**
61+
* Creates a {@link WatchedQuery} which watches and emits results of the linked query.
62+
*
63+
* By default the returned watched query will emit changes whenever a change to the underlying SQLite tables is made.
64+
* These changes might not be relevant to the query, but the query will emit a new result set.
65+
*
66+
* A {@link StandardWatchedQueryOptions.comparator} can be provided to limit the data emissions. The watched query will still
67+
* query the underlying DB on a underlying table changes, but the result will only be emitted if the comparator detects a change in the results.
68+
*
69+
* The comparator in this method is optimized and returns early as soon as it detects a change. Each data emission will correlate to a change in the result set,
70+
* but note that the result set will not maintain internal object references to the previous result set. If internal object references are needed,
71+
* consider using {@link Query.differentialWatch} instead.
72+
*/
73+
watch(options?: StandardWatchedQueryOptions<RowType>): ComparisonWatchedQuery<RowType[]>;
74+
75+
/**
76+
* Creates a {@link WatchedQuery} which watches and emits results of the linked query.
77+
*
78+
* This query method watches for changes in the underlying SQLite tables and runs the query on each table change.
79+
* The difference between the current and previous result set is computed.
80+
* The watched query will not emit changes if the result set is identical to the previous result set.
81+
* If the result set is different, the watched query will emit the new result set and provide a detailed diff of the changes.
82+
*
83+
* The deep differentiation allows maintaining result set object references between result emissions.
84+
* The {@link DifferentialWatchedQuery.state.data} array will contain the previous row references for unchanged rows.
85+
* A detailed diff of the changes can be accessed via {@link DifferentialWatchedQuery.state.diff}.
86+
*/
87+
differentialWatch(options?: DifferentialWatchedQueryOptions<RowType>): DifferentialWatchedQuery<RowType>;
88+
}

0 commit comments

Comments
 (0)