Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/kind-cats-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/op-sqlite': patch
---

Improved queueing for read connections
60 changes: 36 additions & 24 deletions packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement

protected initialized: Promise<void>;

protected readConnections: OPSQLiteConnection[] | null;
protected readConnections: Array<{ busy: boolean; connection: OPSQLiteConnection }> | null;

protected writeConnection: OPSQLiteConnection | null;

private readQueue: Array<() => void> = [];

constructor(protected options: OPSQLiteAdapterOptions) {
super();
this.name = this.options.name;
Expand Down Expand Up @@ -88,7 +90,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
let dbName = './'.repeat(i + 1) + dbFilename;
const conn = await this.openConnection(dbName);
await conn.execute('PRAGMA query_only = true');
this.readConnections.push(conn);
this.readConnections.push({ busy: false, connection: conn });
}
}

Expand Down Expand Up @@ -145,36 +147,46 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
close() {
this.initialized.then(() => {
this.writeConnection!.close();
this.readConnections!.forEach((c) => c.close());
this.readConnections!.forEach((c) => c.connection.close());
});
}

async readLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
await this.initialized;
// TODO: Use async queues to handle multiple read connections
const sortedConnections = this.readConnections!.map((connection, index) => ({
lockKey: `${LockType.READ}-${index}`,
connection
})).sort((a, b) => {
const aBusy = this.locks.isBusy(a.lockKey);
const bBusy = this.locks.isBusy(b.lockKey);
// Sort by ones which are not busy
return aBusy > bBusy ? 1 : 0;
return new Promise(async (resolve, reject) => {
const execute = async () => {
// Find an available connection that is not busy
const availableConnection = this.readConnections!.find((conn) => !conn.busy);

// If we have an available connection, use it
if (availableConnection) {
availableConnection.busy = true;
try {
resolve(await fn(availableConnection.connection));
} catch (error) {
reject(error);
} finally {
availableConnection.busy = false;
// After query execution, process any queued tasks
this.processQueue();
}
} else {
// If no available connections, add to the queue
this.readQueue.push(execute);
}
};

execute();
});
}

return new Promise(async (resolve, reject) => {
try {
await this.locks.acquire(
sortedConnections[0].lockKey,
async () => {
resolve(await fn(sortedConnections[0].connection));
},
{ timeout: options?.timeoutMs }
);
} catch (ex) {
reject(ex);
private async processQueue(): Promise<void> {
if (this.readQueue.length > 0) {
const next = this.readQueue.shift();
if (next) {
next();
}
});
}
}

async writeLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
Expand Down
Loading