Skip to content

Commit 2249522

Browse files
authored
Refactor KeyAndValue, Logger. Add AsyncQueue (#5)
* wip * check * @nonvirtual * stack_trace * update fake_storage.dart * default_logger_test.dart and fix default_logger.dartr * test WriteFailureEvent * clear event tests * remove events * == * test throws error * throws all * update tests * fixed tests and add async_queue.dart * fix async_queue.dart * fix async_queue.dart * fix async_queue.dart * fix async_queue.dart * cancel then close * rm ex
1 parent 4d58dfc commit 2249522

File tree

13 files changed

+725
-212
lines changed

13 files changed

+725
-212
lines changed
File renamed without changes.

lib/src/async/async_queue.dart

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import 'dart:async';
2+
3+
import 'package:meta/meta.dart';
4+
5+
class _AsyncQueueEntry<T> {
6+
final Completer<T> completer;
7+
8+
final AsyncQueueBlock<T> block;
9+
10+
_AsyncQueueEntry(this.completer, this.block);
11+
}
12+
13+
/// Function that returns a [Future].
14+
@internal
15+
typedef AsyncQueueBlock<T> = Future<T> Function();
16+
17+
/// A serial queue that executes single block at a time and it does this in FIFO order, first in, first out.
18+
/// Serial queue are often used to synchronize access to a specific value or resource to prevent data races to occur.
19+
@internal
20+
class AsyncQueue<T> {
21+
final _blockS = StreamController<_AsyncQueueEntry<T>>();
22+
StreamSubscription<T>? _subscription;
23+
24+
/// Construct [AsyncQueue].
25+
AsyncQueue() {
26+
_subscription = _blockS.stream.asyncMap((entry) {
27+
final completer = entry.completer;
28+
29+
Future<T> future;
30+
try {
31+
future = entry.block();
32+
} catch (e, s) {
33+
completer.completeError(e, s);
34+
rethrow;
35+
}
36+
37+
return future.then((v) {
38+
completer.complete(v);
39+
return v;
40+
}).onError<Object>((Object e, StackTrace s) {
41+
completer.completeError(e, s);
42+
throw e;
43+
});
44+
}).listen(null, onError: (Object _) {});
45+
}
46+
47+
/// Close queue.
48+
Future<void> dispose() {
49+
if (_subscription == null || _blockS.isClosed) {
50+
throw StateError('AsyncQueue has been disposed!');
51+
}
52+
final future = _subscription!.cancel().then<void>((_) => _blockS.close());
53+
_subscription = null;
54+
return future;
55+
}
56+
57+
/// Add block to queue.
58+
Future<T> enqueue(AsyncQueueBlock<T> block) {
59+
if (_subscription == null || _blockS.isClosed) {
60+
throw StateError('AsyncQueue has been disposed!');
61+
}
62+
63+
final completer = Completer<T>.sync();
64+
_blockS.add(_AsyncQueueEntry(completer, block));
65+
return completer.future;
66+
}
67+
}

lib/src/impl/real_storage.dart

Lines changed: 92 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import 'package:disposebag/disposebag.dart' hide Logger;
44
import 'package:meta/meta.dart';
55
import 'package:rxdart_ext/rxdart_ext.dart';
66

7-
import '../async_memoizer.dart';
7+
import '../async/async_memoizer.dart';
8+
import '../async/async_queue.dart';
89
import '../interface/rx_storage.dart';
910
import '../interface/storage.dart';
1011
import '../logger/event.dart';
@@ -18,11 +19,16 @@ import '../model/key_and_value.dart';
1819
/// Default [RxStorage] implementation.
1920
class RealRxStorage<Key extends Object, Options,
2021
S extends Storage<Key, Options>> implements RxStorage<Key, Options> {
21-
static const _initialKeyValue =
22-
KeyAndValue('rx_storage', 'Petrus Nguyen Thai Hoc <[email protected]>');
22+
static const _initialKeyValue = KeyAndValue<Object, Object>(
23+
'rx_storage', 'Petrus Nguyen Thai Hoc <[email protected]>', String);
2324

2425
/// Trigger subject
25-
final _keyValuesSubject = PublishSubject<Map<Key, Object?>>();
26+
final _keyValuesSubject =
27+
PublishSubject<Map<Key, KeyAndValue<Key, Object?>>>();
28+
29+
/// Write queue.
30+
/// Basic lock mechanism to prevent concurrent access to asynchronous code.
31+
final _writeQueue = AsyncQueue<Object?>();
2632

2733
final _disposeMemo = AsyncMemoizer<void>();
2834
late final _bag =
@@ -140,7 +146,7 @@ class RealRxStorage<Key extends Object, Options,
140146

141147
/// Add changed map to subject to trigger.
142148
@protected
143-
void sendChange(Map<Key, Object?> map) {
149+
void sendChange(Map<Key, KeyAndValue<Key, Object?>> map) {
144150
assert(_debugAssertNotDisposed());
145151
assert(map != null);
146152

@@ -151,9 +157,9 @@ class RealRxStorage<Key extends Object, Options,
151157
}
152158
}
153159

154-
/// Log event.
160+
/// Log event if logging is enabled.
155161
@protected
156-
void log(LoggerEvent<Key, Options> event) {
162+
void logIfEnabled(LoggerEvent<Key, Options> event) {
157163
assert(_debugAssertNotDisposed());
158164
assert(event != null);
159165

@@ -162,6 +168,11 @@ class RealRxStorage<Key extends Object, Options,
162168
}
163169
}
164170

171+
/// Enqueue writing task to a queue.
172+
@protected
173+
Future<T> enqueueWritingTask<T>(AsyncQueueBlock<T> block) =>
174+
_writeQueue.enqueue(block).then((value) => value as T);
175+
165176
//
166177
// Get and set methods (implements [Storage])
167178
//
@@ -186,7 +197,7 @@ class RealRxStorage<Key extends Object, Options,
186197
(value, _) {
187198
if (_isLogEnabled) {
188199
_publishLog(
189-
ReadValueSuccessEvent(KeyAndValue(key, value), T, options));
200+
ReadValueSuccessEvent(KeyAndValue(key, value, T), options));
190201
}
191202
},
192203
(error, _) {
@@ -218,46 +229,52 @@ class RealRxStorage<Key extends Object, Options,
218229
}
219230

220231
@override
221-
Future<void> clear([Options? options]) async {
232+
Future<void> clear([Options? options]) {
222233
assert(_debugAssertNotDisposed());
223234

224-
final keys = (await _useStorage((s) => s.readAll(options))).keys;
225-
226-
return await useStorageWithHandlers(
227-
(s) => s.clear(options),
228-
(_, __) {
229-
sendChange({for (final k in keys) k: null});
230-
if (_isLogEnabled) {
231-
_publishLog(ClearSuccessEvent(options));
232-
}
233-
},
234-
(error, _) {
235-
if (_isLogEnabled) {
236-
_publishLog(ClearFailureEvent(error, options));
237-
}
238-
},
239-
);
235+
return enqueueWritingTask(() async {
236+
final keys = (await _useStorage((s) => s.readAll(options))).keys;
237+
238+
return await useStorageWithHandlers(
239+
(s) => s.clear(options),
240+
(_, __) {
241+
sendChange({for (final k in keys) k: KeyAndValue(k, null, Null)});
242+
243+
if (_isLogEnabled) {
244+
_publishLog(ClearSuccessEvent(options));
245+
}
246+
},
247+
(error, _) {
248+
if (_isLogEnabled) {
249+
_publishLog(ClearFailureEvent(error, options));
250+
}
251+
},
252+
);
253+
});
240254
}
241255

242256
@override
243257
Future<void> remove(Key key, [Options? options]) {
244258
assert(_debugAssertNotDisposed());
245259
assert(key != null);
246260

247-
return useStorageWithHandlers(
248-
(s) => s.remove(key, options),
249-
(_, __) {
250-
sendChange({key: null});
251-
if (_isLogEnabled) {
252-
_publishLog(RemoveSuccessEvent(key, options));
253-
}
254-
},
255-
(error, _) {
256-
if (_isLogEnabled) {
257-
_publishLog(RemoveFailureEvent(key, options, error));
258-
}
259-
},
260-
);
261+
return enqueueWritingTask(() {
262+
return useStorageWithHandlers(
263+
(s) => s.remove(key, options),
264+
(_, __) {
265+
sendChange({key: KeyAndValue(key, null, Null)});
266+
267+
if (_isLogEnabled) {
268+
_publishLog(RemoveSuccessEvent(key, options));
269+
}
270+
},
271+
(error, _) {
272+
if (_isLogEnabled) {
273+
_publishLog(RemoveFailureEvent(key, options, error));
274+
}
275+
},
276+
);
277+
});
261278
}
262279

263280
@override
@@ -267,21 +284,26 @@ class RealRxStorage<Key extends Object, Options,
267284
assert(key != null);
268285
assert(encoder != null);
269286

270-
return useStorageWithHandlers(
271-
(s) => s.write(key, value, encoder, options),
272-
(_, __) {
273-
sendChange({key: value});
274-
if (_isLogEnabled) {
275-
_publishLog(WriteSuccessEvent(KeyAndValue(key, value), T, options));
276-
}
277-
},
278-
(error, __) {
279-
if (_isLogEnabled) {
280-
_publishLog(
281-
WriteFailureEvent(KeyAndValue(key, value), T, options, error));
282-
}
283-
},
284-
);
287+
return enqueueWritingTask(() {
288+
return useStorageWithHandlers(
289+
(s) => s.write(key, value, encoder, options),
290+
(_, __) {
291+
final keyAndValue = KeyAndValue(key, value, T);
292+
293+
sendChange({key: keyAndValue});
294+
295+
if (_isLogEnabled) {
296+
_publishLog(WriteSuccessEvent(keyAndValue, options));
297+
}
298+
},
299+
(error, __) {
300+
if (_isLogEnabled) {
301+
_publishLog(
302+
WriteFailureEvent(KeyAndValue(key, value, T), options, error));
303+
}
304+
},
305+
);
306+
});
285307
}
286308

287309
//
@@ -294,22 +316,26 @@ class RealRxStorage<Key extends Object, Options,
294316
assert(_debugAssertNotDisposed());
295317
assert(key != null);
296318

319+
FutureOr<T?> convert(KeyAndValue<Object, Object?> entry) {
320+
if (identical(entry, _initialKeyValue)) {
321+
return _useStorage((s) => s.read<T>(key, decoder, options));
322+
} else {
323+
// ignore assertion if [entry.type] is `Null` or `dynamic`.
324+
assert(entry.type == Null || entry.type == dynamic || entry.type == T);
325+
return entry.value as FutureOr<T?>;
326+
}
327+
}
328+
297329
final stream = _keyValuesSubject
298330
.toSingleSubscriptionStream()
299-
.mapNotNull((map) => map.containsKey(key)
300-
? KeyAndValue<Object, Object?>(key, map[key])
301-
: null)
331+
.mapNotNull<KeyAndValue<Object, Object?>>((map) => map[key])
302332
.startWith(_initialKeyValue) // Dummy value to trigger initial load.
303-
.asyncMap<T?>(
304-
(entry) => identical(entry, _initialKeyValue)
305-
? _useStorage((s) => s.read<T>(key, decoder, options))
306-
: entry.value as FutureOr<T?>,
307-
);
333+
.asyncMap<T?>(convert);
308334

309335
return _isLogEnabled
310336
? stream
311337
.doOnData((value) =>
312-
_publishLog(OnDataStreamEvent(KeyAndValue(key, value))))
338+
_publishLog(OnDataStreamEvent(KeyAndValue(key, value, T))))
313339
.doOnError((e, s) => _publishLog(
314340
OnErrorStreamEvent(RxStorageError(e, s ?? StackTrace.empty))))
315341
: stream;
@@ -330,7 +356,9 @@ class RealRxStorage<Key extends Object, Options,
330356
Future<void> dispose() {
331357
assert(_debugAssertNotDisposed());
332358

333-
return _disposeMemo.runOnce(_bag.dispose).then((_) => _onDispose?.call());
359+
return _disposeMemo
360+
.runOnce(() => _writeQueue.dispose().then((_) => _bag.dispose()))
361+
.then((_) => _onDispose?.call());
334362
}
335363
}
336364

0 commit comments

Comments
 (0)