Skip to content

Commit 89dcb4e

Browse files
committed
Fix: ConcurrentModificationError and improve broadcast stream handling (#372)
1 parent f1283f7 commit 89dcb4e

File tree

1 file changed

+16
-4
lines changed

1 file changed

+16
-4
lines changed

pkgs/async/lib/src/stream_group.dart

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -297,12 +297,24 @@ class StreamGroup<T> implements Sink<Stream<T>> {
297297
}
298298

299299
if (_controller.stream.isBroadcast) {
300-
for (var entry in _subscriptions.entries.where((e) => e.value == null)) {
300+
// For a broadcast group that's closed, we must listen to streams with
301+
// null subscriptions to detect when they complete. This ensures the
302+
// group itself can close once all its streams have closed.
303+
final streamsToRemove = <Stream<T>>[];
304+
305+
_subscriptions.updateAll((stream, subscription) {
306+
if (subscription != null) return subscription;
307+
301308
try {
302-
_subscriptions[entry.key] = _listenToStream(entry.key);
303-
} catch (_) {
304-
_subscriptions.remove(entry.key);
309+
return _listenToStream(stream);
310+
} on Object {
311+
streamsToRemove.add(stream);
312+
return null;
305313
}
314+
});
315+
316+
for (final stream in streamsToRemove) {
317+
_subscriptions.remove(stream);
306318
}
307319
}
308320

0 commit comments

Comments
 (0)