Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkgs/async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.13.0

- Fix type check and cast in SubscriptionStream's cancelOnError wrapper

## 2.12.0

- Require Dart 3.4.
Expand Down
10 changes: 7 additions & 3 deletions pkgs/async/lib/src/subscription_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,16 @@ class _CancelOnErrorSubscriptionWrapper<T>
super.onError((Object error, StackTrace stackTrace) {
// Wait for the cancel to complete before sending the error event.
super.cancel().whenComplete(() {
if (handleError is ZoneBinaryCallback) {
if (handleError is dynamic Function(Object, StackTrace)) {
handleError(error, stackTrace);
} else if (handleError != null) {
(handleError as ZoneUnaryCallback)(error);
} else if (handleError is dynamic Function(Object)) {
handleError(error);
}
});
});
}
}

abstract class BinaryWrapper {
void binaryCall<T1, T2>(T1 t1, T2 t2);
}
2 changes: 1 addition & 1 deletion pkgs/async/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: async
version: 2.12.0
version: 2.13.0
description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/core/tree/main/pkgs/async
issue_tracker: https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync
Expand Down
70 changes: 70 additions & 0 deletions pkgs/async/test/subscription_stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,76 @@ void main() {
});
}
});

group('subscriptionStream error callback', () {
test('binary typed', () async {
var completer = Completer<void>();
var stream = createErrorStream();
var sourceSubscription = stream.listen(null, cancelOnError: true);
var subscriptionStream = SubscriptionStream(sourceSubscription);

void f(Object error, StackTrace stackTrace) {
completer.complete();
}

subscriptionStream.listen((_) {},
onError: f,
onDone: () => throw 'should not happen',
cancelOnError: true);
await completer.future;
await flushMicrotasks();
});

test('binary dynamic', () async {
var completer = Completer<void>();
var stream = createErrorStream();
var sourceSubscription = stream.listen(null, cancelOnError: true);
var subscriptionStream = SubscriptionStream(sourceSubscription);

subscriptionStream.listen((_) {},
onError: (error, stackTrace) {
completer.complete();
},
onDone: () => throw 'should not happen',
cancelOnError: true);
await completer.future;
await flushMicrotasks();
});

test('unary typed', () async {
var completer = Completer<void>();
var stream = createErrorStream();
var sourceSubscription = stream.listen(null, cancelOnError: true);
var subscriptionStream = SubscriptionStream(sourceSubscription);

void f(Object error) {
completer.complete();
}

subscriptionStream.listen((_) {},
onError: f,
onDone: () => throw 'should not happen',
cancelOnError: true);
await completer.future;
await flushMicrotasks();
});

test('unary dynamic', () async {
var completer = Completer<void>();
var stream = createErrorStream();
var sourceSubscription = stream.listen(null, cancelOnError: true);
var subscriptionStream = SubscriptionStream(sourceSubscription);

subscriptionStream.listen((_) {},
onError: (error) {
completer.complete();
},
onDone: () => throw 'should not happen',
cancelOnError: true);
await completer.future;
await flushMicrotasks();
});
});
}

Stream<int> createStream() async* {
Expand Down
Loading