From f064d389584e424a67c564b3130eabab67277eeb Mon Sep 17 00:00:00 2001 From: Andrew Bekhiet <40571928+Andrew-Bekhiet@users.noreply.github.com> Date: Thu, 22 Dec 2022 15:21:50 +0200 Subject: [PATCH 1/2] feat(operators): startWithFuture --- lib/src/operators/index.dart | 1 + lib/src/operators/start_with_future.dart | 22 ++++++++++++++++++++++ test/operators/start_with_future_test.dart | 14 ++++++++++++++ 3 files changed, 37 insertions(+) create mode 100644 lib/src/operators/start_with_future.dart create mode 100644 test/operators/start_with_future_test.dart diff --git a/lib/src/operators/index.dart b/lib/src/operators/index.dart index ce06258..37a6406 100644 --- a/lib/src/operators/index.dart +++ b/lib/src/operators/index.dart @@ -5,5 +5,6 @@ export 'do_on.dart'; export 'done_on_error.dart'; export 'flat_map_batches.dart'; export 'ignore.dart'; +export 'start_with_future.dart'; export 'to_single_subscription.dart'; export 'void.dart'; diff --git a/lib/src/operators/start_with_future.dart b/lib/src/operators/start_with_future.dart new file mode 100644 index 0000000..e5765a4 --- /dev/null +++ b/lib/src/operators/start_with_future.dart @@ -0,0 +1,22 @@ +import 'package:rxdart/rxdart.dart'; + +/// Just like [startWith], but accepts a [Future] +/// +/// ### Example +/// +/// Stream.fromIterable([1, 2, 3]) +/// .startWithFuture(Future(() async => 0)) +/// .listen(print); // prints 0, 1, 2, 3 +/// +extension StartWithFuture on Stream { + /// Just like [startWith], but accepts a [Future] + /// + /// ### Example + /// + /// Stream.fromIterable([1, 2, 3]) + /// .startWithFuture(Future(() async => 0)) + /// .listen(print); // prints 0, 1, 2, 3 + /// + Stream startWithFuture(Future startValue) => + Stream.fromFuture(startValue).switchMap(startWith); +} diff --git a/test/operators/start_with_future_test.dart b/test/operators/start_with_future_test.dart new file mode 100644 index 0000000..7fba47f --- /dev/null +++ b/test/operators/start_with_future_test.dart @@ -0,0 +1,14 @@ +import 'package:rxdart_ext/rxdart_ext.dart'; +import 'package:test/test.dart'; + +void main() { + test( + 'Stream.startWithFuture', + () async { + final stream = + Stream.fromIterable([1, 2, 3]).startWithFuture(Future(() async => 0)); + + expect(stream, emitsInOrder([0, 1, 2, 3])); + }, + ); +} From 53727086d1c89ef0e363df6d2ebfb560a8bc7adc Mon Sep 17 00:00:00 2001 From: Andrew Bekhiet <40571928+Andrew-Bekhiet@users.noreply.github.com> Date: Tue, 24 Oct 2023 12:41:39 +0000 Subject: [PATCH 2/2] fix(startWithFuture): ensure events are not missed test(startWithFuture): simulate delay to ensure no events were dropped --- lib/src/operators/start_with_future.dart | 4 ++-- test/operators/start_with_future_test.dart | 19 +++++++++++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/lib/src/operators/start_with_future.dart b/lib/src/operators/start_with_future.dart index e5765a4..513450e 100644 --- a/lib/src/operators/start_with_future.dart +++ b/lib/src/operators/start_with_future.dart @@ -17,6 +17,6 @@ extension StartWithFuture on Stream { /// .startWithFuture(Future(() async => 0)) /// .listen(print); // prints 0, 1, 2, 3 /// - Stream startWithFuture(Future startValue) => - Stream.fromFuture(startValue).switchMap(startWith); + Stream startWithFuture(Future startFuture) => + Rx.concatEager([startFuture.asStream(), this]); } diff --git a/test/operators/start_with_future_test.dart b/test/operators/start_with_future_test.dart index 7fba47f..106b56a 100644 --- a/test/operators/start_with_future_test.dart +++ b/test/operators/start_with_future_test.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:rxdart_ext/rxdart_ext.dart'; import 'package:test/test.dart'; @@ -5,10 +7,23 @@ void main() { test( 'Stream.startWithFuture', () async { - final stream = - Stream.fromIterable([1, 2, 3]).startWithFuture(Future(() async => 0)); + final stream = _createStreamForTest().startWithFuture(_startFuture()); expect(stream, emitsInOrder([0, 1, 2, 3])); }, ); } + +Future _startFuture() async { + await Future.delayed(Duration(seconds: 1, milliseconds: 500)); + return 0; +} + +Stream _createStreamForTest() async* { + yield 1; + + await Future.delayed(Duration(seconds: 1)); + + yield 2; + yield 3; +}