Skip to content

Commit 28ebe48

Browse files
cleanup config sharing
1 parent d197e12 commit 28ebe48

File tree

8 files changed

+100
-105
lines changed

8 files changed

+100
-105
lines changed

packages/web/src/db/adapters/AsyncDatabaseConnection.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@ export type OnTableChangeCallback = (event: BatchedUpdateNotification) => void;
1717
* An async Database connection which provides basic async SQL methods.
1818
* This is usually a proxied through a web worker.
1919
*/
20-
export interface AsyncDatabaseConnection {
20+
export interface AsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> {
2121
init(): Promise<void>;
2222
close(): Promise<void>;
2323
execute(sql: string, params?: any[]): Promise<ProxiedQueryResult>;
2424
executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult>;
2525
registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void>;
26+
getConfig(): Promise<Config>;
2627
}
2728

28-
export type OpenAsyncDatabaseConnection<Options extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> = (
29-
options: Options
29+
export type OpenAsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> = (
30+
config: Config
3031
) => AsyncDatabaseConnection;

packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { getNavigatorLocks } from '../..//shared/navigator';
1313
import { AsyncDatabaseConnection } from './AsyncDatabaseConnection';
1414
import { SharedConnectionWorker, WebDBAdapter } from './WebDBAdapter';
1515
import { WorkerWrappedAsyncDatabaseConnection } from './WorkerWrappedAsyncDatabaseConnection';
16+
import { ResolvedWebSQLOpenOptions } from './web-sql-flags';
1617

1718
/**
1819
* @internal
@@ -44,6 +45,7 @@ export class LockedAsyncDatabaseAdapter
4445
protected initPromise: Promise<void>;
4546
private _db: AsyncDatabaseConnection | null = null;
4647
protected _disposeTableChangeListener: (() => void) | null = null;
48+
private _config: ResolvedWebSQLOpenOptions | null = null;
4749

4850
constructor(protected options: LockedAsyncDatabaseAdapterOptions) {
4951
super();
@@ -83,23 +85,6 @@ export class LockedAsyncDatabaseAdapter
8385
return this._dbIdentifier;
8486
}
8587

86-
async shareConnection(): Promise<SharedConnectionWorker> {
87-
if (false == this._db instanceof WorkerWrappedAsyncDatabaseConnection) {
88-
throw new Error(`Only worker connections can be shared`);
89-
}
90-
return this._db.shareConnection();
91-
}
92-
93-
/**
94-
* Registers a table change notification callback with the base database.
95-
* This can be extended by custom implementations in order to handle proxy events.
96-
*/
97-
protected async registerOnChangeListener(db: AsyncDatabaseConnection) {
98-
this._disposeTableChangeListener = await db.registerOnTableChange((event) => {
99-
this.iterateListeners((cb) => cb.tablesUpdated?.(event));
100-
});
101-
}
102-
10388
/**
10489
* Init is automatic, this helps catch errors or explicitly await initialization
10590
*/
@@ -110,15 +95,40 @@ export class LockedAsyncDatabaseAdapter
11095
protected async _init() {
11196
this._db = await this.options.openConnection();
11297
await this._db.init();
98+
this._config = await this._db.getConfig();
11399
await this.registerOnChangeListener(this._db);
114100
this.iterateListeners((cb) => cb.initialized?.());
115101
}
116102

103+
getConfiguration(): ResolvedWebSQLOpenOptions {
104+
if (!this._config) {
105+
throw new Error(`Cannot get config before initialization is completed`);
106+
}
107+
return this._config;
108+
}
109+
117110
protected async waitForInitialized() {
118111
// Awaiting this will expose errors on function calls like .execute etc
119112
await this.initPromise;
120113
}
121114

115+
async shareConnection(): Promise<SharedConnectionWorker> {
116+
if (false == this._db instanceof WorkerWrappedAsyncDatabaseConnection) {
117+
throw new Error(`Only worker connections can be shared`);
118+
}
119+
return this._db.shareConnection();
120+
}
121+
122+
/**
123+
* Registers a table change notification callback with the base database.
124+
* This can be extended by custom implementations in order to handle proxy events.
125+
*/
126+
protected async registerOnChangeListener(db: AsyncDatabaseConnection) {
127+
this._disposeTableChangeListener = await db.registerOnTableChange((event) => {
128+
this.iterateListeners((cb) => cb.tablesUpdated?.(event));
129+
});
130+
}
131+
122132
/**
123133
* This is currently a no-op on web
124134
*/

packages/web/src/db/adapters/WebDBAdapter.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { DBAdapter } from '@powersync/common';
2+
import { ResolvedWebSQLOpenOptions } from './web-sql-flags';
23

34
export type SharedConnectionWorker = {
45
identifier: string;
@@ -10,4 +11,10 @@ export interface WebDBAdapter extends DBAdapter {
1011
* Get a MessagePort which can be used to share the internals of this connection.
1112
*/
1213
shareConnection(): Promise<SharedConnectionWorker>;
14+
15+
/**
16+
* Get the config options used to open this connection.
17+
* This is useful for sharing connections.
18+
*/
19+
getConfiguration(): ResolvedWebSQLOpenOptions;
1320
}

packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,8 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
6868
executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
6969
return this.baseConnection.executeBatch(sql, params);
7070
}
71+
72+
getConfig(): Promise<ResolvedWebSQLOpenOptions> {
73+
return this.baseConnection.getConfig();
74+
}
7175
}

packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import * as SQLite from '@journeyapps/wa-sqlite';
22
import { BaseObserver, BatchedUpdateNotification } from '@powersync/common';
33
import { Mutex } from 'async-mutex';
44
import { AsyncDatabaseConnection, OnTableChangeCallback, ProxiedQueryResult } from '../AsyncDatabaseConnection';
5-
import { ResolvedWebSQLOpenOptions } from '../web-sql-flags';
5+
import { ResolvedWASQLiteOpenFactoryOptions } from './WASQLiteOpenFactory';
66

77
/**
88
* List of currently tested virtual filesystems
@@ -45,13 +45,6 @@ export type WASQLiteModuleFactory = (
4545
options: WASQLiteModuleFactoryOptions
4646
) => Promise<{ module: SQLiteModule; vfs: SQLiteVFS }>;
4747

48-
/**
49-
* @internal
50-
*/
51-
export interface WASQLiteOpenOptions extends ResolvedWebSQLOpenOptions {
52-
vfs?: WASQLiteVFS;
53-
}
54-
5548
/**
5649
* @internal
5750
*/
@@ -106,7 +99,10 @@ export const DEFAULT_MODULE_FACTORIES = {
10699
* WA-SQLite connection which directly interfaces with WA-SQLite.
107100
* This is usually instantiated inside a worker.
108101
*/
109-
export class WASqliteConnection extends BaseObserver<WASQLiteConnectionListener> implements AsyncDatabaseConnection {
102+
export class WASqliteConnection
103+
extends BaseObserver<WASQLiteConnectionListener>
104+
implements AsyncDatabaseConnection<ResolvedWASQLiteOpenFactoryOptions>
105+
{
110106
private _sqliteAPI: SQLiteAPI | null = null;
111107
private _dbP: number | null = null;
112108
private _moduleFactory: WASQLiteModuleFactory;
@@ -121,14 +117,14 @@ export class WASqliteConnection extends BaseObserver<WASQLiteConnectionListener>
121117
*/
122118
protected connectionId: number;
123119

124-
constructor(protected options: WASQLiteOpenOptions) {
120+
constructor(protected options: ResolvedWASQLiteOpenFactoryOptions) {
125121
super();
126122
this.updatedTables = new Set();
127123
this.updateTimer = null;
128124
this.broadcastChannel = null;
129125
this.connectionId = new Date().valueOf() + Math.random();
130126
this.statementMutex = new Mutex();
131-
this._moduleFactory = DEFAULT_MODULE_FACTORIES[this.options.vfs ?? WASQLiteVFS.IDBBatchAtomicVFS];
127+
this._moduleFactory = DEFAULT_MODULE_FACTORIES[this.options.vfs];
132128
}
133129

134130
protected get sqliteAPI() {
@@ -185,6 +181,7 @@ export class WASqliteConnection extends BaseObserver<WASQLiteConnectionListener>
185181
this._sqliteAPI = await this.openSQLiteAPI();
186182
await this.openDB();
187183
this.registerBroadcastListeners();
184+
await this.executeSingleStatement(`PRAGMA temp_store = ${this.options.temporaryStorage};`);
188185

189186
this.sqliteAPI.update_hook(this.dbP, (updateType: number, dbName: string | null, tableName: string | null) => {
190187
if (!tableName) {
@@ -195,6 +192,10 @@ export class WASqliteConnection extends BaseObserver<WASQLiteConnectionListener>
195192
});
196193
}
197194

195+
async getConfig(): Promise<ResolvedWASQLiteOpenFactoryOptions> {
196+
return this.options;
197+
}
198+
198199
fireUpdates() {
199200
this.updateTimer = null;
200201
const event: BatchedUpdateNotification = { tables: [...this.updatedTables], groupedUpdates: {}, rawUpdates: [] };

packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,11 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
9797
protected messagePort: MessagePort;
9898

9999
protected isInitialized: Promise<void>;
100+
protected dbAdapter: WebDBAdapter;
100101

101102
constructor(options: SharedWebStreamingSyncImplementationOptions) {
102103
super(options);
103-
104+
this.dbAdapter = options.db;
104105
/**
105106
* Configure or connect to the shared sync worker.
106107
* This worker will manage all syncing operations remotely.
@@ -147,7 +148,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
147148
const flags = { ...this.webOptions.flags, workers: undefined };
148149

149150
this.isInitialized = this.syncManager.setParams({
150-
dbName: this.options.identifier!,
151+
dbParams: this.dbAdapter.getConfiguration(),
151152
streamOptions: {
152153
crudUploadThrottleMs,
153154
identifier,

packages/web/src/worker/db/WASQLiteDB.worker.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
import '@journeyapps/wa-sqlite';
66
import * as Comlink from 'comlink';
77
import { AsyncDatabaseConnection } from '../../db/adapters/AsyncDatabaseConnection';
8-
import { WASQLiteOpenOptions, WASqliteConnection } from '../../db/adapters/wa-sqlite/WASQLiteConnection';
8+
import { WASqliteConnection } from '../../db/adapters/wa-sqlite/WASQLiteConnection';
9+
import { ResolvedWASQLiteOpenFactoryOptions } from '../../db/adapters/wa-sqlite/WASQLiteOpenFactory';
910
import { getNavigatorLocks } from '../../shared/navigator';
1011

1112
/**
@@ -22,10 +23,11 @@ const OPEN_DB_LOCK = 'open-wasqlite-db';
2223

2324
let nextClientId = 1;
2425

25-
const openWorkerConnection = async (options: WASQLiteOpenOptions): Promise<AsyncDatabaseConnection> => {
26+
const openWorkerConnection = async (options: ResolvedWASQLiteOpenFactoryOptions): Promise<AsyncDatabaseConnection> => {
2627
const connection = new WASqliteConnection(options);
2728
return {
2829
init: Comlink.proxy(() => connection.init()),
30+
getConfig: Comlink.proxy(() => connection.getConfig()),
2931
close: Comlink.proxy(() => connection.close()),
3032
execute: Comlink.proxy(async (sql: string, params?: any[]) => connection.execute(sql, params)),
3133
executeBatch: Comlink.proxy(async (sql: string, params?: any[]) => connection.executeBatch(sql, params)),
@@ -36,7 +38,7 @@ const openWorkerConnection = async (options: WASQLiteOpenOptions): Promise<Async
3638
};
3739
};
3840

39-
const openDBShared = async (options: WASQLiteOpenOptions): Promise<AsyncDatabaseConnection> => {
41+
const openDBShared = async (options: ResolvedWASQLiteOpenFactoryOptions): Promise<AsyncDatabaseConnection> => {
4042
// Prevent multiple simultaneous opens from causing race conditions
4143
return getNavigatorLocks().request(OPEN_DB_LOCK, async () => {
4244
const clientId = nextClientId++;

0 commit comments

Comments
 (0)