Skip to content

Commit 32cd9b6

Browse files
committed
Add FutureGroup.addCancelable()
1 parent 33b5232 commit 32cd9b6

File tree

4 files changed

+45
-8
lines changed

4 files changed

+45
-8
lines changed

pkgs/async/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
## 2.13.1-wip
1+
## 2.14.0-wip
22

33
- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed.
44
- Run `dart format` with the new style.
55

6+
* Add `CancelableOperationGroup`.
7+
68
## 2.13.0
79

810
- Fix type check and cast in SubscriptionStream's cancelOnError wrapper

pkgs/async/lib/src/future_group.dart

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@
44

55
import 'dart:async';
66

7+
import 'cancelable_operation.dart';
8+
9+
/// A sentinel object indicating that a member of a [FutureGroup] was canceled
10+
/// rather than completing normally.
11+
const _canceledResult = Object();
12+
713
/// A collection of futures waits until all added [Future]s complete.
814
///
915
/// Futures are added to the group with [add]. Once you're finished adding
@@ -61,12 +67,21 @@ class FutureGroup<T> implements Sink<Future<T>> {
6167
/// The values emitted by the futures that have been added to the group, in
6268
/// the order they were added.
6369
///
64-
/// The slots for futures that haven't completed yet are `null`.
65-
final _values = <T?>[];
70+
/// This is type `Object?` rather than `T?` so it can contain
71+
/// [_canceledResult]. The slots for futures that haven't completed yet are
72+
/// `null`.
73+
final _values = <Object?>[];
6674

6775
/// Wait for [task] to complete.
6876
@override
69-
void add(Future<T> task) {
77+
void add(Future<T> task) =>
78+
addCancelable(CancelableOperation.fromFuture(task));
79+
80+
/// Wait for [task] to complete.
81+
///
82+
/// If [task] is canceled, it's removed from the group without adding a value
83+
/// to [future].
84+
void addCancelable(CancelableOperation<T> task) {
7085
if (_closed) throw StateError('The FutureGroup is closed.');
7186

7287
// Ensure that future values are put into [values] in the same order they're
@@ -76,19 +91,22 @@ class FutureGroup<T> implements Sink<Future<T>> {
7691
_values.add(null);
7792

7893
_pending++;
79-
task.then((value) {
94+
task.valueOrCancellation().then((value) {
8095
if (_completer.isCompleted) return null;
8196

8297
_pending--;
83-
_values[index] = value;
98+
_values[index] = task.isCanceled ? _canceledResult : value;
8499

85100
if (_pending != 0) return null;
86101
var onIdleController = _onIdleController;
87102
if (onIdleController != null) onIdleController.add(null);
88103

89104
if (!_closed) return null;
90105
if (onIdleController != null) onIdleController.close();
91-
_completer.complete(_values.whereType<T>().toList());
106+
_completer.complete([
107+
for (var value in _values)
108+
if (value != _canceledResult && value is T) value
109+
]);
92110
}).catchError((Object error, StackTrace stackTrace) {
93111
if (_completer.isCompleted) return null;
94112
_completer.completeError(error, stackTrace);

pkgs/async/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: async
2-
version: 2.13.1-wip
2+
version: 2.14.0-wip
33
description: Utility functions and classes related to the 'dart:async' library.
44
repository: https://github.com/dart-lang/core/tree/main/pkgs/async
55
issue_tracker: https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync

pkgs/async/test/future_group_test.dart

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import 'dart:async';
66

7+
import 'package:async/src/cancelable_operation.dart';
78
import 'package:async/src/future_group.dart';
89
import 'package:test/test.dart';
910

@@ -92,6 +93,22 @@ void main() {
9293
expect(completed, isTrue);
9394
});
9495

96+
test('a canceled operation doesn\'t block completion', () {
97+
var completer1 = Completer<int>();
98+
var completer2 = CancelableCompleter<int>();
99+
var completer3 = Completer<int>();
100+
101+
futureGroup.add(completer1.future);
102+
futureGroup.addCancelable(completer2.operation);
103+
futureGroup.add(completer3.future);
104+
futureGroup.close();
105+
106+
completer3.complete(3);
107+
completer2.operation.cancel();
108+
completer1.complete(1);
109+
expect(futureGroup.future, completion(equals([1, 3])));
110+
});
111+
95112
test('completes to the values of the futures in order of addition', () {
96113
var completer1 = Completer<int>();
97114
var completer2 = Completer<int>();

0 commit comments

Comments
 (0)