Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/kind-cats-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/op-sqlite': patch
---

Added queueing for read connections
59 changes: 35 additions & 24 deletions packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement

protected initialized: Promise<void>;

protected readConnections: OPSQLiteConnection[] | null;
protected readConnections: Array<{ busy: boolean; connection: OPSQLiteConnection }> | null;

protected writeConnection: OPSQLiteConnection | null;

private readQueue: Array<() => void> = [];

constructor(protected options: OPSQLiteAdapterOptions) {
super();
this.name = this.options.name;
Expand Down Expand Up @@ -88,7 +90,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
let dbName = './'.repeat(i + 1) + dbFilename;
const conn = await this.openConnection(dbName);
await conn.execute('PRAGMA query_only = true');
this.readConnections.push(conn);
this.readConnections.push({ busy: false, connection: conn });
}
}

Expand Down Expand Up @@ -145,36 +147,45 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
close() {
this.initialized.then(() => {
this.writeConnection!.close();
this.readConnections!.forEach((c) => c.close());
this.readConnections!.forEach((c) => c.connection.close());
});
}

async readLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
await this.initialized;
// TODO: Use async queues to handle multiple read connections
const sortedConnections = this.readConnections!.map((connection, index) => ({
lockKey: `${LockType.READ}-${index}`,
connection
})).sort((a, b) => {
const aBusy = this.locks.isBusy(a.lockKey);
const bBusy = this.locks.isBusy(b.lockKey);
// Sort by ones which are not busy
return aBusy > bBusy ? 1 : 0;
return new Promise(async (resolve, reject) => {
const execute = async () => {
// Find an available connection that is not busy
const availableConnection = this.readConnections!.find((conn) => !conn.busy);

// If we have an available connection, use it
if (availableConnection) {
availableConnection.busy = true;
try {
resolve(await fn(availableConnection.connection));
} catch (error) {
reject(error);
} finally {
// After query execution, process any queued tasks
this.processQueue();
}
} else {
// If no available connections, add to the queue
this.readQueue.push(execute);
}
};

execute();
});
}

return new Promise(async (resolve, reject) => {
try {
await this.locks.acquire(
sortedConnections[0].lockKey,
async () => {
resolve(await fn(sortedConnections[0].connection));
},
{ timeout: options?.timeoutMs }
);
} catch (ex) {
reject(ex);
private async processQueue(): Promise<void> {
if (this.readQueue.length > 0) {
const next = this.readQueue.shift();
if (next) {
next();
}
});
}
}

async writeLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
Expand Down
Loading