Skip to content

Commit ceae348

Browse files
committed
Rejecting pending read/write operations when the database is closed for op-sqlite.
1 parent 0377d5f commit ceae348

File tree

4 files changed

+59
-38
lines changed

4 files changed

+59
-38
lines changed

.changeset/khaki-bottles-sniff.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/op-sqlite': patch
3+
---
4+
5+
Rejecting pending read/write operations when the database is closed.

packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
3232
protected writeConnection: OPSQLiteConnection | null;
3333

3434
private readQueue: Array<() => void> = [];
35+
private abortController: AbortController;
3536

3637
constructor(protected options: OPSQLiteAdapterOptions) {
3738
super();
@@ -40,6 +41,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
4041
this.locks = new Lock();
4142
this.readConnections = null;
4243
this.writeConnection = null;
44+
this.abortController = new AbortController();
4345
this.initialized = this.init();
4446
}
4547

@@ -155,6 +157,10 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
155157

156158
close() {
157159
this.initialized.then(() => {
160+
// Abort any pending operations
161+
this.abortController.abort();
162+
this.readQueue = [];
163+
158164
this.writeConnection!.close();
159165
this.readConnections!.forEach((c) => c.connection.close());
160166
});
@@ -203,17 +209,30 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
203209

204210
return new Promise(async (resolve, reject) => {
205211
try {
212+
// Set up abort signal listener
213+
const abortListener = () => {
214+
reject(new Error('Database connection was closed'));
215+
};
216+
this.abortController.signal.addEventListener('abort', abortListener);
217+
206218
await this.locks
207219
.acquire(
208220
LockType.WRITE,
209221
async () => {
222+
// Check if operation was aborted before executing
223+
if (this.abortController.signal.aborted) {
224+
reject(new Error('Database connection was closed'));
225+
}
210226
resolve(await fn(this.writeConnection!));
211227
},
212228
{ timeout: options?.timeoutMs }
213229
)
214230
.then(() => {
215231
// flush updates once a write lock has been released
216232
this.writeConnection!.flushUpdates();
233+
})
234+
.finally(() => {
235+
this.abortController.signal.removeEventListener('abort', abortListener);
217236
});
218237
} catch (ex) {
219238
reject(ex);

tools/powersynctests/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pnpm android
4646
### iOS
4747

4848
```sh
49-
detox test --configuration ios.sim.debug
49+
pnpm detox test --configuration ios.sim.debug
5050
```
5151

5252
### Android

tools/powersynctests/src/tests/queries.test.ts

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -575,42 +575,39 @@ export function registerBaseTests() {
575575
expect(duration).lessThan(2000);
576576
});
577577

578-
// // TODO: Behaviour differs between RNQS and OP-sqlite. Might need to fix this one in the OP-sqlite package
579-
// it('Should handle multiple closes', async () => {
580-
// // // Bulk insert 50000 rows without using a transaction
581-
// const bulkInsertCommands = [];
582-
// const statement = `INSERT INTO t1(id, c) VALUES(?, ?)`;
583-
584-
// for (let i = 0; i < 50000; i++) {
585-
// bulkInsertCommands.push([[i + 1, `value${i + 1}`]]);
586-
// }
587-
588-
// await db.executeBatch(statement, bulkInsertCommands);
589-
// db.close();
590-
591-
// for (let i = 1; i < 10; i++) {
592-
// db = createDatabase();
593-
594-
// // ensure a regular query works
595-
// const pExecute = await db.execute(`SELECT * FROM t1 `);
596-
// expect(pExecute.rows?.length).to.equal(50000);
597-
598-
// // Queue a bunch of write locks, these will fail due to the db being closed
599-
// // before they are accepted.
600-
// const tests = [
601-
// db.execute(`SELECT * FROM t1 `),
602-
// db.execute(`SELECT * FROM t1 `),
603-
// db.execute(`SELECT * FROM t1 `),
604-
// db.execute(`SELECT * FROM t1 `),
605-
// ];
606-
607-
// await db.close();
608-
609-
// // const results = await Promise.allSettled(tests);
610-
// // expect(results.map(r => r.status)).deep.equal(
611-
// // Array(tests.length).fill('rejected'),
612-
// // );
613-
// }
614-
// });
578+
it('Should handle multiple closes', async () => {
579+
// Bulk insert 10000 rows without using a transaction
580+
const bulkInsertCommands = [];
581+
const statement = `INSERT INTO t1(id, c) VALUES(uuid(), ?)`;
582+
583+
for (let i = 0; i < 10000; i++) {
584+
bulkInsertCommands.push([[`value${i + 1}`]]);
585+
}
586+
587+
await db.executeBatch(statement, bulkInsertCommands);
588+
db.close();
589+
590+
for (let i = 1; i < 10; i++) {
591+
db = createDatabase();
592+
593+
// ensure a regular query works
594+
const pExecute = await db.execute(`SELECT * FROM t1 `);
595+
expect(pExecute.rows?.length).to.equal(10000);
596+
597+
// Queue a bunch of write locks, these will fail due to the db being closed
598+
// before they are accepted.
599+
const tests = [
600+
db.execute(`SELECT * FROM t1 `),
601+
db.execute(`SELECT * FROM t1 `),
602+
db.execute(`SELECT * FROM t1 `),
603+
db.execute(`SELECT * FROM t1 `)
604+
];
605+
606+
db.close();
607+
608+
const results = await Promise.allSettled(tests);
609+
expect(results.map((r) => r.status)).deep.equal(Array(tests.length).fill('rejected'));
610+
}
611+
});
615612
});
616613
}

0 commit comments

Comments
 (0)