Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 2.13.0

- Fix type check and cast in SubscriptionStream's cancelOnError wrapper
- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed.
Copy link
Member

@lrhn lrhn Apr 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to a

## 2.14.0-wip

entry, and update the pubspec.yaml version to match.
The 2.13.0 version has been released, so this fix won't be in it.
Can probably be a 2.13.1-wip "bugfix". @natebosch WDYT?

Copy link
Contributor Author

@suojae suojae Apr 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it to 2.13.1-wip, Let me know if there are any other changes needed. Thanks! ☺️

commit log: 032a24b

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's looking good. It needs a second reviewer, so we can have a second opinion.


## 2.12.0

Expand Down
29 changes: 28 additions & 1 deletion pkgs/async/lib/src/stream_group.dart
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,34 @@ class StreamGroup<T> implements Sink<Stream<T>> {
if (_closed) return _controller.done;

_closed = true;
if (_subscriptions.isEmpty) _controller.close();

if (_subscriptions.isEmpty) {
_onIdleController?.close();
_controller.close();
return _controller.done;
}

if (_controller.stream.isBroadcast) {
// For a broadcast group that's closed, we must listen to streams with
// null subscriptions to detect when they complete. This ensures the
// group itself can close once all its streams have closed.
final streamsToRemove = <Stream<T>>[];
Copy link
Member

@lrhn lrhn Apr 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since a listen throwing is likely rare, I'd prefer to not allocate this list unless it's actually needed.
It's wasteful to allocate a list that won't be used in 99.9% of cases.

So

  List<Stream<T>>? streamsToRemove;
  ... updateAll...
    on Object {
      (streamsToRemove ??= []).add(stream);
      return null;
    }
...
  if (streamsToRemove != null) {
    for (final stream in streamsToRemove) {
      ...
   }
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I've implemented the suggested optimization to avoid unnecessary list allocation.
Changed the code to use a nullable list that's only initialized when needed and the null-safe call to forEach ensures we only iterate when a list was actually created.

commit log: cd2faf2


_subscriptions.updateAll((stream, subscription) {
if (subscription != null) return subscription;

try {
return _listenToStream(stream);
} on Object {
streamsToRemove.add(stream);
return null;
}
});

for (final stream in streamsToRemove) {
_subscriptions.remove(stream);
}
}

return _controller.done;
}
Expand Down
16 changes: 16 additions & 0 deletions pkgs/async/test/stream_group_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,22 @@ void main() {
controller.add('first');
expect(streamGroup.close(), completes);
});

test('completes close() when streams close without being removed',
() async {
var controller = StreamController.broadcast();
var group = StreamGroup.broadcast();
group.add(controller.stream);
var closeCompleted = false;
group.close().then((_) => closeCompleted = true);

await flushMicrotasks();
expect(closeCompleted, isFalse);

await controller.close();
await flushMicrotasks();
expect(closeCompleted, isTrue);
});
});

group('regardless of type', () {
Expand Down