Skip to content

Commit 55dcd05

Browse files
Support single read and single write connection. Implement table update hooks.
1 parent b6afc44 commit 55dcd05

File tree

1 file changed

+58
-32
lines changed

1 file changed

+58
-32
lines changed

packages/capacitor/src/adapter/CapacitorSQLiteAdapter.ts

Lines changed: 58 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,15 @@ import { PowerSyncCore } from '../plugin/PowerSyncCore';
1616

1717
export class CapacitorSQLiteAdapter extends BaseObserver<DBAdapterListener> implements DBAdapter {
1818
protected _writeConnection: SQLiteDBConnection | null;
19+
protected _readConnection: SQLiteDBConnection | null;
1920
protected initializedPromise: Promise<void>;
2021
protected lock: Lock;
21-
// TODO update hooks
22-
protected tableUpdatesCache: Set<string>;
2322

2423
constructor(protected options: SQLOpenOptions) {
2524
super();
2625
this._writeConnection = null;
26+
this._readConnection = null;
2727
this.lock = new Lock();
28-
this.tableUpdatesCache = new Set();
2928
this.initializedPromise = this.init();
3029
}
3130

@@ -36,37 +35,51 @@ export class CapacitorSQLiteAdapter extends BaseObserver<DBAdapterListener> impl
3635
return this._writeConnection;
3736
}
3837

38+
protected get readConnection(): SQLiteDBConnection {
39+
if (!this._readConnection) {
40+
throw new Error('Init not completed yet');
41+
}
42+
return this._readConnection;
43+
}
44+
3945
private async init() {
4046
await PowerSyncCore.registerCore();
4147
const sqlite = new SQLiteConnection(CapacitorSQLite);
42-
try {
43-
const existing = await sqlite.isConnection(this.options.dbFilename, false);
44-
if (existing.result) {
45-
await sqlite.closeConnection(this.options.dbFilename, false);
46-
}
47-
} catch (ex) {
48-
console.error('Error retrieving existing connection:', ex);
48+
for (const isReadOnly of [true, false]) {
49+
// Close any existing native connection if it already exists.
50+
// It seems like the isConnection and retrieveConnection methods
51+
// only check a JS side map of connections.
52+
// On hot reload this JS cache can be cleared, while the connection
53+
// still exists natively. and `createConnection` will fail if it already exists.
54+
await sqlite.closeConnection(this.options.dbFilename, isReadOnly).catch(() => {});
4955
}
5056

57+
// TODO support encryption eventually
5158
this._writeConnection = await sqlite.createConnection(this.options.dbFilename, false, 'no-encryption', 1, false);
59+
this._readConnection = await sqlite.createConnection(this.options.dbFilename, true, 'no-encryption', 1, true);
60+
61+
// TODO validate WAL mode
5262
await this._writeConnection.open();
63+
await this._readConnection.open();
64+
65+
this.writeConnection.query("SELECT powersync_update_hooks('install')");
5366
}
5467

5568
async close(): Promise<void> {
5669
await this.initializedPromise;
5770
await this.writeConnection.close();
71+
await this.readConnection.close();
5872
}
5973
get name() {
6074
return this.options.dbFilename;
6175
}
6276

63-
protected generateLockContext(): LockContext {
77+
protected generateLockContext(db: SQLiteDBConnection): LockContext {
6478
const execute = async (query: string, params: any[] = []): Promise<QueryResult> => {
65-
await this.initializedPromise;
66-
const db = this.writeConnection;
6779
// TODO verify transactions
6880
// AND handle this better. This driver does not support returning results
6981
// for execute methods
82+
7083
if (query.toLowerCase().trim().startsWith('select')) {
7184
let result = await db.query(query, params);
7285
let arrayResult = result.values ?? [];
@@ -94,8 +107,6 @@ export class CapacitorSQLiteAdapter extends BaseObserver<DBAdapterListener> impl
94107
};
95108

96109
const executeQuery = async (query: string, params?: any[]): Promise<QueryResult> => {
97-
await this.initializedPromise;
98-
const db = this.writeConnection;
99110
let result = await db.query(query, params);
100111

101112
let arrayResult = result.values ?? [];
@@ -150,25 +161,29 @@ export class CapacitorSQLiteAdapter extends BaseObserver<DBAdapterListener> impl
150161
}
151162

152163
async executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
153-
await this.initializedPromise;
154-
let result = await this.writeConnection.executeSet(
155-
params.map((param) => ({
156-
statement: query,
157-
values: param
158-
}))
159-
);
164+
return this.writeLock(async (tx) => {
165+
let result = await this.writeConnection.executeSet(
166+
params.map((param) => ({
167+
statement: query,
168+
values: param
169+
}))
170+
);
160171

161-
return {
162-
rowsAffected: result.changes?.changes ?? 0,
163-
insertId: result.changes?.lastId
164-
};
172+
return {
173+
rowsAffected: result.changes?.changes ?? 0,
174+
insertId: result.changes?.lastId
175+
};
176+
});
165177
}
166178

167179
/**
168180
* We're not using separate read/write locks here because we can't implement connection pools on top of SQL.js.
169181
*/
170182
readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
171-
return this.writeLock(fn, options);
183+
return this.lock.acquire('read_lock', async () => {
184+
await this.initializedPromise;
185+
return await fn(this.generateLockContext(this.readConnection));
186+
});
172187
}
173188

174189
readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
@@ -178,16 +193,21 @@ export class CapacitorSQLiteAdapter extends BaseObserver<DBAdapterListener> impl
178193
}
179194

180195
writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
181-
return this.lock.acquire('lock', async () => {
196+
return this.lock.acquire('write_lock', async () => {
182197
await this.initializedPromise;
183-
const result = await fn(this.generateLockContext());
198+
const result = await fn(this.generateLockContext(this.writeConnection));
184199

200+
// Fetch table updates
201+
const updates = await this.writeConnection.query("SELECT powersync_update_hooks('get') AS table_name");
202+
const jsonUpdates = updates.values?.[0];
203+
if (!jsonUpdates || !jsonUpdates.table_name) {
204+
throw new Error('Could not fetch table updates');
205+
}
185206
const notification: BatchedUpdateNotification = {
186207
rawUpdates: [],
187-
tables: Array.from(this.tableUpdatesCache),
208+
tables: JSON.parse(jsonUpdates.table_name),
188209
groupedUpdates: {}
189210
};
190-
this.tableUpdatesCache.clear();
191211
this.iterateListeners((l) => l.tablesUpdated?.(notification));
192212
return result;
193213
});
@@ -200,7 +220,13 @@ export class CapacitorSQLiteAdapter extends BaseObserver<DBAdapterListener> impl
200220
}
201221

202222
refreshSchema(): Promise<void> {
203-
return this.get("PRAGMA table_info('sqlite_master')");
223+
return this.writeLock(async (writeTx) => {
224+
return this.readLock(async (readTx) => {
225+
const updateQuery = `PRAGMA table_info('sqlite_master')`;
226+
await writeTx.get(updateQuery);
227+
await readTx.get(updateQuery);
228+
});
229+
});
204230
}
205231

206232
getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {

0 commit comments

Comments
 (0)