Skip to content

Commit 1312157

Browse files
committed
Merge remote-tracking branch 'origin/main' into improve-logging
2 parents 172ac2e + 9fb898d commit 1312157

File tree

11 files changed

+26
-42
lines changed

11 files changed

+26
-42
lines changed

.changeset/cuddly-dingos-check.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
'@powersync/node': patch
3+
---
4+
Fixed an issue where `readLock` and `writeLock` calls were unnecessarily serialized due to a shared mutex. This did not affect individual calls to `get`, `getAll`, or `getOptional`.

.changeset/short-countries-swim.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
[Internal] Removed shared mutex implementation of `readLock` and `writeLock`.

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import { Schema } from '../db/schema/Schema.js';
1616
import { BaseObserver } from '../utils/BaseObserver.js';
1717
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
1818
import { throttleTrailing } from '../utils/async.js';
19-
import { mutexRunExclusive } from '../utils/mutex.js';
2019
import { ConnectionManager } from './ConnectionManager.js';
2120
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
2221
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
@@ -148,12 +147,6 @@ export const isPowerSyncDatabaseOptionsWithSettings = (test: any): test is Power
148147
};
149148

150149
export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDBListener> {
151-
/**
152-
* Transactions should be queued in the DBAdapter, but we also want to prevent
153-
* calls to `.execute` while an async transaction is running.
154-
*/
155-
protected static transactionMutex: Mutex = new Mutex();
156-
157150
/**
158151
* Returns true if the connection is closed.
159152
*/
@@ -677,8 +670,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
677670
* @returns The query result as an object with structured key-value pairs
678671
*/
679672
async execute(sql: string, parameters?: any[]) {
680-
await this.waitForReady();
681-
return this.database.execute(sql, parameters);
673+
return this.writeLock((tx) => tx.execute(sql, parameters));
682674
}
683675

684676
/**
@@ -752,7 +744,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
752744
*/
753745
async readLock<T>(callback: (db: DBAdapter) => Promise<T>) {
754746
await this.waitForReady();
755-
return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, () => callback(this.database));
747+
return this.database.readLock(callback);
756748
}
757749

758750
/**
@@ -761,10 +753,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
761753
*/
762754
async writeLock<T>(callback: (db: DBAdapter) => Promise<T>) {
763755
await this.waitForReady();
764-
return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, async () => {
765-
const res = await callback(this.database);
766-
return res;
767-
});
756+
return this.database.writeLock(callback);
768757
}
769758

770759
/**

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import { Mutex } from 'async-mutex';
21
import Logger, { ILogger } from 'js-logger';
3-
import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter.js';
2+
import { DBAdapter, extractTableUpdates, Transaction } from '../../../db/DBAdapter.js';
43
import { BaseObserver } from '../../../utils/BaseObserver.js';
54
import { MAX_OP_ID } from '../../constants.js';
65
import {
@@ -26,7 +25,6 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
2625

2726
constructor(
2827
private db: DBAdapter,
29-
private mutex: Mutex,
3028
private logger: ILogger = Logger.get('SqliteBucketStorage')
3129
) {
3230
super();

packages/node/src/db/PowerSyncDatabase.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
6464
}
6565

6666
protected generateBucketStorageAdapter(): BucketStorageAdapter {
67-
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex, this.logger);
67+
return new SqliteBucketStorage(this.database, this.logger);
6868
}
6969

7070
connect(

packages/react-native/src/db/PowerSyncDatabase.ts

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import {
99
} from '@powersync/common';
1010
import { ReactNativeRemote } from '../sync/stream/ReactNativeRemote';
1111
import { ReactNativeStreamingSyncImplementation } from '../sync/stream/ReactNativeStreamingSyncImplementation';
12-
import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick-sqlite/ReactNativeQuickSQLiteOpenFactory';
1312
import { ReactNativeBucketStorageAdapter } from './../sync/bucket/ReactNativeBucketStorageAdapter';
13+
import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick-sqlite/ReactNativeQuickSQLiteOpenFactory';
1414

1515
/**
1616
* A PowerSync database which provides SQLite functionality
@@ -39,7 +39,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
3939
}
4040

4141
protected generateBucketStorageAdapter(): BucketStorageAdapter {
42-
return new ReactNativeBucketStorageAdapter(this.database, AbstractPowerSyncDatabase.transactionMutex, this.logger);
42+
return new ReactNativeBucketStorageAdapter(this.database, this.logger);
4343
}
4444

4545
protected generateSyncStreamImplementation(
@@ -61,14 +61,4 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
6161
logger: this.logger
6262
});
6363
}
64-
65-
async readLock<T>(callback: (db: DBAdapter) => Promise<T>): Promise<T> {
66-
await this.waitForReady();
67-
return this.database.readLock(callback);
68-
}
69-
70-
async writeLock<T>(callback: (db: DBAdapter) => Promise<T>): Promise<T> {
71-
await this.waitForReady();
72-
return this.database.writeLock(callback);
73-
}
7464
}

packages/web/src/db/PowerSyncDatabase.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
172172
}
173173

174174
protected generateBucketStorageAdapter(): BucketStorageAdapter {
175-
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
175+
return new SqliteBucketStorage(this.database);
176176
}
177177

178178
protected async runExclusive<T>(cb: () => Promise<T>) {

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ export class SharedSyncImplementation
371371
const syncParams = this.syncParams!;
372372
// Create a new StreamingSyncImplementation for each connect call. This is usually done is all SDKs.
373373
return new WebStreamingSyncImplementation({
374-
adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger),
374+
adapter: new SqliteBucketStorage(this.dbAdapter!, this.logger),
375375
remote: new WebRemote(
376376
{
377377
invalidateCredentials: async () => {

packages/web/tests/bucket_storage.test.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import {
1111
SyncDataBucket
1212
} from '@powersync/common';
1313
import { PowerSyncDatabase, WASQLitePowerSyncDatabaseOpenFactory } from '@powersync/web';
14-
import { Mutex } from 'async-mutex';
1514
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
1615
import { testSchema } from './utils/testDb';
1716

@@ -73,7 +72,7 @@ describe('Bucket Storage', { sequential: true }, () => {
7372
schema: testSchema
7473
});
7574
await db.waitForReady();
76-
bucketStorage = new SqliteBucketStorage(db.database, new Mutex());
75+
bucketStorage = new SqliteBucketStorage(db.database);
7776
});
7877

7978
afterEach(async () => {
@@ -418,7 +417,7 @@ describe('Bucket Storage', { sequential: true }, () => {
418417

419418
let powersync = factory.getInstance();
420419
await powersync.waitForReady();
421-
bucketStorage = new SqliteBucketStorage(powersync.database, new Mutex());
420+
bucketStorage = new SqliteBucketStorage(powersync.database);
422421

423422
await bucketStorage.saveSyncData(
424423
new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)])
@@ -462,7 +461,7 @@ describe('Bucket Storage', { sequential: true }, () => {
462461

463462
let powersync = factory.getInstance();
464463
await powersync.waitForReady();
465-
bucketStorage = new SqliteBucketStorage(powersync.database, new Mutex());
464+
bucketStorage = new SqliteBucketStorage(powersync.database);
466465

467466
await bucketStorage.saveSyncData(
468467
new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)])

packages/web/tests/multiple_instances.test.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import {
1010
SharedWebStreamingSyncImplementationOptions,
1111
WebRemote
1212
} from '@powersync/web';
13-
import { Mutex } from 'async-mutex';
1413

1514
import { beforeAll, describe, expect, it, vi } from 'vitest';
1615
import { WebDBAdapter } from '../src/db/adapters/WebDBAdapter';
@@ -126,7 +125,7 @@ describe('Multiple Instances', { sequential: true }, () => {
126125
// They need to use the same identifier to use the same shared worker.
127126
const identifier = 'streaming-sync-shared';
128127
const syncOptions1: SharedWebStreamingSyncImplementationOptions = {
129-
adapter: new SqliteBucketStorage(db.database, new Mutex()),
128+
adapter: new SqliteBucketStorage(db.database),
130129
remote: new WebRemote(connector1),
131130
uploadCrud: async () => {
132131
await connector1.uploadData(db);
@@ -140,7 +139,7 @@ describe('Multiple Instances', { sequential: true }, () => {
140139
// Generate the second streaming sync implementation
141140
const connector2 = new TestConnector();
142141
const syncOptions2: SharedWebStreamingSyncImplementationOptions = {
143-
adapter: new SqliteBucketStorage(db.database, new Mutex()),
142+
adapter: new SqliteBucketStorage(db.database),
144143
remote: new WebRemote(connector1),
145144
uploadCrud: async () => {
146145
await connector2.uploadData(db);
@@ -190,7 +189,7 @@ describe('Multiple Instances', { sequential: true }, () => {
190189

191190
// Create the first streaming client
192191
const stream1 = new SharedWebStreamingSyncImplementation({
193-
adapter: new SqliteBucketStorage(db.database, new Mutex()),
192+
adapter: new SqliteBucketStorage(db.database),
194193
remote: new WebRemote(connector1),
195194
uploadCrud: async () => {
196195
triggerUpload1();
@@ -216,7 +215,7 @@ describe('Multiple Instances', { sequential: true }, () => {
216215
});
217216

218217
const stream2 = new SharedWebStreamingSyncImplementation({
219-
adapter: new SqliteBucketStorage(db.database, new Mutex()),
218+
adapter: new SqliteBucketStorage(db.database),
220219
remote: new WebRemote(connector1),
221220
uploadCrud: async () => {
222221
triggerUpload2();

0 commit comments

Comments
 (0)