Skip to content

Commit 20f7f21

Browse files
committed
Setup queue for read connections
1 parent 76fe38c commit 20f7f21

File tree

1 file changed

+45
-25
lines changed

1 file changed

+45
-25
lines changed

packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
3535

3636
protected initialized: Promise<void>;
3737

38-
protected readConnections: OPSQLiteConnection[] | null;
38+
protected readConnections: Array<{ lockKey: string; connection: OPSQLiteConnection }> | null;
3939

4040
protected writeConnection: OPSQLiteConnection | null;
4141

42+
private readQueue: Array<() => void> = [];
43+
4244
constructor(protected options: OPSQLiteAdapterOptions) {
4345
super();
4446
this.name = this.options.name;
@@ -88,7 +90,8 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
8890
let dbName = './'.repeat(i + 1) + dbFilename;
8991
const conn = await this.openConnection(dbName);
9092
await conn.execute('PRAGMA query_only = true');
91-
this.readConnections.push(conn);
93+
// this.readConnections.push(conn);
94+
this.readConnections.push({ lockKey: `${LockType.READ}-${i}`, connection: conn });
9295
}
9396
}
9497

@@ -102,7 +105,8 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
102105
await DB.execute('SELECT powersync_init()');
103106

104107
return new OPSQLiteConnection({
105-
baseDB: DB
108+
baseDB: DB,
109+
name: dbFilename
106110
});
107111
}
108112

@@ -145,38 +149,54 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
145149
close() {
146150
this.initialized.then(() => {
147151
this.writeConnection!.close();
148-
this.readConnections!.forEach((c) => c.close());
152+
this.readConnections!.forEach((c) => c.connection.close());
149153
});
150154
}
151155

152156
async readLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
153157
await this.initialized;
154-
// TODO: Use async queues to handle multiple read connections
155-
const sortedConnections = this.readConnections!.map((connection, index) => ({
156-
lockKey: `${LockType.READ}-${index}`,
157-
connection
158-
})).sort((a, b) => {
159-
const aBusy = this.locks.isBusy(a.lockKey);
160-
const bBusy = this.locks.isBusy(b.lockKey);
161-
// Sort by ones which are not busy
162-
return aBusy > bBusy ? 1 : 0;
163-
});
164158

165159
return new Promise(async (resolve, reject) => {
166-
try {
167-
await this.locks.acquire(
168-
sortedConnections[0].lockKey,
169-
async () => {
170-
resolve(await fn(sortedConnections[0].connection));
171-
},
172-
{ timeout: options?.timeoutMs }
173-
);
174-
} catch (ex) {
175-
reject(ex);
176-
}
160+
const execute = async () => {
161+
// Find an available connection that is not locked
162+
const availableConnection = this.readConnections!.find((conn) => !this.locks.isBusy(conn.lockKey));
163+
164+
// If we have an available connection, use it
165+
if (availableConnection) {
166+
await this.locks.acquire(
167+
availableConnection.lockKey,
168+
async () => {
169+
try {
170+
console.log('Executing read query on connection', availableConnection.connection.name);
171+
resolve(await fn(availableConnection.connection));
172+
} catch (error) {
173+
reject(error);
174+
} finally {
175+
// After query execution, process any queued tasks
176+
this.processQueue();
177+
}
178+
},
179+
{ timeout: options?.timeoutMs }
180+
);
181+
} else {
182+
// If no available connections, add to the queue
183+
this.readQueue.push(execute);
184+
}
185+
};
186+
187+
execute();
177188
});
178189
}
179190

191+
private async processQueue() {
192+
if (this.readQueue.length > 0) {
193+
const next = this.readQueue.shift();
194+
if (next) {
195+
next();
196+
}
197+
}
198+
}
199+
180200
async writeLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
181201
await this.initialized;
182202

0 commit comments

Comments
 (0)