@@ -50,58 +50,60 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
5050
5151 @override
5252 Future <T > readLock <T >(Future <T > Function (SqliteReadContext tx) callback,
53- {Duration ? lockTimeout}) async {
53+ {Duration ? lockTimeout, String ? debugContext }) async {
5454 await _expandPool ();
5555
56- bool haveLock = false ;
57- var completer = Completer <T >();
56+ return _runZoned (() async {
57+ bool haveLock = false ;
58+ var completer = Completer <T >();
59+
60+ var futures = _readConnections.sublist (0 ).map ((connection) async {
61+ try {
62+ return await connection.readLock ((ctx) async {
63+ if (haveLock) {
64+ // Already have a different lock - release this one.
65+ return false ;
66+ }
67+ haveLock = true ;
68+
69+ var future = callback (ctx);
70+ completer.complete (future);
71+
72+ // We have to wait for the future to complete before we can release the
73+ // lock.
74+ try {
75+ await future;
76+ } catch (_) {
77+ // Ignore
78+ }
79+
80+ return true ;
81+ }, lockTimeout: lockTimeout, debugContext: debugContext);
82+ } on TimeoutException {
83+ return false ;
84+ }
85+ });
86+
87+ final stream = Stream <bool >.fromFutures (futures);
88+ var gotAny = await stream.any ((element) => element);
89+
90+ if (! gotAny) {
91+ // All TimeoutExceptions
92+ throw TimeoutException ('Failed to get a read connection' , lockTimeout);
93+ }
5894
59- var futures = _readConnections.sublist (0 ).map ((connection) async {
6095 try {
61- return await connection.readLock ((ctx) async {
62- if (haveLock) {
63- // Already have a different lock - release this one.
64- return false ;
65- }
66- haveLock = true ;
67-
68- var future = callback (ctx);
69- completer.complete (future);
70-
71- // We have to wait for the future to complete before we can release the
72- // lock.
73- try {
74- await future;
75- } catch (_) {
76- // Ignore
77- }
78-
79- return true ;
80- }, lockTimeout: lockTimeout);
81- } on TimeoutException {
82- return false ;
96+ return await completer.future;
97+ } catch (e) {
98+ // throw e;
99+ rethrow ;
83100 }
84- });
85-
86- final stream = Stream <bool >.fromFutures (futures);
87- var gotAny = await stream.any ((element) => element);
88-
89- if (! gotAny) {
90- // All TimeoutExceptions
91- throw TimeoutException ('Failed to get a read connection' , lockTimeout);
92- }
93-
94- try {
95- return await completer.future;
96- } catch (e) {
97- // throw e;
98- rethrow ;
99- }
101+ }, debugContext: debugContext ?? 'get*()' );
100102 }
101103
102104 @override
103105 Future <T > writeLock <T >(Future <T > Function (SqliteWriteContext tx) callback,
104- {Duration ? lockTimeout}) {
106+ {Duration ? lockTimeout, String ? debugContext }) {
105107 if (closed) {
106108 throw AssertionError ('Closed' );
107109 }
@@ -113,7 +115,25 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
113115 mutex: mutex,
114116 readOnly: false ,
115117 openFactory: _factory);
116- return _writeConnection! .writeLock (callback, lockTimeout: lockTimeout);
118+ return _runZoned (() {
119+ return _writeConnection! .writeLock (callback,
120+ lockTimeout: lockTimeout, debugContext: debugContext);
121+ }, debugContext: debugContext ?? 'execute()' );
122+ }
123+
124+ /// The [Mutex] on individual connections do already error in recursive locks.
125+ ///
126+ /// We duplicate the same check here, to:
127+ /// 1. Also error when the recursive transaction is handled by a different
128+ /// connection (with a different lock).
129+ /// 2. Give a more specific error message when it happens.
130+ T _runZoned <T >(T Function () callback, {required String debugContext}) {
131+ if (Zone .current[this ] != null ) {
132+ throw LockError (
133+ 'Recursive lock is not allowed. Use `tx.$debugContext ` instead of `db.$debugContext `.' );
134+ }
135+ var zone = Zone .current.fork (zoneValues: {this : true });
136+ return zone.run (callback);
117137 }
118138
119139 Future <void > _expandPool () async {
0 commit comments