@@ -4,16 +4,7 @@ import 'package:disposebag/disposebag.dart' show DisposeBag;
44import 'package:distinct_value_connectable_stream/distinct_value_connectable_stream.dart'
55 show DistinctValueConnectableExtensions, DistinctValueStream;
66import 'package:meta/meta.dart' show visibleForTesting;
7- import 'package:rxdart_ext/rxdart_ext.dart'
8- show
9- PublishSubject,
10- Rx,
11- SwitchMapExtension,
12- ExhaustMapExtension,
13- DoExtensions,
14- OnErrorExtensions,
15- ScanExtension,
16- StartWithExtension;
7+ import 'package:rxdart_ext/rxdart_ext.dart' ;
178
189import 'loader_message.dart' ;
1910import 'loader_state.dart' ;
@@ -22,6 +13,37 @@ import 'utils.dart';
2213
2314// ignore_for_file: close_sinks
2415
16+ /// Defines which flatMap behavior should be applied whenever a new values is emitted.
17+ enum FlatMapPolicy {
18+ /// uses [FlatMapExtension.flatMap] .
19+ merge,
20+
21+ /// uses [Stream.asyncExpand] .
22+ concat,
23+
24+ /// uses [SwitchMapExtension.switchMap] .
25+ latest,
26+
27+ /// uses [ExhaustMapExtension.exhaustMap] .
28+ first,
29+ }
30+
31+ extension _FlatMapWithPolicy <T > on Stream <T > {
32+ Stream <R > flatMapWithPolicy <R >(
33+ FlatMapPolicy policy, Stream <R > Function (T ) transform) {
34+ switch (policy) {
35+ case FlatMapPolicy .merge:
36+ return flatMap (transform);
37+ case FlatMapPolicy .concat:
38+ return asyncExpand (transform);
39+ case FlatMapPolicy .latest:
40+ return switchMap (transform);
41+ case FlatMapPolicy .first:
42+ return exhaustMap (transform);
43+ }
44+ }
45+ }
46+
2547/// BLoC that handles loading and refreshing data
2648class LoaderBloc <Content extends Object > {
2749 static const _tag = '« stream_loader »' ;
@@ -32,22 +54,23 @@ class LoaderBloc<Content extends Object> {
3254 /// Message stream
3355 final Stream <LoaderMessage <Content >> message$;
3456
35- /// Call this function fetch data
57+ /// Call this function to fetch data
3658 final void Function () fetch;
3759
3860 /// Call this function to refresh data
3961 final Future <void > Function () refresh;
4062
4163 /// Clean up resources
42- final Future <void > Function () dispose;
64+ Future <void > dispose () => _dispose ();
65+ final Future <void > Function () _dispose;
4366
4467 LoaderBloc ._({
45- required this . dispose,
68+ required Future < void > Function () dispose,
4669 required this .state$,
4770 required this .fetch,
4871 required this .refresh,
4972 required this .message$,
50- });
73+ }) : _dispose = dispose ;
5174
5275 /// Construct a [LoaderBloc]
5376 /// The [loaderFunction] is a function return a stream of [Content] s (must be not null).
@@ -63,6 +86,10 @@ class LoaderBloc<Content extends Object> {
6386 Stream <Content > Function ()? refresherFunction,
6487 Content ? initialContent,
6588 void Function (String )? logger,
89+ FlatMapPolicy loaderFlatMapPolicy =
90+ FlatMapPolicy .latest, // default is `switchMap`
91+ FlatMapPolicy refreshFlatMapPolicy =
92+ FlatMapPolicy .first, // default is `exhaustMap`
6693 }) {
6794 refresherFunction ?? = () => Stream <Content >.empty ();
6895
@@ -73,7 +100,8 @@ class LoaderBloc<Content extends Object> {
73100 final controllers = < StreamController <dynamic >> [fetchS, refreshS, messageS];
74101
75102 /// Input actions to state
76- final fetchChanges = fetchS.stream.switchMap (
103+ final fetchChanges = fetchS.stream.flatMapWithPolicy (
104+ loaderFlatMapPolicy,
77105 (_) => Rx .defer (loaderFunction)
78106 .doOnData (
79107 (content) => messageS.add (LoaderMessage .fetchSuccess (content)))
@@ -84,7 +112,8 @@ class LoaderBloc<Content extends Object> {
84112 .onErrorReturnWith (
85113 (e, _) => LoaderPartialStateChange .fetchFailure (e)),
86114 );
87- final refreshChanges = refreshS.stream.exhaustMap (
115+ final refreshChanges = refreshS.stream.flatMapWithPolicy (
116+ refreshFlatMapPolicy,
88117 (completer) => Rx .defer (refresherFunction! )
89118 .doOnData (
90119 (content) => messageS.add (LoaderMessage .refreshSuccess (content)))
0 commit comments