diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md index b09778b07..9a45beac7 100644 --- a/pkgs/stream_transform/CHANGELOG.md +++ b/pkgs/stream_transform/CHANGELOG.md @@ -1,5 +1,7 @@ ## 2.1.2-wip +- Fix an exception when a subscription to a `combineLatest` stream is canceled + while there is an ongoing asynchronous combine callback. - Require Dart 3.4 or greater. ## 2.1.1 diff --git a/pkgs/stream_transform/lib/src/combine_latest.dart b/pkgs/stream_transform/lib/src/combine_latest.dart index f02a19e55..1dd0dfe69 100644 --- a/pkgs/stream_transform/lib/src/combine_latest.dart +++ b/pkgs/stream_transform/lib/src/combine_latest.dart @@ -66,13 +66,16 @@ extension CombineLatest on Stream { return; } if (result is Future) { - sourceSubscription!.pause(); - otherSubscription!.pause(); - result - .then(controller.add, onError: controller.addError) - .whenComplete(() { - sourceSubscription!.resume(); - otherSubscription!.resume(); + final resume = Completer(); + sourceSubscription!.pause(resume.future); + otherSubscription!.pause(resume.future); + + result.then((value) { + controller.add(value); + resume.complete(); + }, onError: (Object error, StackTrace stackTrace) { + controller.addError(error, stackTrace); + resume.complete(); }); } else { controller.add(result); diff --git a/pkgs/stream_transform/test/combine_latest_test.dart b/pkgs/stream_transform/test/combine_latest_test.dart index 1985c7587..24e41cabc 100644 --- a/pkgs/stream_transform/test/combine_latest_test.dart +++ b/pkgs/stream_transform/test/combine_latest_test.dart @@ -167,6 +167,28 @@ void main() { expect(emittedValues, [3, 5]); }); }); + + test('handles combined results after stream is canceled', () async { + final source = Stream.value(1); + final other = Stream.value(2); + final combineStarted = Completer(); + final combineDelay = Completer(); + Future delayedSum(int a, int b) async { + combineStarted.complete(); + await combineDelay.future; + return a + b; + } + + final subscription = source + .combineLatest(other, delayedSum) + .listen(expectAsync1((_) {}, count: 0)); + + await combineStarted.future; + + unawaited(subscription.cancel()); + combineDelay.complete(); + // No async errors should be raised + }); }); }