1
1
import 'dart:async' ;
2
2
3
- import 'package:meta/meta.dart' ;
4
-
5
3
import 'logger.dart' ;
6
4
import 'reducer.dart' ;
7
5
import 'reducer_exception.dart' ;
@@ -34,10 +32,10 @@ extension ReduxStoreExt<Action> on Stream<Action> {
34
32
/// * Param [State] The type of the State
35
33
/// * Param [Action] The type of the Actions
36
34
Stream <State > reduxStore <State >({
37
- @ required State Function () initialStateSupplier,
38
- @ required Iterable <SideEffect <Action , State >> sideEffects,
39
- @ required Reducer <Action , State > reducer,
40
- RxReduxLogger logger,
35
+ required State Function () initialStateSupplier,
36
+ required Iterable <SideEffect <Action , State >> sideEffects,
37
+ required Reducer <Action , State > reducer,
38
+ RxReduxLogger ? logger,
41
39
}) =>
42
40
transform (
43
41
ReduxStoreStreamTransformer (
@@ -66,7 +64,7 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
66
64
final S Function () _initialStateSupplier;
67
65
final Iterable <SideEffect <A , S >> _sideEffects;
68
66
final Reducer <A , S > _reducer;
69
- final RxReduxLogger _logger;
67
+ final RxReduxLogger ? _logger;
70
68
71
69
/// * Param [initialStateSupplier] A function that computes the initial state. The computation is
72
70
/// * done lazily once an observer subscribes. The computed initial state will be emitted directly
@@ -77,26 +75,20 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
77
75
/// * Param [S] The type of the State
78
76
/// * Param [A] The type of the Actions
79
77
ReduxStoreStreamTransformer ({
80
- @required S Function () initialStateSupplier,
81
- @required Iterable <SideEffect <A , S >> sideEffects,
82
- @required Reducer <A , S > reducer,
83
- RxReduxLogger logger,
84
- }) : assert (initialStateSupplier != null ,
85
- 'initialStateSupplier cannot be null' ),
86
- assert (sideEffects != null , 'sideEffects cannot be null' ),
87
- assert (sideEffects.every ((sideEffect) => sideEffect != null ),
88
- 'All sideEffects must be not null' ),
89
- assert (reducer != null , 'reducer cannot be null' ),
90
- _initialStateSupplier = initialStateSupplier,
78
+ required S Function () initialStateSupplier,
79
+ required Iterable <SideEffect <A , S >> sideEffects,
80
+ required Reducer <A , S > reducer,
81
+ RxReduxLogger ? logger,
82
+ }) : _initialStateSupplier = initialStateSupplier,
91
83
_sideEffects = sideEffects,
92
84
_reducer = reducer,
93
85
_logger = logger;
94
86
95
87
@override
96
88
Stream <S > bind (Stream <A > stream) {
97
- StreamController <S > controller;
98
- List <StreamSubscription <dynamic >> subscriptions;
99
- StreamController <WrapperAction < A >> actionController ;
89
+ late StreamController <S > controller;
90
+ List <StreamSubscription <Object ?>> ? subscriptions;
91
+ StreamController <WrapperAction > ? _actionController ;
100
92
101
93
void onListen () {
102
94
S state;
@@ -109,20 +101,20 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
109
101
return ;
110
102
}
111
103
112
- void onDataActually (WrapperAction <A > wrapper) {
113
- final action = wrapper.action;
104
+ void onDataActually (WrapperAction wrapper) {
114
105
final type = wrapper.type;
115
106
final currentState = state;
116
107
117
108
// add initial state
118
- if (identical (type, ActionType .initial)) {
109
+ if (identical (wrapper, WrapperAction .initial)) {
119
110
final message = '\n '
120
111
' ⟶ Action : $type \n '
121
112
' ⟹ Current state: $currentState ' ;
122
113
_logger? .call (message);
123
114
return controller.add (currentState);
124
115
}
125
116
117
+ final action = wrapper.action <A >();
126
118
try {
127
119
final newState = _reducer (currentState, action);
128
120
controller.add (newState);
@@ -151,35 +143,39 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
151
143
}
152
144
}
153
145
154
- actionController = StreamController <WrapperAction <A >>.broadcast ();
146
+ final actionController =
147
+ _actionController = StreamController <WrapperAction >.broadcast ();
155
148
156
149
// Call reducer on each action.
157
150
final subscriptionActionController =
158
151
actionController.stream.listen (onDataActually);
159
152
160
153
// Add initial action
161
- actionController.add (WrapperAction ( null , ActionType .initial) );
154
+ actionController.add (WrapperAction .initial);
162
155
163
156
// Listening to upstream actions
164
- final subscriptionUpstream = stream
165
- .map ((action) => WrapperAction (action, ActionType .external ))
166
- .listen (
167
- actionController.add,
168
- onError: controller.addError,
169
- onDone: controller.close,
170
- );
157
+ final subscriptionUpstream =
158
+ stream.map ((action) => WrapperAction .external (action)).listen (
159
+ actionController.add,
160
+ onError: controller.addError,
161
+ onDone: controller.close,
162
+ );
171
163
172
164
final getState = () => state;
165
+ final actionStream = actionController.stream
166
+ .map ((wrapper) => wrapper.action <A >())
167
+ .asBroadcastStream (onCancel: (s) => s.cancel ());
173
168
174
169
subscriptions = [
175
- ..._listenSideEffects (actionController, getState, controller),
170
+ ..._listenSideEffects (
171
+ actionController, getState, controller, actionStream),
176
172
subscriptionUpstream,
177
- subscriptionActionController
173
+ subscriptionActionController,
178
174
];
179
175
}
180
176
181
177
Future <void > onCancel () async {
182
- final future = actionController ? .close ();
178
+ final future = _actionController ? .close ();
183
179
final cancelFutures = subscriptions? .map ((s) => s.cancel ());
184
180
final futures = [...? cancelFutures, if (future != null ) future];
185
181
@@ -199,38 +195,35 @@ class ReduxStoreStreamTransformer<A, S> extends StreamTransformerBase<A, S> {
199
195
controller = StreamController <S >(
200
196
sync : true ,
201
197
onListen: onListen,
202
- onPause: () => subscriptions.forEach ((s) => s.pause ()),
203
- onResume: () => subscriptions.forEach ((s) => s.resume ()),
198
+ onPause: () => subscriptions? .forEach ((s) => s.pause ()),
199
+ onResume: () => subscriptions? .forEach ((s) => s.resume ()),
204
200
onCancel: onCancel,
205
201
);
206
202
}
207
203
208
204
return controller.stream;
209
205
}
210
206
211
- Iterable <StreamSubscription <dynamic >> _listenSideEffects (
212
- StreamController <WrapperAction < A > > actionController,
207
+ Iterable <StreamSubscription <Object ? >> _listenSideEffects (
208
+ StreamController <WrapperAction > actionController,
213
209
GetState <S > getState,
214
- StreamController <S > controller,
210
+ StreamController <S > stateController,
211
+ Stream <A > actionStream,
215
212
) {
216
213
return _sideEffects.mapIndexed (
217
214
(index, sideEffect) {
218
215
Stream <A > actions;
219
216
try {
220
- actions = sideEffect (
221
- actionController.stream.map ((wrapper) => wrapper.action),
222
- getState,
223
- );
217
+ actions = sideEffect (actionStream, getState);
224
218
} catch (e, s) {
225
219
actions = Stream .error (e, s);
226
220
}
227
221
228
222
return actions
229
- .map (
230
- (action) => WrapperAction (action, ActionType .sideEffect (index)))
223
+ .map ((action) => WrapperAction .sideEffect (action, index))
231
224
.listen (
232
225
actionController.add,
233
- onError: controller .addError,
226
+ onError: stateController .addError,
234
227
// Swallow onDone because just if one SideEffect reaches onDone
235
228
// we don't want to make everything incl. ReduxStore and other SideEffects reach onDone
236
229
);
0 commit comments