Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkgs/stream_transform/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## 2.1.2-wip

- Fix an exception when a subscription to a `combineLatest` stream is canceled
while there is an ongoing asynchronous combine callback.
- Require Dart 3.4 or greater.

## 2.1.1
Expand Down
4 changes: 2 additions & 2 deletions pkgs/stream_transform/lib/src/combine_latest.dart
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ extension CombineLatest<T> on Stream<T> {
result
.then(controller.add, onError: controller.addError)
.whenComplete(() {
sourceSubscription!.resume();
otherSubscription!.resume();
sourceSubscription?.resume();
otherSubscription?.resume();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Could we put it all into the .then?

    result.then((value) {
      controller.add(value);
      _resume();
    }, onError: (Object error, Stacktrace stack) {
      controller.addError(error, stack);
      _resume();
    });
  
  // ...
  // Helper, since it occurs at least three times now, if also used as `onResume`.
  void _resume() { 
    sourceSubscription?.resume();
    otherSubscription?.resume();
  }

Mainly because whenComplete can be more expensive than it looks, because of the way it has to propagete the prior result conditionally. And in any case, .then+.whenComplete is more work, and more closures, than just a .then.

Can probably use ?. instad of !. everywhere. I expect it to generate marginally smaller code to not have a throwing branch, even if it's just calling a common function, but I haven't checked. The ! is better documentation, though.)

});
} else {
controller.add(result);
Expand Down
20 changes: 20 additions & 0 deletions pkgs/stream_transform/test/combine_latest_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,26 @@ void main() {
expect(emittedValues, [3, 5]);
});
});

test('handles combined results after stream is canceled', () async {
final source = Stream.value(1);
final other = Stream.value(2);
late final Completer<void> combineDelay;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could just create the completer here? It's only set once.
(Unless zones are involved, but I don't think they are.)

Or make the variable nullable, so you could use it to check whether the delayedSum has been called yet.

Future<int> delayedSum(int a, int b) async {
combineDelay = Completer<void>();
await combineDelay.future;
return a + b;
}

final subscription = source
.combineLatest(other, delayedSum)
.listen(expectAsync1((_) {}, count: 0));

await pumpEventQueue();
Copy link
Member

@lrhn lrhn Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pumpEventQueue scares me. I know there is no possible way it can always work correctly, and if the solution to failing is to "increase the number of pumps", then it's a fragile design. So if the test works, it might be by accident - it assumes an ordering between unrelated events.
No other test in the file uses pumpEventQueue.

I see that there is nothing to synchronize on in the code that runs after the delayedSum
completes. It explicitly does nothing (adds to a cancelled stream, does nothing for null).

The first pump should be possibler to handle with a synchronization future, like you do combineDelay.

Consider (or ignore, the test is fine if you're not unreasonably paranoid about pumpEventQueue like me):

 var stream1 = StreamController<int>();
 var stream2 = StreamController<int>();

 final onDelayedSum = Completer<void>();
 final delayedSumContinue = Completer<void>();
 Future<int> delayedSum(int a, int b) async {
   onDelayedSum.complete(null);
   await delayedSumContinue.future;
   return a + b;
 }
 final combinedStream = stream1.stream.combineLatest(stream2.stream);
 expect(stream1.hasListener, false);
 expect(stream2.hasListener, false);
 final subscription = combinedStream.listen(expectAsync1((_) {}, count: 0));
 expect(stream1.hasListener, true);
 expect(stream2.hasListener, true);
 expect(stream1.isPaused, false);
 expect(stream2.isPaused, false);
 stream1.add(1);
 stream2.add(2);
 await onDelayedSum.future;
 expect(stream1.hasListener, true);
 expect(stream2.hasListener, true);
 expect(stream1.isPaused, true);
 expect(stream2.isPaused, true);
 await subscription.cancel();
 expect(stream1.hasListener, false);
 expect(stream2.hasListener, false);
 delayedSumContinue.complete(null);
 // Should not cause error.

 // Do we need to pump-and-wait here?
 // Any later error should be ascribed to the test no matter what.
 // Is it because the test would  considered complete before the error would happen?
 // (If so, it'll still be an errror, it'll just be an error-after-test-completed. The program
 // won't end before all code has run.)
 // If so:
 await pumpEventQueue();
 // Or my preference, since it doesn't arbitrarily do 20 schedule microtasks,
 // but actually waits for all microtasks. It does allow other non-microtask events
 // to run first, but this is a controlled environment,
 // we know there shouldn't be any (that we care about).
 await Future<void>.delayed(Duration.zero); // Skips all microtasks.

unawaited(subscription.cancel());
combineDelay.complete();
await pumpEventQueue();
});
});
}

Expand Down
Loading