Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 2.13.0

- Fix type check and cast in SubscriptionStream's cancelOnError wrapper
- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed.
Copy link
Member

@lrhn lrhn Apr 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to a

## 2.14.0-wip

entry, and update the pubspec.yaml version to match.
The 2.13.0 version has been released, so this fix won't be in it.
Can probably be a 2.13.1-wip "bugfix". @natebosch WDYT?

Copy link
Contributor Author

@suojae suojae Apr 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it to 2.13.1-wip, Let me know if there are any other changes needed. Thanks! ☺️

commit log: 032a24b

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's looking good. It needs a second reviewer, so we can have a second opinion.


## 2.12.0

Expand Down
18 changes: 17 additions & 1 deletion pkgs/async/lib/src/stream_group.dart
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,23 @@ class StreamGroup<T> implements Sink<Stream<T>> {
if (_closed) return _controller.done;

_closed = true;
if (_subscriptions.isEmpty) _controller.close();

if (_subscriptions.isEmpty) {
_onIdleController?.add(null);
_onIdleController?.close();
_controller.close();
return _controller.done;
}

if (_controller.stream.isBroadcast) {
for (var entry in _subscriptions.entries.where((e) => e.value == null)) {
try {
_subscriptions[entry.key] = _listenToStream(entry.key);
} catch (_) {
_subscriptions.remove(entry.key);
}
}
}

return _controller.done;
}
Expand Down
16 changes: 16 additions & 0 deletions pkgs/async/test/stream_group_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,22 @@ void main() {
controller.add('first');
expect(streamGroup.close(), completes);
});

test('completes close() when streams close without being removed',
() async {
var controller = StreamController.broadcast();
var group = StreamGroup.broadcast();
group.add(controller.stream);
var closeCompleted = false;
group.close().then((_) => closeCompleted = true);

await flushMicrotasks();
expect(closeCompleted, isFalse);

await controller.close();
await flushMicrotasks();
expect(closeCompleted, isTrue);
});
});

group('regardless of type', () {
Expand Down