Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changeset/cuddly-dingos-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
'@powersync/node': patch
---
Fixed an issue where `readLock` and `writeLock` calls were unnecessarily serialized due to a shared mutex. This did not affect individual calls to `get`, `getAll`, or `getOptional`.
5 changes: 5 additions & 0 deletions .changeset/short-countries-swim.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

[Internal] Removed shared mutex implementation of `readLock` and `writeLock`.
17 changes: 3 additions & 14 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
import { throttleTrailing } from '../utils/async.js';
import { mutexRunExclusive } from '../utils/mutex.js';
import { ConnectionManager } from './ConnectionManager.js';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
Expand Down Expand Up @@ -149,12 +148,6 @@ export const isPowerSyncDatabaseOptionsWithSettings = (test: any): test is Power
};

export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDBListener> {
/**
* Transactions should be queued in the DBAdapter, but we also want to prevent
* calls to `.execute` while an async transaction is running.
*/
protected static transactionMutex: Mutex = new Mutex();

/**
* Returns true if the connection is closed.
*/
Expand Down Expand Up @@ -678,8 +671,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* @returns The query result as an object with structured key-value pairs
*/
async execute(sql: string, parameters?: any[]) {
await this.waitForReady();
return this.database.execute(sql, parameters);
return this.writeLock((tx) => tx.execute(sql, parameters));
}

/**
Expand Down Expand Up @@ -753,7 +745,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*/
async readLock<T>(callback: (db: DBAdapter) => Promise<T>) {
await this.waitForReady();
return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, () => callback(this.database));
return this.database.readLock(callback);
}

/**
Expand All @@ -762,10 +754,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*/
async writeLock<T>(callback: (db: DBAdapter) => Promise<T>) {
await this.waitForReady();
return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, async () => {
const res = await callback(this.database);
return res;
});
return this.database.writeLock(callback);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Mutex } from 'async-mutex';
import Logger, { ILogger } from 'js-logger';
import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter.js';
import { DBAdapter, extractTableUpdates, Transaction } from '../../../db/DBAdapter.js';
import { BaseObserver } from '../../../utils/BaseObserver.js';
import { MAX_OP_ID } from '../../constants.js';
import {
Expand All @@ -26,7 +25,6 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp

constructor(
private db: DBAdapter,
private mutex: Mutex,
private logger: ILogger = Logger.get('SqliteBucketStorage')
) {
super();
Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
}

protected generateBucketStorageAdapter(): BucketStorageAdapter {
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
return new SqliteBucketStorage(this.database);
}

connect(
Expand Down
14 changes: 2 additions & 12 deletions packages/react-native/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import {
} from '@powersync/common';
import { ReactNativeRemote } from '../sync/stream/ReactNativeRemote';
import { ReactNativeStreamingSyncImplementation } from '../sync/stream/ReactNativeStreamingSyncImplementation';
import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick-sqlite/ReactNativeQuickSQLiteOpenFactory';
import { ReactNativeBucketStorageAdapter } from './../sync/bucket/ReactNativeBucketStorageAdapter';
import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick-sqlite/ReactNativeQuickSQLiteOpenFactory';

/**
* A PowerSync database which provides SQLite functionality
Expand Down Expand Up @@ -39,7 +39,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
}

protected generateBucketStorageAdapter(): BucketStorageAdapter {
return new ReactNativeBucketStorageAdapter(this.database, AbstractPowerSyncDatabase.transactionMutex);
return new ReactNativeBucketStorageAdapter(this.database);
}

protected generateSyncStreamImplementation(
Expand All @@ -60,14 +60,4 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
identifier: this.database.name
});
}

async readLock<T>(callback: (db: DBAdapter) => Promise<T>): Promise<T> {
await this.waitForReady();
return this.database.readLock(callback);
}

async writeLock<T>(callback: (db: DBAdapter) => Promise<T>): Promise<T> {
await this.waitForReady();
return this.database.writeLock(callback);
}
}
2 changes: 1 addition & 1 deletion packages/web/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
}

protected generateBucketStorageAdapter(): BucketStorageAdapter {
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
return new SqliteBucketStorage(this.database);
}

protected async runExclusive<T>(cb: () => Promise<T>) {
Expand Down
2 changes: 1 addition & 1 deletion packages/web/src/worker/sync/SharedSyncImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ export class SharedSyncImplementation
const syncParams = this.syncParams!;
// Create a new StreamingSyncImplementation for each connect call. This is usually done is all SDKs.
return new WebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger),
adapter: new SqliteBucketStorage(this.dbAdapter!, this.logger),
remote: new WebRemote(
{
invalidateCredentials: async () => {
Expand Down
7 changes: 3 additions & 4 deletions packages/web/tests/bucket_storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
SyncDataBucket
} from '@powersync/common';
import { PowerSyncDatabase, WASQLitePowerSyncDatabaseOpenFactory } from '@powersync/web';
import { Mutex } from 'async-mutex';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { testSchema } from './utils/testDb';

Expand Down Expand Up @@ -73,7 +72,7 @@ describe('Bucket Storage', { sequential: true }, () => {
schema: testSchema
});
await db.waitForReady();
bucketStorage = new SqliteBucketStorage(db.database, new Mutex());
bucketStorage = new SqliteBucketStorage(db.database);
});

afterEach(async () => {
Expand Down Expand Up @@ -418,7 +417,7 @@ describe('Bucket Storage', { sequential: true }, () => {

let powersync = factory.getInstance();
await powersync.waitForReady();
bucketStorage = new SqliteBucketStorage(powersync.database, new Mutex());
bucketStorage = new SqliteBucketStorage(powersync.database);

await bucketStorage.saveSyncData(
new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)])
Expand Down Expand Up @@ -462,7 +461,7 @@ describe('Bucket Storage', { sequential: true }, () => {

let powersync = factory.getInstance();
await powersync.waitForReady();
bucketStorage = new SqliteBucketStorage(powersync.database, new Mutex());
bucketStorage = new SqliteBucketStorage(powersync.database);

await bucketStorage.saveSyncData(
new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)])
Expand Down
9 changes: 4 additions & 5 deletions packages/web/tests/multiple_instances.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
SharedWebStreamingSyncImplementationOptions,
WebRemote
} from '@powersync/web';
import { Mutex } from 'async-mutex';

import { beforeAll, describe, expect, it, vi } from 'vitest';
import { WebDBAdapter } from '../src/db/adapters/WebDBAdapter';
Expand Down Expand Up @@ -126,7 +125,7 @@ describe('Multiple Instances', { sequential: true }, () => {
// They need to use the same identifier to use the same shared worker.
const identifier = 'streaming-sync-shared';
const syncOptions1: SharedWebStreamingSyncImplementationOptions = {
adapter: new SqliteBucketStorage(db.database, new Mutex()),
adapter: new SqliteBucketStorage(db.database),
remote: new WebRemote(connector1),
uploadCrud: async () => {
await connector1.uploadData(db);
Expand All @@ -140,7 +139,7 @@ describe('Multiple Instances', { sequential: true }, () => {
// Generate the second streaming sync implementation
const connector2 = new TestConnector();
const syncOptions2: SharedWebStreamingSyncImplementationOptions = {
adapter: new SqliteBucketStorage(db.database, new Mutex()),
adapter: new SqliteBucketStorage(db.database),
remote: new WebRemote(connector1),
uploadCrud: async () => {
await connector2.uploadData(db);
Expand Down Expand Up @@ -190,7 +189,7 @@ describe('Multiple Instances', { sequential: true }, () => {

// Create the first streaming client
const stream1 = new SharedWebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(db.database, new Mutex()),
adapter: new SqliteBucketStorage(db.database),
remote: new WebRemote(connector1),
uploadCrud: async () => {
triggerUpload1();
Expand All @@ -216,7 +215,7 @@ describe('Multiple Instances', { sequential: true }, () => {
});

const stream2 = new SharedWebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(db.database, new Mutex()),
adapter: new SqliteBucketStorage(db.database),
remote: new WebRemote(connector1),
uploadCrud: async () => {
triggerUpload2();
Expand Down
2 changes: 1 addition & 1 deletion packages/web/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const config: UserConfigExport = {
// Don't optimise these packages as they contain web workers and WASM files.
// https://github.com/vitejs/vite/issues/11672#issuecomment-1415820673
exclude: ['@journeyapps/wa-sqlite', '@powersync/web'],
include: ['bson', 'comlink']
include: ['bson', 'comlink', 'async-mutex']
},
plugins: [wasm(), topLevelAwait()],
test: {
Expand Down