diff --git a/.changeset/kind-cats-check.md b/.changeset/kind-cats-check.md new file mode 100644 index 000000000..86c1b915b --- /dev/null +++ b/.changeset/kind-cats-check.md @@ -0,0 +1,5 @@ +--- +'@powersync/op-sqlite': patch +--- + +Improved queueing for read connections diff --git a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts index 312bcdca1..000e7be83 100644 --- a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts +++ b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts @@ -35,10 +35,12 @@ export class OPSQLiteDBAdapter extends BaseObserver implement protected initialized: Promise; - protected readConnections: OPSQLiteConnection[] | null; + protected readConnections: Array<{ busy: boolean; connection: OPSQLiteConnection }> | null; protected writeConnection: OPSQLiteConnection | null; + private readQueue: Array<() => void> = []; + constructor(protected options: OPSQLiteAdapterOptions) { super(); this.name = this.options.name; @@ -88,7 +90,7 @@ export class OPSQLiteDBAdapter extends BaseObserver implement let dbName = './'.repeat(i + 1) + dbFilename; const conn = await this.openConnection(dbName); await conn.execute('PRAGMA query_only = true'); - this.readConnections.push(conn); + this.readConnections.push({ busy: false, connection: conn }); } } @@ -145,36 +147,46 @@ export class OPSQLiteDBAdapter extends BaseObserver implement close() { this.initialized.then(() => { this.writeConnection!.close(); - this.readConnections!.forEach((c) => c.close()); + this.readConnections!.forEach((c) => c.connection.close()); }); } async readLock(fn: (tx: OPSQLiteConnection) => Promise, options?: DBLockOptions): Promise { await this.initialized; - // TODO: Use async queues to handle multiple read connections - const sortedConnections = this.readConnections!.map((connection, index) => ({ - lockKey: `${LockType.READ}-${index}`, - connection - })).sort((a, b) => { - const aBusy = this.locks.isBusy(a.lockKey); - const bBusy = this.locks.isBusy(b.lockKey); - // Sort by ones which are not busy - return aBusy > bBusy ? 1 : 0; + return new Promise(async (resolve, reject) => { + const execute = async () => { + // Find an available connection that is not busy + const availableConnection = this.readConnections!.find((conn) => !conn.busy); + + // If we have an available connection, use it + if (availableConnection) { + availableConnection.busy = true; + try { + resolve(await fn(availableConnection.connection)); + } catch (error) { + reject(error); + } finally { + availableConnection.busy = false; + // After query execution, process any queued tasks + this.processQueue(); + } + } else { + // If no available connections, add to the queue + this.readQueue.push(execute); + } + }; + + execute(); }); + } - return new Promise(async (resolve, reject) => { - try { - await this.locks.acquire( - sortedConnections[0].lockKey, - async () => { - resolve(await fn(sortedConnections[0].connection)); - }, - { timeout: options?.timeoutMs } - ); - } catch (ex) { - reject(ex); + private async processQueue(): Promise { + if (this.readQueue.length > 0) { + const next = this.readQueue.shift(); + if (next) { + next(); } - }); + } } async writeLock(fn: (tx: OPSQLiteConnection) => Promise, options?: DBLockOptions): Promise {