11import 'dart:async' ;
2+ import 'dart:collection' ;
23
34import 'mutex.dart' ;
45import 'port_channel.dart' ;
@@ -12,7 +13,9 @@ import 'update_notification.dart';
1213class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
1314 SqliteConnection ? _writeConnection;
1415
15- final List <SqliteConnectionImpl > _readConnections = [];
16+ final Set <SqliteConnectionImpl > _allReadConnections = {};
17+ final Queue <SqliteConnectionImpl > _availableReadConnections = Queue ();
18+ final Queue <_PendingItem > _queue = Queue ();
1619
1720 final SqliteOpenFactory _factory;
1821 final SerializedPortClient _upstreamPort;
@@ -58,62 +61,60 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
5861 return await _writeConnection! .getAutoCommit ();
5962 }
6063
61- @override
62- Future <T > readLock <T >(Future <T > Function (SqliteReadContext tx) callback,
63- {Duration ? lockTimeout, String ? debugContext}) async {
64- await _expandPool ();
65-
66- return _runZoned (() async {
67- bool haveLock = false ;
68- var completer = Completer <T >();
69-
70- var futures = _readConnections.sublist (0 ).map ((connection) async {
71- if (connection.closed) {
72- _readConnections.remove (connection);
73- }
74- try {
75- return await connection.readLock ((ctx) async {
76- if (haveLock) {
77- // Already have a different lock - release this one.
78- return false ;
79- }
80- haveLock = true ;
81-
82- var future = callback (ctx);
83- completer.complete (future);
84-
85- // We have to wait for the future to complete before we can release the
86- // lock.
87- try {
88- await future;
89- } catch (_) {
90- // Ignore
91- }
92-
93- return true ;
94- }, lockTimeout: lockTimeout, debugContext: debugContext);
95- } on TimeoutException {
96- return false ;
97- } on ClosedException {
98- return false ;
99- }
100- });
101-
102- final stream = Stream <bool >.fromFutures (futures);
103- var gotAny = await stream.any ((element) => element);
104-
105- if (! gotAny) {
106- // All TimeoutExceptions
107- throw TimeoutException ('Failed to get a read connection' , lockTimeout);
64+ void _nextRead () {
65+ if (_queue.isEmpty) {
66+ // Wait for queue item
67+ return ;
68+ } else if (closed) {
69+ while (_queue.isNotEmpty) {
70+ final nextItem = _queue.removeFirst ();
71+ nextItem.completer.completeError (const ClosedException ());
10872 }
73+ return ;
74+ }
75+
76+ while (_availableReadConnections.isNotEmpty &&
77+ _availableReadConnections.last.closed) {
78+ // Remove connections that may have errored
79+ final connection = _availableReadConnections.removeLast ();
80+ _allReadConnections.remove (connection);
81+ }
82+
83+ if (_availableReadConnections.isEmpty &&
84+ _allReadConnections.length == maxReaders) {
85+ // Wait for available connection
86+ return ;
87+ }
10988
89+ final nextItem = _queue.removeFirst ();
90+ nextItem.completer.complete (Future .sync (() async {
91+ final nextConnection = _availableReadConnections.isEmpty
92+ ? await _expandPool ()
93+ : _availableReadConnections.removeLast ();
11094 try {
111- return await completer.future;
112- } catch (e) {
113- // throw e;
114- rethrow ;
95+ final result = await nextConnection.readLock (nextItem.callback);
96+ return result;
97+ } finally {
98+ _availableReadConnections.add (nextConnection);
99+ _nextRead ();
115100 }
116- }, debugContext: debugContext ?? 'get*()' );
101+ }));
102+ }
103+
104+ @override
105+ Future <T > readLock <T >(ReadCallback <T > callback,
106+ {Duration ? lockTimeout, String ? debugContext}) async {
107+ if (closed) {
108+ throw ClosedException ();
109+ }
110+ final zone = _getZone (debugContext: debugContext ?? 'get*()' );
111+ final item = _PendingItem ((ctx) {
112+ return zone.runUnary (callback, ctx);
113+ });
114+ _queue.add (item);
115+ _nextRead ();
116+
117+ return await item.completer.future;
117118 }
118119
119120 @override
@@ -146,41 +147,38 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
146147 /// connection (with a different lock).
147148 /// 2. Give a more specific error message when it happens.
148149 T _runZoned <T >(T Function () callback, {required String debugContext}) {
150+ return _getZone (debugContext: debugContext).run (callback);
151+ }
152+
153+ Zone _getZone ({required String debugContext}) {
149154 if (Zone .current[this ] != null ) {
150155 throw LockError (
151156 'Recursive lock is not allowed. Use `tx.$debugContext ` instead of `db.$debugContext `.' );
152157 }
153- var zone = Zone .current.fork (zoneValues: {this : true });
154- return zone.run (callback);
158+ return Zone .current.fork (zoneValues: {this : true });
155159 }
156160
157- Future <void > _expandPool () async {
158- if (closed || _readConnections.length >= maxReaders) {
159- return ;
160- }
161- bool hasCapacity = _readConnections
162- .any ((connection) => ! connection.locked && ! connection.closed);
163- if (! hasCapacity) {
164- var name = debugName == null
165- ? null
166- : '$debugName -${_readConnections .length + 1 }' ;
167- var connection = SqliteConnectionImpl (
168- upstreamPort: _upstreamPort,
169- primary: false ,
170- updates: updates,
171- debugName: name,
172- mutex: mutex,
173- readOnly: true ,
174- openFactory: _factory);
175- _readConnections.add (connection);
176-
177- // Edge case:
178- // If we don't await here, there is a chance that a different connection
179- // is used for the transaction, and that it finishes and deletes the database
180- // while this one is still opening. This is specifically triggered in tests.
181- // To avoid that, we wait for the connection to be ready.
182- await connection.ready;
183- }
161+ Future <SqliteConnectionImpl > _expandPool () async {
162+ var name = debugName == null
163+ ? null
164+ : '$debugName -${_allReadConnections .length + 1 }' ;
165+ var connection = SqliteConnectionImpl (
166+ upstreamPort: _upstreamPort,
167+ primary: false ,
168+ updates: updates,
169+ debugName: name,
170+ mutex: mutex,
171+ readOnly: true ,
172+ openFactory: _factory);
173+ _allReadConnections.add (connection);
174+
175+ // Edge case:
176+ // If we don't await here, there is a chance that a different connection
177+ // is used for the transaction, and that it finishes and deletes the database
178+ // while this one is still opening. This is specifically triggered in tests.
179+ // To avoid that, we wait for the connection to be ready.
180+ await connection.ready;
181+ return connection;
184182 }
185183
186184 @override
@@ -190,7 +188,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
190188 // It is possible that `readLock()` removes connections from the pool while we're
191189 // closing connections, but not possible for new connections to be added.
192190 // Create a copy of the list, to avoid this triggering "Concurrent modification during iteration"
193- final toClose = _readConnections. sublist ( 0 );
191+ final toClose = _allReadConnections. toList ( );
194192 for (var connection in toClose) {
195193 // Wait for connection initialization, so that any existing readLock()
196194 // requests go through before closing.
@@ -203,3 +201,12 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
203201 await _writeConnection? .close ();
204202 }
205203}
204+
205+ typedef ReadCallback <T > = Future <T > Function (SqliteReadContext tx);
206+
207+ class _PendingItem {
208+ ReadCallback <dynamic > callback;
209+ Completer <dynamic > completer = Completer .sync ();
210+
211+ _PendingItem (this .callback);
212+ }
0 commit comments