diff --git a/.changeset/cuddly-dingos-check.md b/.changeset/cuddly-dingos-check.md new file mode 100644 index 000000000..68d7c23fa --- /dev/null +++ b/.changeset/cuddly-dingos-check.md @@ -0,0 +1,4 @@ +--- +'@powersync/node': patch +--- +Fixed an issue where `readLock` and `writeLock` calls were unnecessarily serialized due to a shared mutex. This did not affect individual calls to `get`, `getAll`, or `getOptional`. diff --git a/.changeset/short-countries-swim.md b/.changeset/short-countries-swim.md new file mode 100644 index 000000000..267f082ee --- /dev/null +++ b/.changeset/short-countries-swim.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +[Internal] Removed shared mutex implementation of `readLock` and `writeLock`. \ No newline at end of file diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 590747969..6eca6203e 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -16,7 +16,6 @@ import { Schema } from '../db/schema/Schema.js'; import { BaseObserver } from '../utils/BaseObserver.js'; import { ControlledExecutor } from '../utils/ControlledExecutor.js'; import { throttleTrailing } from '../utils/async.js'; -import { mutexRunExclusive } from '../utils/mutex.js'; import { ConnectionManager } from './ConnectionManager.js'; import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js'; import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js'; @@ -149,12 +148,6 @@ export const isPowerSyncDatabaseOptionsWithSettings = (test: any): test is Power }; export abstract class AbstractPowerSyncDatabase extends BaseObserver { - /** - * Transactions should be queued in the DBAdapter, but we also want to prevent - * calls to `.execute` while an async transaction is running. - */ - protected static transactionMutex: Mutex = new Mutex(); - /** * Returns true if the connection is closed. */ @@ -678,8 +671,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver tx.execute(sql, parameters)); } /** @@ -753,7 +745,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver(callback: (db: DBAdapter) => Promise) { await this.waitForReady(); - return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, () => callback(this.database)); + return this.database.readLock(callback); } /** @@ -762,10 +754,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver(callback: (db: DBAdapter) => Promise) { await this.waitForReady(); - return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, async () => { - const res = await callback(this.database); - return res; - }); + return this.database.writeLock(callback); } /** diff --git a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts index 258b32acc..8cf5a86d7 100644 --- a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts +++ b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts @@ -1,6 +1,5 @@ -import { Mutex } from 'async-mutex'; import Logger, { ILogger } from 'js-logger'; -import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter.js'; +import { DBAdapter, extractTableUpdates, Transaction } from '../../../db/DBAdapter.js'; import { BaseObserver } from '../../../utils/BaseObserver.js'; import { MAX_OP_ID } from '../../constants.js'; import { @@ -26,7 +25,6 @@ export class SqliteBucketStorage extends BaseObserver imp constructor( private db: DBAdapter, - private mutex: Mutex, private logger: ILogger = Logger.get('SqliteBucketStorage') ) { super(); diff --git a/packages/node/src/db/PowerSyncDatabase.ts b/packages/node/src/db/PowerSyncDatabase.ts index e4ce8c481..a7c6c9e00 100644 --- a/packages/node/src/db/PowerSyncDatabase.ts +++ b/packages/node/src/db/PowerSyncDatabase.ts @@ -64,7 +64,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { } protected generateBucketStorageAdapter(): BucketStorageAdapter { - return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex); + return new SqliteBucketStorage(this.database); } connect( diff --git a/packages/react-native/src/db/PowerSyncDatabase.ts b/packages/react-native/src/db/PowerSyncDatabase.ts index efd40e8bb..128c99013 100644 --- a/packages/react-native/src/db/PowerSyncDatabase.ts +++ b/packages/react-native/src/db/PowerSyncDatabase.ts @@ -9,8 +9,8 @@ import { } from '@powersync/common'; import { ReactNativeRemote } from '../sync/stream/ReactNativeRemote'; import { ReactNativeStreamingSyncImplementation } from '../sync/stream/ReactNativeStreamingSyncImplementation'; -import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick-sqlite/ReactNativeQuickSQLiteOpenFactory'; import { ReactNativeBucketStorageAdapter } from './../sync/bucket/ReactNativeBucketStorageAdapter'; +import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick-sqlite/ReactNativeQuickSQLiteOpenFactory'; /** * A PowerSync database which provides SQLite functionality @@ -39,7 +39,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { } protected generateBucketStorageAdapter(): BucketStorageAdapter { - return new ReactNativeBucketStorageAdapter(this.database, AbstractPowerSyncDatabase.transactionMutex); + return new ReactNativeBucketStorageAdapter(this.database); } protected generateSyncStreamImplementation( @@ -60,14 +60,4 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { identifier: this.database.name }); } - - async readLock(callback: (db: DBAdapter) => Promise): Promise { - await this.waitForReady(); - return this.database.readLock(callback); - } - - async writeLock(callback: (db: DBAdapter) => Promise): Promise { - await this.waitForReady(); - return this.database.writeLock(callback); - } } diff --git a/packages/web/src/db/PowerSyncDatabase.ts b/packages/web/src/db/PowerSyncDatabase.ts index 8351b4f1b..891a033d0 100644 --- a/packages/web/src/db/PowerSyncDatabase.ts +++ b/packages/web/src/db/PowerSyncDatabase.ts @@ -172,7 +172,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { } protected generateBucketStorageAdapter(): BucketStorageAdapter { - return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex); + return new SqliteBucketStorage(this.database); } protected async runExclusive(cb: () => Promise) { diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 0af122ead..e02c9506e 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -371,7 +371,7 @@ export class SharedSyncImplementation const syncParams = this.syncParams!; // Create a new StreamingSyncImplementation for each connect call. This is usually done is all SDKs. return new WebStreamingSyncImplementation({ - adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger), + adapter: new SqliteBucketStorage(this.dbAdapter!, this.logger), remote: new WebRemote( { invalidateCredentials: async () => { diff --git a/packages/web/tests/bucket_storage.test.ts b/packages/web/tests/bucket_storage.test.ts index 66cce87e9..2c173ded9 100644 --- a/packages/web/tests/bucket_storage.test.ts +++ b/packages/web/tests/bucket_storage.test.ts @@ -11,7 +11,6 @@ import { SyncDataBucket } from '@powersync/common'; import { PowerSyncDatabase, WASQLitePowerSyncDatabaseOpenFactory } from '@powersync/web'; -import { Mutex } from 'async-mutex'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { testSchema } from './utils/testDb'; @@ -73,7 +72,7 @@ describe('Bucket Storage', { sequential: true }, () => { schema: testSchema }); await db.waitForReady(); - bucketStorage = new SqliteBucketStorage(db.database, new Mutex()); + bucketStorage = new SqliteBucketStorage(db.database); }); afterEach(async () => { @@ -418,7 +417,7 @@ describe('Bucket Storage', { sequential: true }, () => { let powersync = factory.getInstance(); await powersync.waitForReady(); - bucketStorage = new SqliteBucketStorage(powersync.database, new Mutex()); + bucketStorage = new SqliteBucketStorage(powersync.database); await bucketStorage.saveSyncData( new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) @@ -462,7 +461,7 @@ describe('Bucket Storage', { sequential: true }, () => { let powersync = factory.getInstance(); await powersync.waitForReady(); - bucketStorage = new SqliteBucketStorage(powersync.database, new Mutex()); + bucketStorage = new SqliteBucketStorage(powersync.database); await bucketStorage.saveSyncData( new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)]) diff --git a/packages/web/tests/multiple_instances.test.ts b/packages/web/tests/multiple_instances.test.ts index 4e11704f8..f87851e15 100644 --- a/packages/web/tests/multiple_instances.test.ts +++ b/packages/web/tests/multiple_instances.test.ts @@ -10,7 +10,6 @@ import { SharedWebStreamingSyncImplementationOptions, WebRemote } from '@powersync/web'; -import { Mutex } from 'async-mutex'; import { beforeAll, describe, expect, it, vi } from 'vitest'; import { WebDBAdapter } from '../src/db/adapters/WebDBAdapter'; @@ -126,7 +125,7 @@ describe('Multiple Instances', { sequential: true }, () => { // They need to use the same identifier to use the same shared worker. const identifier = 'streaming-sync-shared'; const syncOptions1: SharedWebStreamingSyncImplementationOptions = { - adapter: new SqliteBucketStorage(db.database, new Mutex()), + adapter: new SqliteBucketStorage(db.database), remote: new WebRemote(connector1), uploadCrud: async () => { await connector1.uploadData(db); @@ -140,7 +139,7 @@ describe('Multiple Instances', { sequential: true }, () => { // Generate the second streaming sync implementation const connector2 = new TestConnector(); const syncOptions2: SharedWebStreamingSyncImplementationOptions = { - adapter: new SqliteBucketStorage(db.database, new Mutex()), + adapter: new SqliteBucketStorage(db.database), remote: new WebRemote(connector1), uploadCrud: async () => { await connector2.uploadData(db); @@ -190,7 +189,7 @@ describe('Multiple Instances', { sequential: true }, () => { // Create the first streaming client const stream1 = new SharedWebStreamingSyncImplementation({ - adapter: new SqliteBucketStorage(db.database, new Mutex()), + adapter: new SqliteBucketStorage(db.database), remote: new WebRemote(connector1), uploadCrud: async () => { triggerUpload1(); @@ -216,7 +215,7 @@ describe('Multiple Instances', { sequential: true }, () => { }); const stream2 = new SharedWebStreamingSyncImplementation({ - adapter: new SqliteBucketStorage(db.database, new Mutex()), + adapter: new SqliteBucketStorage(db.database), remote: new WebRemote(connector1), uploadCrud: async () => { triggerUpload2(); diff --git a/packages/web/vitest.config.ts b/packages/web/vitest.config.ts index 2136d770c..0e586da8d 100644 --- a/packages/web/vitest.config.ts +++ b/packages/web/vitest.config.ts @@ -31,7 +31,7 @@ const config: UserConfigExport = { // Don't optimise these packages as they contain web workers and WASM files. // https://github.com/vitejs/vite/issues/11672#issuecomment-1415820673 exclude: ['@journeyapps/wa-sqlite', '@powersync/web'], - include: ['bson', 'comlink'] + include: ['bson', 'comlink', 'async-mutex'] }, plugins: [wasm(), topLevelAwait()], test: {