Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/loud-mangos-kick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/common': patch
'@powersync/web': patch
---

Fixed race condition in async closing of databases
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
"@actions/core": "^1.10.1",
"@changesets/cli": "2.27.2",
"@pnpm/workspace.find-packages": "^4.0.2",
"@vitest/browser": "^3.0.5",
"@vitest/browser": "^3.0.8",
"husky": "^9.0.11",
"lint-staged": "^15.2.2",
"playwright": "^1.50.1",
"playwright": "^1.51.0",
"prettier": "^3.2.5",
"typescript": "^5.7.2",
"vitest": "^3.0.5"
"vitest": "^3.0.8"
}
}
14 changes: 9 additions & 5 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ import { mutexRunExclusive } from '../utils/mutex.js';
import { throttleTrailing } from '../utils/throttle.js';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
import { runOnSchemaChange } from './runOnSchemaChange.js';
import { BucketStorageAdapter, PSInternalTable } from './sync/bucket/BucketStorageAdapter.js';
import { CrudBatch } from './sync/bucket/CrudBatch.js';
import { CrudEntry, CrudEntryJSON } from './sync/bucket/CrudEntry.js';
import { CrudTransaction } from './sync/bucket/CrudTransaction.js';
import {
DEFAULT_CRUD_UPLOAD_THROTTLE_MS,
type AdditionalConnectionOptions,
type PowerSyncConnectionOptions,
DEFAULT_RETRY_DELAY_MS,
StreamingSyncImplementation,
StreamingSyncImplementationListener,
DEFAULT_RETRY_DELAY_MS,
type AdditionalConnectionOptions,
type PowerSyncConnectionOptions,
type RequiredAdditionalConnectionOptions
} from './sync/stream/AbstractStreamingSyncImplementation.js';
import { runOnSchemaChange } from './runOnSchemaChange.js';

export interface DisconnectAndClearOptions {
/** When set to false, data in local-only tables is preserved. */
Expand Down Expand Up @@ -505,13 +505,17 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
async close(options: PowerSyncCloseOptions = DEFAULT_POWERSYNC_CLOSE_OPTIONS) {
await this.waitForReady();

if (this.closed) {
return;
}

const { disconnect } = options;
if (disconnect) {
await this.disconnect();
}

await this.syncStreamImplementation?.dispose();
this.database.close();
await this.database.close();
this.closed = true;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/common/src/db/DBAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export interface DBLockOptions {
}

export interface DBAdapter extends BaseObserverInterface<DBAdapterListener>, DBGetUtils {
close: () => void;
close: () => void | Promise<void>;
execute: (query: string, params?: any[]) => Promise<QueryResult>;
executeBatch: (query: string, params?: any[][]) => Promise<QueryResult>;
name: string;
Expand Down
4 changes: 2 additions & 2 deletions packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ export class LockedAsyncDatabaseAdapter
* Shared workers might not actually close the connection if other
* tabs are still using it.
*/
close() {
async close() {
this._disposeTableChangeListener?.();
this.baseDB?.close?.();
await this.baseDB?.close?.();
}

async getAll<T>(sql: string, parameters?: any[] | undefined): Promise<T[]> {
Expand Down
7 changes: 4 additions & 3 deletions packages/web/tests/bucket_storage.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {
AbstractPowerSyncDatabase,
BucketStorageAdapter,
Checkpoint,
OpType,
OpTypeEnum,
OplogEntry,
Expand All @@ -8,10 +10,9 @@ import {
SyncDataBatch,
SyncDataBucket
} from '@powersync/common';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { AbstractPowerSyncDatabase, Checkpoint } from '@powersync/common';
import { PowerSyncDatabase, WASQLitePowerSyncDatabaseOpenFactory } from '@powersync/web';
import { Mutex } from 'async-mutex';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { testSchema } from './utils/testDb';

const putAsset1_1 = OplogEntry.fromRow({
Expand Down Expand Up @@ -57,7 +58,7 @@ const removeAsset1_5 = OplogEntry.fromRow({
checksum: 5
});

describe('Bucket Storage', () => {
describe('Bucket Storage', { sequential: true }, () => {
let db: AbstractPowerSyncDatabase;
let bucketStorage: BucketStorageAdapter;

Expand Down
46 changes: 28 additions & 18 deletions packages/web/tests/crud.test.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,15 @@
import { AbstractPowerSyncDatabase, Column, ColumnType, CrudEntry, Schema, Table, UpdateType } from '@powersync/common';
import { PowerSyncDatabase } from '@powersync/web';
import { Column, ColumnType, CrudEntry, Schema, Table, UpdateType } from '@powersync/common';
import pDefer from 'p-defer';
import { v4 as uuid } from 'uuid';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { describe, expect, it } from 'vitest';
import { generateTestDb } from './utils/testDb';

const testId = '2290de4f-0488-4e50-abed-f8e8eb1d0b42';

describe('CRUD Tests', () => {
let powersync: AbstractPowerSyncDatabase;

beforeEach(async () => {
powersync = generateTestDb();
});

afterEach(async () => {
await powersync.disconnectAndClear();
await powersync.close();
});

describe('CRUD Tests', { sequential: true }, () => {
it('INSERT', async () => {
const powersync = generateTestDb();

expect(await powersync.getAll('SELECT * FROM ps_crud')).empty;

await powersync.execute('INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test']);
Expand All @@ -37,6 +27,8 @@ describe('CRUD Tests', () => {
});

it('BATCH INSERT', async () => {
const powersync = generateTestDb();

expect(await powersync.getAll('SELECT * FROM ps_crud')).empty;

const query = `INSERT INTO assets(id, description) VALUES(?, ?)`;
Expand All @@ -63,6 +55,8 @@ describe('CRUD Tests', () => {
});

it('INSERT OR REPLACE', async () => {
const powersync = generateTestDb();

await powersync.execute('INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test']);
await powersync.execute('DELETE FROM ps_crud WHERE 1');

Expand All @@ -85,6 +79,8 @@ describe('CRUD Tests', () => {
});

it('UPDATE', async () => {
const powersync = generateTestDb();

await powersync.execute('INSERT INTO assets(id, description, make) VALUES(?, ?, ?)', [testId, 'test', 'test']);
await powersync.execute('DELETE FROM ps_crud WHERE 1');

Expand All @@ -105,6 +101,8 @@ describe('CRUD Tests', () => {
});

it('BATCH UPDATE', async () => {
const powersync = generateTestDb();

await powersync.executeBatch('INSERT INTO assets(id, description, make) VALUES(?, ?, ?)', [
[testId, 'test', 'test'],
['mockId', 'test', 'test']
Expand Down Expand Up @@ -137,6 +135,8 @@ describe('CRUD Tests', () => {
});

it('DELETE', async () => {
const powersync = generateTestDb();

await powersync.execute('INSERT INTO assets(id, description, make) VALUES(?, ?, ?)', [testId, 'test', 'test']);
await powersync.execute('DELETE FROM ps_crud WHERE 1');

Expand All @@ -153,6 +153,8 @@ describe('CRUD Tests', () => {
});

it('UPSERT not supported', async () => {
const powersync = generateTestDb();

// Just shows that we cannot currently do this
await expect(
powersync.execute('INSERT INTO assets(id, description) VALUES(?, ?) ON CONFLICT DO UPDATE SET description = ?', [
Expand All @@ -164,9 +166,7 @@ describe('CRUD Tests', () => {
});

it('INSERT-only tables', async () => {
await powersync.disconnectAndClear();

powersync = new PowerSyncDatabase({
const powersync = generateTestDb({
/**
* Deleting the IndexDB seems to freeze the test.
* Use a new DB for each run to keep CRUD counters
Expand Down Expand Up @@ -212,6 +212,8 @@ describe('CRUD Tests', () => {
});

it('big numbers - integer', async () => {
const powersync = generateTestDb();

const bigNumber = 1 << 62;
await powersync.execute('INSERT INTO assets(id, quantity) VALUES(?, ?)', [testId, bigNumber]);

Expand All @@ -233,6 +235,8 @@ describe('CRUD Tests', () => {
});

it('big numbers - text', async () => {
const powersync = generateTestDb();

const bigNumber = 1 << 62;
await powersync.execute('INSERT INTO assets(id, quantity) VALUES(?, ?)', [testId, `${bigNumber}`]);

Expand Down Expand Up @@ -263,6 +267,8 @@ describe('CRUD Tests', () => {
});

it('Transaction grouping', async () => {
const powersync = generateTestDb();

expect(await powersync.getAll('SELECT * FROM ps_crud')).empty;
await powersync.writeTransaction(async (tx) => {
await tx.execute('INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test1']);
Expand Down Expand Up @@ -292,6 +298,8 @@ describe('CRUD Tests', () => {
});

it('Transaction exclusivity', async () => {
const powersync = generateTestDb();

const outside = pDefer();
const inTx = pDefer();

Expand All @@ -313,6 +321,8 @@ describe('CRUD Tests', () => {
});

it('CRUD Batch Limits', async () => {
const powersync = generateTestDb();

const initialBatch = await powersync.getCrudBatch();
expect(initialBatch, 'Initial CRUD batch should be null').null;

Expand Down
86 changes: 48 additions & 38 deletions packages/web/tests/main.test.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,74 @@
import { AbstractPowerSyncDatabase } from '@powersync/common';
import { PowerSyncDatabase, WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web';
import { v4 as uuid } from 'uuid';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { describe, expect, it } from 'vitest';
import { TestDatabase, generateTestDb, testSchema } from './utils/testDb';
// TODO import tests from a common package

describe('Basic', () => {
let dbWithoutWebWorker: AbstractPowerSyncDatabase;
let dbWithWebWorker: AbstractPowerSyncDatabase;
let dbWithOPFS: AbstractPowerSyncDatabase;
describe(
'Basic - With Web Workers',
{ sequential: true },
describeBasicTests(() => generateTestDb())
);

beforeEach(() => {
dbWithoutWebWorker = generateTestDb({ useWebWorker: false });
dbWithWebWorker = generateTestDb();
dbWithOPFS = new PowerSyncDatabase({
database: new WASQLiteOpenFactory({ dbFilename: 'basic.sqlite', vfs: WASQLiteVFS.OPFSCoopSyncVFS }),
describe(
'Basic - Without Web Workers',
{ sequential: true },
describeBasicTests(() =>
generateTestDb({
database: {
dbFilename: 'basic-no-worker.sqlite'
},
flags: {
useWebWorker: false
},
schema: testSchema
});
});
})
)
);

/**
* Declares a test to be executed with multiple DB connections
*/
const itWithDBs = (name: string, test: (db: AbstractPowerSyncDatabase) => Promise<void>) => {
it(`${name} - with web worker`, () => test(dbWithWebWorker));
it(`${name} - without web worker`, () => test(dbWithoutWebWorker));
it(`${name} - with OPFS`, () => test(dbWithOPFS));
};
describe(
'Basic - With OPFS',
{ sequential: true },
describeBasicTests(() =>
generateTestDb({
database: new WASQLiteOpenFactory({
dbFilename: 'basic-opfs.sqlite',
vfs: WASQLiteVFS.OPFSCoopSyncVFS
}),
schema: testSchema
})
)
);

afterEach(async () => {
await dbWithWebWorker.disconnectAndClear();
await dbWithWebWorker.close();
await dbWithoutWebWorker.disconnectAndClear();
await dbWithoutWebWorker.close();
await dbWithOPFS.disconnectAndClear();
await dbWithOPFS.close();
});
function describeBasicTests(generateDB: () => PowerSyncDatabase) {
return () => {
it('should execute a select query using getAll', async () => {
const db = generateDB();

describe('executeQuery', () => {
itWithDBs('should execute a select query using getAll', async (db) => {
const result = await db.getAll('SELECT * FROM customers');
expect(result.length).toEqual(0);
});

itWithDBs('should allow inserts', async (db) => {
it('should allow inserts', async () => {
const db = generateDB();

const testName = 'Steven';
await db.execute('INSERT INTO customers (id, name) VALUES(?, ?)', [uuid(), testName]);
const result = await db.get<TestDatabase['customers']>('SELECT * FROM customers');

expect(result.name).equals(testName);
});
});

describe('executeBatchQuery', () => {
itWithDBs('should execute a select query using getAll', async (db) => {
it('should execute a select query using getAll', async () => {
const db = generateDB();

const result = await db.getAll('SELECT * FROM customers');
expect(result.length).toEqual(0);
});

itWithDBs('should allow batch inserts', async (db) => {
it('should allow batch inserts', async () => {
const db = generateDB();

const testName = 'Mugi';
await db.executeBatch('INSERT INTO customers (id, name) VALUES(?, ?)', [
[uuid(), testName],
Expand All @@ -72,5 +82,5 @@ describe('Basic', () => {
expect(result[1].name).equals('Steven');
expect(result[2].name).equals('Chris');
});
});
});
};
}
Loading