1
1
import 'dart:async' ;
2
2
3
- import 'package:meta/meta.dart' ;
4
-
5
3
import 'logger.dart' ;
6
4
import 'reducer.dart' ;
7
5
import 'reducer_exception.dart' ;
@@ -34,10 +32,10 @@ extension ReduxStoreExt<Action> on Stream<Action> {
34
32
/// * Param [State] The type of the State
35
33
/// * Param [Action] The type of the Actions
36
34
Stream <State > reduxStore <State >({
37
- @ required State Function () initialStateSupplier,
38
- @ required Iterable <SideEffect <Action , State >> sideEffects,
39
- @ required Reducer <Action , State > reducer,
40
- RxReduxLogger logger,
35
+ required State Function () initialStateSupplier,
36
+ required Iterable <SideEffect <Action , State >> sideEffects,
37
+ required Reducer <Action , State > reducer,
38
+ RxReduxLogger ? logger,
41
39
}) =>
42
40
transform (
43
41
ReduxStoreStreamTransformer (
@@ -66,7 +64,7 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
66
64
final S Function () _initialStateSupplier;
67
65
final Iterable <SideEffect <A , S >> _sideEffects;
68
66
final Reducer <A , S > _reducer;
69
- final RxReduxLogger _logger;
67
+ final RxReduxLogger ? _logger;
70
68
71
69
/// * Param [initialStateSupplier] A function that computes the initial state. The computation is
72
70
/// * done lazily once an observer subscribes. The computed initial state will be emitted directly
@@ -77,26 +75,20 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
77
75
/// * Param [S] The type of the State
78
76
/// * Param [A] The type of the Actions
79
77
ReduxStoreStreamTransformer ({
80
- @required S Function () initialStateSupplier,
81
- @required Iterable <SideEffect <A , S >> sideEffects,
82
- @required Reducer <A , S > reducer,
83
- RxReduxLogger logger,
84
- }) : assert (initialStateSupplier != null ,
85
- 'initialStateSupplier cannot be null' ),
86
- assert (sideEffects != null , 'sideEffects cannot be null' ),
87
- assert (sideEffects.every ((sideEffect) => sideEffect != null ),
88
- 'All sideEffects must be not null' ),
89
- assert (reducer != null , 'reducer cannot be null' ),
90
- _initialStateSupplier = initialStateSupplier,
78
+ required S Function () initialStateSupplier,
79
+ required Iterable <SideEffect <A , S >> sideEffects,
80
+ required Reducer <A , S > reducer,
81
+ RxReduxLogger ? logger,
82
+ }) : _initialStateSupplier = initialStateSupplier,
91
83
_sideEffects = sideEffects,
92
84
_reducer = reducer,
93
85
_logger = logger;
94
86
95
87
@override
96
88
Stream <S > bind (Stream <A > stream) {
97
- StreamController <S > controller;
98
- List <StreamSubscription <dynamic >> subscriptions;
99
- StreamController <WrapperAction <A >> actionController ;
89
+ late StreamController <S > controller;
90
+ List <StreamSubscription <dynamic >>? subscriptions;
91
+ StreamController <WrapperAction <A >>? _actionController ;
100
92
101
93
void onListen () {
102
94
S state;
@@ -110,7 +102,6 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
110
102
}
111
103
112
104
void onDataActually (WrapperAction <A > wrapper) {
113
- final action = wrapper.action;
114
105
final type = wrapper.type;
115
106
final currentState = state;
116
107
@@ -123,6 +114,7 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
123
114
return controller.add (currentState);
124
115
}
125
116
117
+ final action = wrapper.action! ;
126
118
try {
127
119
final newState = _reducer (currentState, action);
128
120
controller.add (newState);
@@ -151,7 +143,8 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
151
143
}
152
144
}
153
145
154
- actionController = StreamController <WrapperAction <A >>.broadcast ();
146
+ final actionController =
147
+ _actionController = StreamController <WrapperAction <A >>.broadcast ();
155
148
156
149
// Call reducer on each action.
157
150
final subscriptionActionController =
@@ -170,16 +163,24 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
170
163
);
171
164
172
165
final getState = () => state;
166
+ final actionStream = actionController.stream
167
+ .map ((wrapper) => wrapper.action! )
168
+ .asBroadcastStream (onCancel: (s) => s.cancel ());
173
169
174
170
subscriptions = [
175
- ..._listenSideEffects (actionController, getState, controller),
171
+ ..._listenSideEffects (
172
+ actionController,
173
+ getState,
174
+ controller,
175
+ actionStream,
176
+ ),
176
177
subscriptionUpstream,
177
178
subscriptionActionController
178
179
];
179
180
}
180
181
181
182
Future <void > onCancel () async {
182
- final future = actionController ? .close ();
183
+ final future = _actionController ? .close ();
183
184
final cancelFutures = subscriptions? .map ((s) => s.cancel ());
184
185
final futures = [...? cancelFutures, if (future != null ) future];
185
186
@@ -199,8 +200,8 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
199
200
controller = StreamController <S >(
200
201
sync : true ,
201
202
onListen: onListen,
202
- onPause: () => subscriptions.forEach ((s) => s.pause ()),
203
- onResume: () => subscriptions.forEach ((s) => s.resume ()),
203
+ onPause: () => subscriptions? .forEach ((s) => s.pause ()),
204
+ onResume: () => subscriptions? .forEach ((s) => s.resume ()),
204
205
onCancel: onCancel,
205
206
);
206
207
}
@@ -211,26 +212,24 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
211
212
Iterable <StreamSubscription <dynamic >> _listenSideEffects (
212
213
StreamController <WrapperAction <A >> actionController,
213
214
GetState <S > getState,
214
- StreamController <S > controller,
215
+ StreamController <S > stateController,
216
+ Stream <A > actionStream,
215
217
) {
216
218
return _sideEffects.mapIndexed (
217
219
(index, sideEffect) {
218
220
Stream <A > actions;
219
221
try {
220
- actions = sideEffect (
221
- actionController.stream.map ((wrapper) => wrapper.action),
222
- getState,
223
- );
222
+ actions = sideEffect (actionStream, getState);
224
223
} catch (e, s) {
225
224
actions = Stream .error (e, s);
226
225
}
227
226
227
+ final sideEffectAction = ActionType .sideEffect (index);
228
228
return actions
229
- .map (
230
- (action) => WrapperAction (action, ActionType .sideEffect (index)))
229
+ .map ((action) => WrapperAction (action, sideEffectAction))
231
230
.listen (
232
231
actionController.add,
233
- onError: controller .addError,
232
+ onError: stateController .addError,
234
233
// Swallow onDone because just if one SideEffect reaches onDone
235
234
// we don't want to make everything incl. ReduxStore and other SideEffects reach onDone
236
235
);
0 commit comments