diff --git a/pkgs/async/CHANGELOG.md b/pkgs/async/CHANGELOG.md index ca81027f..1f6d74da 100644 --- a/pkgs/async/CHANGELOG.md +++ b/pkgs/async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 2.13.1-wip + +- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed. + ## 2.13.0 - Fix type check and cast in SubscriptionStream's cancelOnError wrapper diff --git a/pkgs/async/lib/src/stream_group.dart b/pkgs/async/lib/src/stream_group.dart index 502a111c..79f057f1 100644 --- a/pkgs/async/lib/src/stream_group.dart +++ b/pkgs/async/lib/src/stream_group.dart @@ -289,7 +289,32 @@ class StreamGroup implements Sink> { if (_closed) return _controller.done; _closed = true; - if (_subscriptions.isEmpty) _controller.close(); + + if (_subscriptions.isEmpty) { + _onIdleController?.close(); + _controller.close(); + return _controller.done; + } + + if (_controller.stream.isBroadcast) { + // For a broadcast group that's closed, we must listen to streams with + // null subscriptions to detect when they complete. This ensures the + // group itself can close once all its streams have closed. + List>? streamsToRemove; + + _subscriptions.updateAll((stream, subscription) { + if (subscription != null) return subscription; + + try { + return _listenToStream(stream); + } on Object { + (streamsToRemove ??= []).add(stream); + return null; + } + }); + + streamsToRemove?.forEach(_subscriptions.remove); + } return _controller.done; } diff --git a/pkgs/async/pubspec.yaml b/pkgs/async/pubspec.yaml index db2d84f4..688c14b6 100644 --- a/pkgs/async/pubspec.yaml +++ b/pkgs/async/pubspec.yaml @@ -1,5 +1,5 @@ name: async -version: 2.13.0 +version: 2.13.1-wip description: Utility functions and classes related to the 'dart:async' library. repository: https://github.com/dart-lang/core/tree/main/pkgs/async issue_tracker: https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync diff --git a/pkgs/async/test/stream_group_test.dart b/pkgs/async/test/stream_group_test.dart index 3700120e..7b40f6e3 100644 --- a/pkgs/async/test/stream_group_test.dart +++ b/pkgs/async/test/stream_group_test.dart @@ -491,6 +491,22 @@ void main() { controller.add('first'); expect(streamGroup.close(), completes); }); + + test('completes close() when streams close without being removed', + () async { + var controller = StreamController.broadcast(); + var group = StreamGroup.broadcast(); + group.add(controller.stream); + var closeCompleted = false; + group.close().then((_) => closeCompleted = true); + + await flushMicrotasks(); + expect(closeCompleted, isFalse); + + await controller.close(); + await flushMicrotasks(); + expect(closeCompleted, isTrue); + }); }); group('regardless of type', () {