Skip to content

Commit 83e99e2

Browse files
wip: query interface
1 parent 69f7394 commit 83e99e2

File tree

9 files changed

+166
-155
lines changed

9 files changed

+166
-155
lines changed

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import {
3232
type PowerSyncConnectionOptions,
3333
type RequiredAdditionalConnectionOptions
3434
} from './sync/stream/AbstractStreamingSyncImplementation.js';
35-
import { WatchedQuery } from './watched/WatchedQuery.js';
35+
import { WatchedQuery, WatchedQueryOptions } from './watched/WatchedQuery.js';
3636
import { OnChangeQueryProcessor, WatchedQueryComparator } from './watched/processors/OnChangeQueryProcessor.js';
3737

3838
export interface DisconnectAndClearOptions {
@@ -871,30 +871,25 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
871871
return this.watchWithAsyncGenerator(sql, parameters, options);
872872
}
873873

874-
// TODO names
875-
incrementalWatch<DataType>(options: {
876-
sql: string;
877-
parameters?: any[];
878-
throttleMs?: number;
879-
customExecutor?: {
880-
initialData: DataType;
881-
execute: () => Promise<DataType>;
882-
};
883-
reportFetching?: boolean;
884-
}): WatchedQuery<DataType> {
885-
return new OnChangeQueryProcessor({
886-
db: this,
887-
comparator: {
888-
checkEquality: (a, b) => JSON.stringify(a) == JSON.stringify(b)
889-
},
890-
query: {
891-
sql: options.sql,
892-
parameters: options.parameters,
893-
throttleMs: options.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS,
894-
customExecutor: options.customExecutor,
895-
reportFetching: options.reportFetching
896-
}
897-
});
874+
// TODO names and types
875+
incrementalWatch<DataType>(
876+
options: { watchOptions: WatchedQueryOptions<DataType> } & {
877+
mode: 'comparison';
878+
comparator?: WatchedQueryComparator<DataType>;
879+
}
880+
): WatchedQuery<DataType> {
881+
switch (options.mode) {
882+
case 'comparison':
883+
return new OnChangeQueryProcessor({
884+
db: this,
885+
comparator: options.comparator ?? {
886+
checkEquality: (a, b) => JSON.stringify(a) == JSON.stringify(b)
887+
},
888+
watchOptions: options.watchOptions
889+
});
890+
default:
891+
throw new Error(`Invalid mode specified ${options.mode}`);
892+
}
898893
}
899894

900895
/**
@@ -919,18 +914,17 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
919914
db: this,
920915
// Comparisons are disabled if no comparator is provided
921916
comparator: options?.comparator,
922-
query: {
923-
sql,
924-
parameters,
917+
watchOptions: {
918+
placeholderData: null,
919+
query: {
920+
compile: () => ({
921+
sql: sql,
922+
parameters: parameters ?? []
923+
}),
924+
execute: () => this.executeReadOnly(sql, parameters)
925+
},
925926
throttleMs: options?.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS,
926-
reportFetching: false,
927-
// The default watch implementation returns QueryResult as the Data type
928-
customExecutor: {
929-
execute: async () => {
930-
return this.executeReadOnly(sql, parameters);
931-
},
932-
initialData: null
933-
}
927+
reportFetching: false
934928
}
935929
});
936930

packages/common/src/client/watched/WatchedQuery.ts

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1+
import { CompiledQuery } from 'src/types/types.js';
12
import { BaseListener, BaseObserverInterface } from '../../utils/BaseObserver.js';
23

34
export interface WatchedQueryState<Data> {
45
/**
5-
* Indicates the initial loading state (hard loading). Loading becomes false once the first set of results from the watched query is available or an error occurs.
6+
* Indicates the initial loading state (hard loading).
7+
* Loading becomes false once the first set of results from the watched query is available or an error occurs.
68
*/
79
isLoading: boolean;
810
/**
9-
* Indicates whether the query is currently fetching data, is true during the initial load and any time when the query is re-evaluating (useful for large queries).
11+
* Indicates whether the query is currently fetching data, is true during the initial load
12+
* and any time when the query is re-evaluating (useful for large queries).
1013
*/
1114
isFetching: boolean;
1215
/**
@@ -23,23 +26,29 @@ export interface WatchedQueryState<Data> {
2326
data: Data;
2427
}
2528

29+
/**
30+
* @internal
31+
* Similar to {@link CompatibleQuery}, except the `execute` method
32+
* does not enforce an Array result type.
33+
*/
34+
export interface WatchCompatibleQuery<ResultType> {
35+
execute(compiled: CompiledQuery): Promise<ResultType>;
36+
compile(): CompiledQuery;
37+
}
38+
2639
/**
2740
* @internal
2841
*/
2942
export interface WatchedQueryOptions<DataType> {
30-
sql: string;
31-
parameters?: any[];
32-
/** The minimum interval between queries. */
33-
throttleMs?: number;
43+
query: WatchCompatibleQuery<DataType>;
44+
3445
/**
35-
* Optional query executor responsible for executing the query.
36-
* This can be used to return query results which are mapped from the database.
37-
* Often this is useful for ORM queries or other query builders.
46+
* Initial result data which is presented while the initial loading is executing
3847
*/
39-
customExecutor?: {
40-
execute: () => Promise<DataType>;
41-
initialData: DataType;
42-
};
48+
placeholderData: DataType;
49+
50+
/** The minimum interval between queries. */
51+
throttleMs?: number;
4352
/**
4453
* If true (default) the watched query will update its state to report
4554
* on the fetching state of the query.
@@ -87,10 +96,10 @@ export interface WatchedQuery<Data> extends BaseObserverInterface<WatchedQueryLi
8796
subscribe(subscription: WatchedQuerySubscription<Data>): () => void;
8897

8998
/**
90-
* Updates the underlaying query.
99+
* Updates the underlaying query options.
91100
* This will trigger a re-evaluation of the query and update the state.
92101
*/
93-
updateQuery(query: WatchedQueryOptions<Data>): Promise<void>;
102+
updateSettings(options: WatchedQueryOptions<Data>): Promise<void>;
94103

95104
/**
96105
* Close the watched query and end all subscriptions.

packages/common/src/client/watched/processors/AbstractQueryProcessor.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
*/
1616
export interface AbstractQueryProcessorOptions<Data> {
1717
db: AbstractPowerSyncDatabase;
18-
query: WatchedQueryOptions<Data>;
18+
watchOptions: WatchedQueryOptions<Data>;
1919
}
2020

2121
/**
@@ -64,22 +64,22 @@ export abstract class AbstractQueryProcessor<Data = unknown[]>
6464
isFetching: this.reportFetching, // Only set to true if we will report updates in future
6565
error: null,
6666
lastUpdated: null,
67-
data: options.query.customExecutor?.initialData ?? ([] as Data)
67+
data: options.watchOptions.placeholderData
6868
};
6969
this.initialized = this.init();
7070
}
7171

7272
protected get reportFetching() {
73-
return this.options.query.reportFetching ?? true;
73+
return this.options.watchOptions.reportFetching ?? true;
7474
}
7575

7676
/**
7777
* Updates the underlaying query.
7878
*/
79-
async updateQuery(query: WatchedQueryOptions<Data>) {
79+
async updateSettings(query: WatchedQueryOptions<Data>) {
8080
await this.initialized;
8181

82-
this.options.query = query;
82+
this.options.watchOptions = query;
8383
this.abortController.abort();
8484
this.abortController = new AbortController();
8585
await this.linkQuery({
@@ -116,7 +116,7 @@ export abstract class AbstractQueryProcessor<Data = unknown[]>
116116
db.registerListener({
117117
schemaChanged: async () => {
118118
await this.runWithReporting(async () => {
119-
await this.updateQuery(this.options.query);
119+
await this.updateSettings(this.options.watchOptions);
120120
});
121121
},
122122
closing: () => {
@@ -126,7 +126,7 @@ export abstract class AbstractQueryProcessor<Data = unknown[]>
126126

127127
// Initial setup
128128
await this.runWithReporting(async () => {
129-
await this.updateQuery(this.options.query);
129+
await this.updateSettings(this.options.watchOptions);
130130
});
131131
}
132132

packages/common/src/client/watched/processors/OnChangeQueryProcessor.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,11 @@ export class OnChangeQueryProcessor<Data> extends AbstractQueryProcessor<Data> {
6262
}
6363

6464
protected async linkQuery(options: LinkQueryOptions<Data>): Promise<void> {
65-
const { db, query } = this.options;
65+
const { db, watchOptions } = this.options;
6666
const { abortSignal } = options;
6767

68-
const tables = await db.resolveTables(query.sql, query.parameters);
68+
const compiledQuery = watchOptions.query.compile();
69+
const tables = await db.resolveTables(compiledQuery.sql, compiledQuery.parameters as any[]);
6970

7071
db.onChangeWithCallback(
7172
{
@@ -79,9 +80,7 @@ export class OnChangeQueryProcessor<Data> extends AbstractQueryProcessor<Data> {
7980
const partialStateUpdate: Partial<WatchedQueryState<Data>> = {};
8081

8182
// Always run the query if an underlaying table has changed
82-
const result = query.customExecutor
83-
? await query.customExecutor.execute()
84-
: ((await db.getAll(query.sql, query.parameters)) as Data);
83+
const result = await watchOptions.query.execute(compiledQuery);
8584

8685
if (this.reportFetching) {
8786
partialStateUpdate.isFetching = false;
@@ -110,7 +109,7 @@ export class OnChangeQueryProcessor<Data> extends AbstractQueryProcessor<Data> {
110109
{
111110
signal: abortSignal,
112111
tables,
113-
throttleMs: query.throttleMs,
112+
throttleMs: watchOptions.throttleMs,
114113
triggerImmediate: true // used to emit the initial state
115114
}
116115
);

packages/react/src/QueryStore.ts

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,31 @@
1-
import { AbstractPowerSyncDatabase, WatchedQuery } from '@powersync/common';
2-
import { Query } from './WatchedQuery';
1+
import { AbstractPowerSyncDatabase, WatchCompatibleQuery, WatchedQuery } from '@powersync/common';
32
import { AdditionalOptions } from './hooks/useQuery';
43

5-
export function generateQueryKey(sqlStatement: string, parameters: any[], options: AdditionalOptions): string {
4+
export function generateQueryKey(
5+
sqlStatement: string,
6+
parameters: ReadonlyArray<unknown>,
7+
options: AdditionalOptions
8+
): string {
69
return `${sqlStatement} -- ${JSON.stringify(parameters)} -- ${JSON.stringify(options)}`;
710
}
811

912
export class QueryStore {
10-
cache = new Map<string, WatchedQuery<unknown[]>>();
13+
cache = new Map<string, WatchedQuery<unknown>>();
1114

1215
constructor(private db: AbstractPowerSyncDatabase) {}
1316

14-
getQuery(key: string, query: Query<unknown>, options: AdditionalOptions) {
17+
getQuery(key: string, query: WatchCompatibleQuery<unknown>, options: AdditionalOptions) {
1518
if (this.cache.has(key)) {
1619
return this.cache.get(key);
1720
}
1821

19-
const customExecutor = typeof query.rawQuery !== 'string' ? query.rawQuery : null;
20-
2122
const watchedQuery = this.db.incrementalWatch({
22-
sql: query.sqlStatement,
23-
parameters: query.queryParameters,
24-
customExecutor: customExecutor
25-
? {
26-
initialData: [],
27-
execute: () => customExecutor.execute()
28-
}
29-
: undefined,
30-
throttleMs: options.throttleMs
23+
mode: 'comparison',
24+
watchOptions: {
25+
query,
26+
placeholderData: [],
27+
throttleMs: options.throttleMs
28+
}
3129
});
3230

3331
const disposer = watchedQuery.registerListener({

packages/react/src/WatchedQuery.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1-
import { CompilableQuery } from '@powersync/common';
2-
3-
export class Query<T> {
4-
rawQuery: string | CompilableQuery<T>;
5-
sqlStatement: string;
6-
queryParameters: any[];
7-
}
1+
// export class Query<T> {
2+
// rawQuery: string | CompilableQuery<T>;
3+
// sqlStatement: string;
4+
// queryParameters: any[];
5+
// }
86

97
// export interface WatchedQueryListener extends BaseListener {
108
//onUpdate: () => void;

0 commit comments

Comments
 (0)