@@ -164,20 +164,69 @@ class ChangeNotificationProvider<T> implements ChangeAware<T>, Disposable {
164164
165165/// A read-only 'view' of something. Allows getting the current value and
166166/// listening for values and changes.
167- abstract class ObservableView <T > extends ChangeAware <T > {
167+ abstract class ObservableView <T > extends ChangeAware <T > implements Disposable {
168168 T get value;
169+
170+ /// Returns a new [ObservableView] which is created by lazily calling [mapper]
171+ /// on this view's [value] , [stream] , and [changes] properties.
172+ ///
173+ /// Disposing of the mapped view doesn't affect this view in any way.
174+ ObservableView <M > map <M >(M Function (T ) mapper);
175+
176+ /// An [ObservableView] of the most-recently-published value on the given
177+ /// [stream] .
178+ factory ObservableView .fromStream (Stream <T > stream, {T initialValue}) =>
179+ new ObservableReference (initialValue)..listen (stream);
180+ }
181+
182+ /// Implements methods of [ObservableView] in terms of the basic [value] and
183+ /// [stream] properties.
184+ abstract class ObservableViewMixin <T > implements ObservableView <T > {
185+ @override
186+ Stream <Change <T >> get changes {
187+ var last = value;
188+ // Want to do this using stream.map so that the `changes` stream has the
189+ // same broadcastness/syncness as the `stream` stream.
190+ return stream.map ((v) {
191+ var change = new Change (last, v);
192+ last = v;
193+ return change;
194+ });
195+ }
196+
197+ @override
198+ ObservableView <M > map <M >(M Function (T ) mapper) =>
199+ new _MappedView <T , M >(this , mapper);
200+ }
201+
202+ /// An [ObservableView] that just points at an existing [ObservableView] and
203+ /// passes it through a mapping function.
204+ class _MappedView <I , O > extends ObservableViewMixin <O > {
205+ final ObservableView <I > _delegate;
206+ final O Function (I ) _mapper;
207+ _MappedView (this ._delegate, this ._mapper);
208+
209+ @override
210+ O get value => _mapper (_delegate.value);
211+
212+ @override
213+ Stream <O > get stream => _delegate.stream.map (_mapper);
214+
215+ @override
216+ void dispose () {}
169217}
170218
171219/// A mutable object holder that allows listening on a stream of changes.
172220///
173221/// Changes to the value using `value=` are added to a broadcast stream. If the
174222/// new value is equivalent to the current value, nothing's added to the stream.
175223class ObservableReference <T > extends ChangeNotificationProvider <T >
176- implements ObservableView <T >, ObserveAware < T >, Disposable {
224+ with ObservableViewMixin <T > {
177225 static bool _defaultEq (a, b) => a == b;
178226
227+ final EqualsFn _equalsFn;
228+ StreamSubscription _listenSub;
179229 T _value;
180- EqualsFn _equalsFn;
181230
182231 /// Creates a listenable value holder, starting with the given value.
183232 /// Optionally takes custom equality function.
@@ -200,9 +249,23 @@ class ObservableReference<T> extends ChangeNotificationProvider<T>
200249 notifyChange (previous, value);
201250 }
202251
252+ /// Listens to the given stream and sets the reference's value to whatever is
253+ /// emitted on the stream.
254+ ///
255+ /// Returns a [Future] that completes when the stream is done, analogous to
256+ /// `StreamController.addStream` .
257+ Future listen (Stream <T > stream) {
258+ _listenSub? .cancel ();
259+ _listenSub = stream.listen ((v) {
260+ value = v;
261+ });
262+ return _listenSub.asFuture ();
263+ }
264+
203265 @override
204266 void dispose () {
205267 super .dispose ();
268+ _listenSub? .cancel ();
206269 _value = null ;
207270 }
208271}
0 commit comments