Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.13.1-wip

- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed.

## 2.13.0

- Fix type check and cast in SubscriptionStream's cancelOnError wrapper
Expand Down
27 changes: 26 additions & 1 deletion pkgs/async/lib/src/stream_group.dart
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,32 @@ class StreamGroup<T> implements Sink<Stream<T>> {
if (_closed) return _controller.done;

_closed = true;
if (_subscriptions.isEmpty) _controller.close();

if (_subscriptions.isEmpty) {
_onIdleController?.close();
_controller.close();
return _controller.done;
}

if (_controller.stream.isBroadcast) {
// For a broadcast group that's closed, we must listen to streams with
// null subscriptions to detect when they complete. This ensures the
// group itself can close once all its streams have closed.
List<Stream<T>>? streamsToRemove;

_subscriptions.updateAll((stream, subscription) {
if (subscription != null) return subscription;

try {
return _listenToStream(stream);
} on Object {
(streamsToRemove ??= []).add(stream);
return null;
}
});

streamsToRemove?.forEach(_subscriptions.remove);
}

return _controller.done;
}
Expand Down
16 changes: 16 additions & 0 deletions pkgs/async/test/stream_group_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,22 @@ void main() {
controller.add('first');
expect(streamGroup.close(), completes);
});

test('completes close() when streams close without being removed',
() async {
var controller = StreamController.broadcast();
var group = StreamGroup.broadcast();
group.add(controller.stream);
var closeCompleted = false;
group.close().then((_) => closeCompleted = true);

await flushMicrotasks();
expect(closeCompleted, isFalse);

await controller.close();
await flushMicrotasks();
expect(closeCompleted, isTrue);
});
});

group('regardless of type', () {
Expand Down
Loading