@@ -29,6 +29,7 @@ SideEffect<A, S> _onEachActionSideEffect<A, S>(StreamSink<A> outputSink) {
29
29
/// Redux store based on [Stream] .
30
30
class RxReduxStore <A , S > {
31
31
final void Function (A ) _dispatch;
32
+ final void Function (Stream <A >) _dispatchMany;
32
33
33
34
final GetState <S > _getState;
34
35
final Stream <S > _stateStream;
@@ -38,6 +39,7 @@ class RxReduxStore<A, S> {
38
39
39
40
const RxReduxStore ._(
40
41
this ._dispatch,
42
+ this ._dispatchMany,
41
43
this ._getState,
42
44
this ._stateStream,
43
45
this ._actionStream,
@@ -80,19 +82,26 @@ class RxReduxStore<A, S> {
80
82
.asBroadcastStream (onCancel: (subscription) => subscription.cancel ());
81
83
82
84
var currentState = initialState;
83
- final subscription = stateStream.listen (
84
- (newState) => currentState = newState,
85
- onError: errorHandler,
86
- );
85
+ final subscriptions = < StreamSubscription <Object >> [
86
+ stateStream.listen (
87
+ (newState) => currentState = newState,
88
+ onError: errorHandler,
89
+ ),
90
+ ];
87
91
88
92
return RxReduxStore ._(
89
93
actionController.add,
94
+ (actions) => subscriptions.add (actions.listen (actionController.add)),
90
95
() => currentState,
91
96
stateStream,
92
97
actionOutputController.stream,
93
98
() async {
99
+ if (subscriptions.length == 1 ) {
100
+ await subscriptions[0 ].cancel ();
101
+ } else {
102
+ await Future .wait (subscriptions.map ((s) => s.cancel ()));
103
+ }
94
104
await actionController.close ();
95
- await subscription.cancel ();
96
105
},
97
106
);
98
107
}
@@ -169,6 +178,21 @@ class RxReduxStore<A, S> {
169
178
/// store.dispatch(SubmitLogin());
170
179
void dispatch (A action) => _dispatch (action);
171
180
181
+ /// Dispatch [Stream] of actions to store.
182
+ ///
183
+ /// The [StreamSubscription] from listening [actionStream]
184
+ /// will be cancelled when calling [dispose] .
185
+ /// Therefore, don't forget to call [dispose] to avoid memory leaks.
186
+ ///
187
+ /// ### Example:
188
+ ///
189
+ /// abstract class Action {}
190
+ /// class LoadNextPageAction implements Action {}
191
+ ///
192
+ /// Stream<LoadNextPageAction> loadNextPageActionStream;
193
+ /// store.dispatchMany(loadNextPageActionStream);
194
+ void dispatchMany (Stream <A > actionStream) => _dispatchMany (actionStream);
195
+
172
196
/// Dispose all resources.
173
197
/// This method is typically called in `dispose` method of Flutter `State` object.
174
198
///
@@ -183,3 +207,17 @@ class RxReduxStore<A, S> {
183
207
/// }
184
208
Future <void > dispose () => _dispose ();
185
209
}
210
+
211
+ /// Dispatch this action to [store] .
212
+ extension DispatchToExtension <A > on A {
213
+ /// Dispatch this action to [store] .
214
+ /// See [RxReduxStore.dispatch] .
215
+ void dispatchTo <S >(RxReduxStore <A , S > store) => store.dispatch (this );
216
+ }
217
+
218
+ /// /// Dispatch this actions [Stream] to [store] .
219
+ extension DispatchToStreamExtension <A > on Stream <A > {
220
+ /// Dispatch this actions [Stream] to [store] .
221
+ /// See [RxReduxStore.dispatchMany] .
222
+ void dispatchTo <S >(RxReduxStore <A , S > store) => store.dispatchMany (this );
223
+ }
0 commit comments