Skip to content

Commit 596ff08

Browse files
authored
Introduce a Stream.parallelForEach for bounded parallelism. (#8201)
```dart Future<void> Stream<T>.parallelForEach( int maxParallel, FutureOr<void> Function(T item) each, { FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error, }) ``` Call `each` for each item in this stream with `maxParallel` invocations. This method will invoke `each` for each item in this stream, and wait for all futures from `each` to be resolved. `parallelForEach` will call `each` in parallel, but never more then `maxParallel`. If `each` throws and `onError` rethrows (default behavior), then `parallelForEach` will wait for ongoing `each` invocations to finish, before throw the first error. If `onError` does not throw, then iteration will not be interrupted and errors from `each` will be ignored. ```dart // Count size of all files in the current folder var folderSize = 0; // Use parallelForEach to read at-most 5 files at the same time. await Directory.current.list().parallelForEach(5, (item) async { if (item is File) { final bytes = await item.readAsBytes(); folderSize += bytes.length; } }); print('Folder size: $folderSize');
1 parent 296b70b commit 596ff08

File tree

2 files changed

+327
-0
lines changed

2 files changed

+327
-0
lines changed
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
// TODO: This library is a decent proposal for addition to `dart:async` or
6+
// other similar utility package. It's extremely useful when processing
7+
// a stream of objects, where I/O is required for each object.
8+
9+
import 'dart:async';
10+
11+
/// A [Notifier] allows micro-tasks to [wait] for other micro-tasks to
12+
/// [notify].
13+
///
14+
/// [Notifier] is a concurrency primitive that allows one micro-task to
15+
/// wait for notification from another micro-task. The [Future] return from
16+
/// [wait] will be completed the next time [notify] is called.
17+
///
18+
/// ```dart
19+
/// var weather = 'rain';
20+
/// final notifier = Notifier();
21+
///
22+
/// // Create a micro task to fetch the weather
23+
/// scheduleMicrotask(() async {
24+
/// // Infinitely loop that just keeps the weather up-to-date
25+
/// while (true) {
26+
/// weather = await getWeather();
27+
/// notifier.notify();
28+
///
29+
/// // Sleep 5s before updating the weather again
30+
/// await Future.delayed(Duration(seconds: 5));
31+
/// }
32+
/// });
33+
///
34+
/// // Wait for sunny weather
35+
/// while (weather != 'sunny') {
36+
/// await notifier.wait;
37+
/// }
38+
/// ```
39+
final class Notifier {
40+
var _completer = Completer<void>();
41+
42+
/// Notify everybody waiting for notification.
43+
///
44+
/// This will complete all futures previously returned by [wait].
45+
/// Calls to [wait] after this call, will not be resolved, until the
46+
/// next time [notify] is called.
47+
void notify() {
48+
if (!_completer.isCompleted) {
49+
_completer.complete();
50+
}
51+
}
52+
53+
/// Wait for notification.
54+
///
55+
/// Returns a [Future] that will complete the next time [notify] is called.
56+
///
57+
/// The [Future] returned will always be unresolved, and it will never throw.
58+
/// Once [notify] is called the future will be completed, and any new calls
59+
/// to [wait] will return a new future. This new future will also be
60+
/// unresolved, until [notify] is called.
61+
Future<void> get wait {
62+
if (_completer.isCompleted) {
63+
_completer = Completer();
64+
}
65+
return _completer.future;
66+
}
67+
}
68+
69+
/// Utility extensions on [Stream].
70+
extension StreamExtensions<T> on Stream<T> {
71+
/// Call [each] for each item in this stream with [maxParallel] invocations.
72+
///
73+
/// This method will invoke [each] for each item in this stream, and wait for
74+
/// all futures from [each] to be resolved. [parallelForEach] will call [each]
75+
/// in parallel, but never more then [maxParallel].
76+
///
77+
/// If [each] throws and [onError] rethrows (default behavior), then
78+
/// [parallelForEach] will wait for ongoing [each] invocations to finish,
79+
/// before throw the first error.
80+
///
81+
/// If [onError] does not throw, then iteration will not be interrupted and
82+
/// errors from [each] will be ignored.
83+
///
84+
/// ```dart
85+
/// // Count size of all files in the current folder
86+
/// var folderSize = 0;
87+
/// // Use parallelForEach to read at-most 5 files at the same time.
88+
/// await Directory.current.list().parallelForEach(5, (item) async {
89+
/// if (item is File) {
90+
/// final bytes = await item.readAsBytes();
91+
/// folderSize += bytes.length;
92+
/// }
93+
/// });
94+
/// print('Folder size: $folderSize');
95+
/// ```
96+
Future<void> parallelForEach(
97+
int maxParallel,
98+
FutureOr<void> Function(T item) each, {
99+
FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error,
100+
}) async {
101+
// Track the first error, so we rethrow when we're done.
102+
Object? firstError;
103+
StackTrace? firstStackTrace;
104+
105+
// Track number of running items.
106+
var running = 0;
107+
final itemDone = Notifier();
108+
109+
try {
110+
var doBreak = false;
111+
await for (final item in this) {
112+
// For each item we increment [running] and call [each]
113+
running += 1;
114+
unawaited(() async {
115+
try {
116+
await each(item);
117+
} catch (e, st) {
118+
try {
119+
// If [onError] doesn't throw, we'll just continue.
120+
await onError(e, st);
121+
} catch (e, st) {
122+
doBreak = true;
123+
if (firstError == null) {
124+
firstError = e;
125+
firstStackTrace = st;
126+
}
127+
}
128+
} finally {
129+
// When [each] is done, we decrement [running] and notify
130+
running -= 1;
131+
itemDone.notify();
132+
}
133+
}());
134+
135+
if (running >= maxParallel) {
136+
await itemDone.wait;
137+
}
138+
if (doBreak) {
139+
break;
140+
}
141+
}
142+
} finally {
143+
// Wait for all items to be finished
144+
while (running > 0) {
145+
await itemDone.wait;
146+
}
147+
}
148+
149+
// If an error happened, then we rethrow the first one.
150+
final firstError_ = firstError;
151+
final firstStackTrace_ = firstStackTrace;
152+
if (firstError_ != null && firstStackTrace_ != null) {
153+
Error.throwWithStackTrace(firstError_, firstStackTrace_);
154+
}
155+
}
156+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
import 'dart:math';
7+
8+
import 'package:pub_dev/shared/parallel_foreach.dart';
9+
import 'package:test/test.dart';
10+
11+
void main() {
12+
group('Notifier', () {
13+
test('Notifier.wait/notify', () async {
14+
final notified = Completer<void>();
15+
16+
final notifier = Notifier();
17+
unawaited(notifier.wait.then((value) => notified.complete()));
18+
expect(notified.isCompleted, isFalse);
19+
20+
notifier.notify();
21+
expect(notified.isCompleted, isFalse);
22+
23+
await notified.future;
24+
expect(notified.isCompleted, isTrue);
25+
});
26+
27+
test('Notifier.wait is never resolved', () async {
28+
var count = 0;
29+
30+
final notifier = Notifier();
31+
unawaited(notifier.wait.then((value) => count++));
32+
expect(count, 0);
33+
34+
await Future.delayed(Duration.zero);
35+
expect(count, 0);
36+
37+
notifier.notify();
38+
expect(count, 0);
39+
40+
await Future.delayed(Duration.zero);
41+
expect(count, 1);
42+
43+
unawaited(notifier.wait.then((value) => count++));
44+
unawaited(notifier.wait.then((value) => count++));
45+
46+
await Future.delayed(Duration.zero);
47+
expect(count, 1);
48+
49+
notifier.notify();
50+
expect(count, 1);
51+
52+
await Future.delayed(Duration.zero);
53+
expect(count, 3);
54+
});
55+
});
56+
57+
group('parallelForEach', () {
58+
test('sum (maxParallel: 1)', () async {
59+
var sum = 0;
60+
await Stream.fromIterable([1, 2, 3]).parallelForEach(1, (item) {
61+
sum += item;
62+
});
63+
expect(sum, 6);
64+
});
65+
66+
test('sum (maxParallel: 2)', () async {
67+
var sum = 0;
68+
var active = 0;
69+
var maxActive = 0;
70+
await Stream.fromIterable([1, 2, 3]).parallelForEach(2, (item) async {
71+
active++;
72+
expect(active, lessThanOrEqualTo(2));
73+
maxActive = max(active, maxActive);
74+
await Future.delayed(Duration(milliseconds: 50));
75+
expect(active, lessThanOrEqualTo(2));
76+
maxActive = max(active, maxActive);
77+
sum += item;
78+
active--;
79+
});
80+
expect(sum, 6);
81+
expect(maxActive, 2);
82+
});
83+
84+
test('abort when error is thrown (maxParallel: 1)', () async {
85+
var sum = 0;
86+
await expectLater(
87+
Stream.fromIterable([1, 2, 3]).parallelForEach(1, (item) async {
88+
sum += item;
89+
if (sum > 2) {
90+
throw Exception('abort');
91+
}
92+
}),
93+
throwsException,
94+
);
95+
expect(sum, 3);
96+
});
97+
98+
test('abort will not comsume the entire stream', () async {
99+
var countedTo = 0;
100+
Stream<int> countToN(int N) async* {
101+
for (var i = 1; i <= N; i++) {
102+
await Future.delayed(Duration.zero);
103+
yield i;
104+
countedTo = i;
105+
}
106+
}
107+
108+
var sum = 0;
109+
await countToN(20).parallelForEach(2, (item) async {
110+
sum += item;
111+
});
112+
expect(sum, greaterThan(20));
113+
expect(countedTo, 20);
114+
115+
countedTo = 0;
116+
await expectLater(
117+
countToN(20).parallelForEach(2, (item) async {
118+
if (item > 10) throw Exception('abort');
119+
}),
120+
throwsException,
121+
);
122+
expect(countedTo, greaterThanOrEqualTo(10));
123+
expect(countedTo, lessThan(20));
124+
});
125+
126+
test('onError can ignore errors', () async {
127+
var countedTo = 0;
128+
Stream<int> countToN(int N) async* {
129+
for (var i = 1; i <= N; i++) {
130+
await Future.delayed(Duration.zero);
131+
yield i;
132+
countedTo = i;
133+
}
134+
}
135+
136+
var sum = 0;
137+
await countToN(20).parallelForEach(2, (item) async {
138+
sum += item;
139+
if (sum > 10) {
140+
throw Exception('ignore this');
141+
}
142+
}, onError: (_, __) => null);
143+
expect(sum, greaterThan(20));
144+
expect(countedTo, 20);
145+
146+
countedTo = 0;
147+
await expectLater(
148+
countToN(20).parallelForEach(
149+
2,
150+
(item) async {
151+
sum += item;
152+
if (countedTo > 15) {
153+
throw Exception('break');
154+
}
155+
if (countedTo > 10) {
156+
throw Exception('ignore this');
157+
}
158+
},
159+
onError: (e, st) {
160+
if (e.toString().contains('break')) {
161+
throw e as Exception;
162+
}
163+
},
164+
),
165+
throwsException,
166+
);
167+
expect(countedTo, greaterThanOrEqualTo(10));
168+
expect(countedTo, lessThan(20));
169+
});
170+
});
171+
}

0 commit comments

Comments
 (0)