Skip to content
This repository was archived by the owner on May 20, 2023. It is now read-only.

Commit 4b471ef

Browse files
cpellingnshahan
authored andcommitted
Implement ObservableView.firstNonNull and ObservableView.nonNullValues.
Also fix an issue with ObservableView.values: It would drop events from `stream` on the floor (if `stream` was a broadcast stream) while waiting for the first emitted `value` to be consumed. While this is similar to the usual caveat with broadcast streams, it's unintuitive; it's reasonable for callers to expect they won't be left with a "stale" value. PiperOrigin-RevId: 209695678
1 parent 326e232 commit 4b471ef

File tree

1 file changed

+40
-3
lines changed

1 file changed

+40
-3
lines changed

lib/model/observable/observable.dart

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,26 @@ class ChangeNotificationProvider<T> implements ChangeAware<T>, Disposable {
167167
abstract class ObservableView<T> extends ChangeAware<T> implements Disposable {
168168
T get value;
169169

170+
/// Blocks until [value] is non-null, and then completes with that value.
171+
///
172+
/// If [value] is already non-null, completes immediately.
173+
///
174+
/// The implementation may internally subscribe to [stream], so if [stream] is
175+
/// single-subscription (non-broadcast), it won't be possible to listen to it
176+
/// separately.
177+
Future<T> get firstNonNull;
178+
170179
/// A [Stream] of all values on this view, including the current [value] at
171180
/// the time the stream is listened to.
172181
Stream<T> get values;
173182

183+
/// A [Stream] of all non-null values on this view, including the current
184+
/// [value] at the time the stream is listened to (if it's non-null).
185+
///
186+
/// This interface does not guarantee that consecutive data events are
187+
/// distinct, but subclasses may do so.
188+
Stream<T> get nonNullValues;
189+
174190
/// Returns a new [ObservableView] which is created by lazily calling [mapper]
175191
/// on this view's [value], [stream], and [changes] properties.
176192
///
@@ -199,17 +215,30 @@ abstract class ObservableViewMixin<T> implements ObservableView<T> {
199215
}
200216

201217
@override
202-
Stream<T> get values async* {
218+
Future<T> get firstNonNull => value != null
219+
? Future.value(value)
220+
: stream.firstWhere((value) => value != null);
221+
222+
@override
223+
Stream<T> get values {
203224
// Unlike with `changes`, we're OK here returning an async non-broadcast
204225
// stream instead of trying to inherit the broadcastness/syncness of the
205226
// underlying `stream` -- if `stream` were sync, then
206227
// `ref.values.asBroadcastStream()` would end up yielding the current value
207228
// before anything could possibly listen to it, which really defeats the
208229
// point of even calling `values` in the first place.
209-
yield value;
210-
yield* stream;
230+
231+
StreamController<T> controller;
232+
controller = StreamController(onListen: () {
233+
controller.add(value);
234+
controller.addStream(stream).then((_) => controller.close());
235+
});
236+
return controller.stream;
211237
}
212238

239+
@override
240+
Stream<T> get nonNullValues => values.where((value) => value != null);
241+
213242
@override
214243
ObservableView<M> map<M>(M Function(T) mapper) =>
215244
_MappedView<T, M>(this, mapper);
@@ -278,6 +307,14 @@ class ObservableReference<T> extends ChangeNotificationProvider<T>
278307
return _listenSub.asFuture();
279308
}
280309

310+
/// A [Stream] of all non-null values on this view, including the current
311+
/// [value] at the time the stream is listened to (if it's non-null).
312+
///
313+
/// This implementation will never emit equal consecutive data events.
314+
/// "Equality" is defined by [equalsFn].
315+
@override
316+
Stream<T> get nonNullValues => super.nonNullValues.distinct(_equalsFn);
317+
281318
@override
282319
void dispose() {
283320
super.dispose();

0 commit comments

Comments
 (0)