@@ -5,15 +5,16 @@ import 'package:rxdart/rxdart.dart';
55
66import 'error.dart' ;
77
8- // ignore_for_file: unnecessary_null_comparison
9-
108/// Signature for strategies that build widgets based on asynchronous interaction.
11- typedef RxWidgetBuilder <T > = Widget Function (BuildContext context, T ? data);
9+ typedef RxWidgetBuilder <T > = Widget Function (BuildContext context, T data);
1210
1311/// Rx stream builder that will pre-populate the streams initial data if the
1412/// given stream is an stream that holds the streams current value such
1513/// as a [ValueStream] or a [ReplayStream]
16- class RxStreamBuilder <T > extends StreamBuilder <T > {
14+ class RxStreamBuilder <T > extends StatefulWidget {
15+ final RxWidgetBuilder <T > _builder;
16+ final ValueStream <T > _stream;
17+
1718 /// Creates a new [RxStreamBuilder] that builds itself based on the latest
1819 /// snapshot of interaction with the specified [stream] and whose build
1920 /// strategy is given by [builder] .
@@ -25,45 +26,71 @@ class RxStreamBuilder<T> extends StreamBuilder<T> {
2526 /// effects as it may be called multiple times.
2627 RxStreamBuilder ({
2728 Key ? key,
29+ required ValueStream <T > stream,
2830 required RxWidgetBuilder <T > builder,
29- required Stream <T > stream,
30- T ? initialData,
31- }) : assert (builder != null ),
32- assert (stream != null ),
33- super (
34- key: key,
35- initialData: getInitialData (initialData, stream),
36- builder: _createStreamBuilder <T >(builder),
37- stream: stream,
38- );
31+ }) : _builder = builder,
32+ _stream = stream;
33+
34+ @override
35+ _RxStreamBuilderState <T > createState () => _RxStreamBuilderState ();
3936
4037 /// Get latest value from stream or return `null` .
4138 @visibleForTesting
42- static T ? getInitialData <T >(T ? initialData, Stream <T > stream) {
43- if (initialData != null ) {
44- return initialData ;
39+ static T getInitialData <T >(ValueStream <T > stream) {
40+ if (stream.hasValue ) {
41+ return stream.value ;
4542 }
43+ throw ArgumentError .value (stream, 'stream' , 'does not have value' );
44+ }
45+ }
4646
47- if (stream is ValueStream <T > && stream.hasValue) {
48- return stream.requireValue;
49- }
47+ class _RxStreamBuilderState <T > extends State <RxStreamBuilder <T >> {
48+ late T value;
49+ StreamSubscription <T >? subscription;
50+
51+ @override
52+ void initState () {
53+ super .initState ();
54+ subscribe ();
55+ }
5056
51- if (stream is ReplayStream <T >) {
52- final values = stream.values;
53- if (values.isNotEmpty) {
54- return values.last;
55- }
57+ @override
58+ void didUpdateWidget (covariant RxStreamBuilder <T > oldWidget) {
59+ super .didUpdateWidget (oldWidget);
60+ if (oldWidget._stream != widget._stream) {
61+ unsubscribe ();
62+ subscribe ();
5663 }
64+ }
65+
66+ @override
67+ void dispose () {
68+ unsubscribe ();
69+ super .dispose ();
70+ }
5771
58- return null ;
72+ @override
73+ Widget build (BuildContext context) => widget._builder (context, value);
74+
75+ void subscribe () {
76+ value = RxStreamBuilder .getInitialData (widget._stream);
77+
78+ subscription = widget._stream.listen (
79+ (v) => setState (() => value = v),
80+ onError: (Object e, StackTrace s) {
81+ FlutterError .reportError (
82+ FlutterErrorDetails (
83+ exception: UnhandledStreamError (e),
84+ stack: s,
85+ library: 'flutter_bloc_pattern' ,
86+ ),
87+ );
88+ },
89+ );
5990 }
6091
61- static AsyncWidgetBuilder <T > _createStreamBuilder <T >(
62- RxWidgetBuilder <T > builder) =>
63- (context, snapshot) {
64- if (snapshot.hasError) {
65- throw UnhandledStreamError (snapshot.error! );
66- }
67- return builder (context, snapshot.data);
68- };
92+ void unsubscribe () {
93+ subscription? .cancel ();
94+ subscription = null ;
95+ }
6996}
0 commit comments