Skip to content

Commit ea41b34

Browse files
committed
wip
1 parent c76b138 commit ea41b34

File tree

2 files changed

+78
-1
lines changed

2 files changed

+78
-1
lines changed

packages/sqlite_async/lib/src/native/database/connection_pool.dart

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,55 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
241241
connections.addAll(_allReadConnections);
242242
return connections;
243243
}
244+
245+
Future<T> withAllConnections<T>(
246+
Future<T> Function(
247+
SqliteWriteContext writer, List<SqliteReadContext> readers)
248+
block) async {
249+
final blockCompleter = Completer<T>();
250+
final (write, reads) = await _lockAllConns<T>(blockCompleter);
251+
252+
try {
253+
final res = await block(write, reads);
254+
blockCompleter.complete(res);
255+
return res;
256+
} catch (e, st) {
257+
blockCompleter.completeError(e, st);
258+
rethrow;
259+
}
260+
}
261+
262+
/// Locks all connections, returning the acquired contexts.
263+
/// We pass a completer that would be called after the locks are taken.
264+
Future<(SqliteWriteContext, List<SqliteReadContext>)> _lockAllConns<T>(
265+
Completer<T> lockCompleter) async {
266+
final List<Completer<SqliteReadContext>> readLockedCompleters = [];
267+
final Completer<SqliteWriteContext> writeLockedCompleter = Completer();
268+
269+
// Take the write lock
270+
writeLock((ctx) {
271+
writeLockedCompleter.complete(ctx);
272+
return lockCompleter.future;
273+
});
274+
275+
// Take all the read locks
276+
for (final readConn in _allReadConnections) {
277+
final completer = Completer<SqliteReadContext>();
278+
readLockedCompleters.add(completer);
279+
280+
readConn.readLock((ctx) {
281+
completer.complete(ctx);
282+
return lockCompleter.future;
283+
});
284+
}
285+
286+
// Wait after all locks are taken
287+
final contexts = await Future.wait([
288+
writeLockedCompleter.future,
289+
...readLockedCompleters.map((e) => e.future)
290+
]);
291+
return (contexts.first as SqliteWriteContext, contexts.sublist(1));
292+
}
244293
}
245294

246295
typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);

packages/sqlite_async/test/native/basic_test.dart

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,35 @@ void main() {
6969
}
7070
});
7171

72-
test('Concurrency 2', () async {
72+
test('with all connections', () async {
73+
final db = SqliteDatabase.withFactory(
74+
await testUtils.testFactory(path: path),
75+
maxReaders: 3);
76+
await db.initialize();
77+
await createTables(db);
78+
79+
print("${DateTime.now()} start");
80+
81+
final withAllConnsFut = () async {
82+
await Future.delayed(const Duration(milliseconds: 20));
83+
await db.withAllConnections((writer, readers) async {
84+
print("${DateTime.now()} in withAllConnections");
85+
await Future.delayed(const Duration(seconds: 5));
86+
});
87+
}();
88+
89+
var readFutures = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11].map((i) => db.get(
90+
'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection',
91+
[i, 5 + Random().nextInt(10)]));
92+
93+
final futures = [...readFutures, withAllConnsFut];
94+
95+
await for (var result in Stream.fromFutures(futures)) {
96+
print("${DateTime.now()} $result");
97+
}
98+
});
99+
100+
test('Concurren 2', () async {
73101
final db1 = await testUtils.setupDatabase(path: path, maxReaders: 3);
74102
final db2 = await testUtils.setupDatabase(path: path, maxReaders: 3);
75103

0 commit comments

Comments
 (0)