Skip to content

Commit 93b5b57

Browse files
committed
Introduce a Stream.parallelForEach for bounded parallelism.
```dart Future<void> Stream<T>.parallelForEach( int maxParallel, FutureOr<void> Function(T item) each, { FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error, }) ``` Call [each] for each item in this stream with [maxParallel] invocations. This method will invoke [each] for each item in this stream, and wait for all futures from [each] to be resolved. [parallelForEach] will call [each] in parallel, but never more then [maxParallel]. If [each] throws and [onError] rethrows (default behavior), then [parallelForEach] will wait for ongoing [each] invocations to finish, before throw the first error. If [onError] does not throw, then iteration will not be interrupted and errors from [each] will be ignored. ```dart // Count size of all files in the current folder var folderSize = 0; // Use parallelForEach to read at-most 5 files at the same time. await Directory.current.list().parallelForEach(5, (item) async { if (item is File) { final bytes = await item.readAsBytes(); folderSize += bytes.length; } }); print('Folder size: $folderSize'); ```
1 parent 4229317 commit 93b5b57

File tree

2 files changed

+400
-0
lines changed

2 files changed

+400
-0
lines changed
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
// TODO: This library is a decent proposal for addition to `dart:async` or
6+
// other similar utility package. It's extremely useful when processing
7+
// a stream of objects, where I/O is required for each object.
8+
9+
import 'dart:async';
10+
11+
/// A [Notifier] allows micro-tasks to [wait] for other micro-tasks to
12+
/// [notify].
13+
///
14+
/// [Notifier] is a concurrency primitive that allows one micro-task to
15+
/// wait for notification from another micro-task. The [Future] return from
16+
/// [wait] will be completed the next time [notify] is called.
17+
///
18+
/// ```dart
19+
/// var weather = 'rain';
20+
/// final notifier = Notifier();
21+
///
22+
/// // Create a micro task to fetch the weather
23+
/// scheduleMicrotask(() async {
24+
/// // Infinitely loop that just keeps the weather up-to-date
25+
/// while (true) {
26+
/// weather = await getWeather();
27+
/// notifier.notify();
28+
///
29+
/// // Sleep 5s before updating the weather again
30+
/// await Future.delayed(Duration(seconds: 5));
31+
/// }
32+
/// });
33+
///
34+
/// // Wait for sunny weather
35+
/// while (weather != 'sunny') {
36+
/// await notifier.wait;
37+
/// }
38+
/// ```
39+
final class Notifier {
40+
var _completer = Completer<void>();
41+
42+
/// Notify everybody waiting for notification.
43+
///
44+
/// This will complete all futures previously returned by [wait].
45+
/// Calls to [wait] after this call, will not be resolved, until the
46+
/// next time [notify] is called.
47+
void notify() {
48+
if (!_completer.isCompleted) {
49+
_completer.complete();
50+
}
51+
}
52+
53+
/// Wait for notification.
54+
///
55+
/// Returns a [Future] that will complete the next time [notify] is called.
56+
///
57+
/// The [Future] returned will always be unresolved, and it will never throw.
58+
/// Once [notify] is called the future will be completed, and any new calls
59+
/// to [wait] will return a new future. This new future will also be
60+
/// unresolved, until [notify] is called.
61+
Future<void> get wait {
62+
if (_completer.isCompleted) {
63+
_completer = Completer();
64+
}
65+
return _completer.future;
66+
}
67+
}
68+
69+
/// Utility extensions on [Stream].
70+
extension StreamExtensions<T> on Stream<T> {
71+
/// Creates a stream whose elements are contiguous slices of `this`.
72+
///
73+
/// Each slice is [length] elements long, except for the last one which may be
74+
/// shorter if `this` emits too few elements. Each slice begins after the
75+
/// last one ends.
76+
///
77+
/// For example, `Stream.fromIterable([1, 2, 3, 4, 5]).slices(2)` emits
78+
/// `([1, 2], [3, 4], [5])`.
79+
///
80+
/// Errors are forwarded to the result stream immediately when they occur,
81+
/// even if previous data events have not been emitted because the next slice
82+
/// is not complete yet.
83+
Stream<List<T>> slices(int length) {
84+
if (length < 1) throw RangeError.range(length, 1, null, 'length');
85+
86+
var slice = <T>[];
87+
return transform(StreamTransformer.fromHandlers(handleData: (data, sink) {
88+
slice.add(data);
89+
if (slice.length == length) {
90+
sink.add(slice);
91+
slice = [];
92+
}
93+
}, handleDone: (sink) {
94+
if (slice.isNotEmpty) sink.add(slice);
95+
sink.close();
96+
}));
97+
}
98+
99+
/// A future which completes with the first event of this stream, or with
100+
/// `null`.
101+
///
102+
/// This stream is listened to, and if it emits any event, whether a data
103+
/// event or an error event, the future completes with the same data value or
104+
/// error. If the stream ends without emitting any events, the future is
105+
/// completed with `null`.
106+
Future<T?> get firstOrNull {
107+
final completer = Completer<T?>.sync();
108+
final subscription = listen(null,
109+
onError: completer.completeError,
110+
onDone: completer.complete,
111+
cancelOnError: true);
112+
subscription.onData((event) {
113+
subscription.cancel().whenComplete(() {
114+
completer.complete(event);
115+
});
116+
});
117+
return completer.future;
118+
}
119+
120+
/// Eagerly listens to this stream and buffers events until needed.
121+
///
122+
/// The returned stream will emit the same events as this stream, starting
123+
/// from when this method is called. The events are delayed until the returned
124+
/// stream is listened to, at which point all buffered events will be emitted
125+
/// in order, and then further events from this stream will be emitted as they
126+
/// arrive.
127+
///
128+
/// The buffer will retain all events until the returned stream is listened
129+
/// to, so if the stream can emit arbitrary amounts of data, callers should be
130+
/// careful to listen to the stream eventually or call
131+
/// `stream.listen(null).cancel()` to discard the buffered data if it becomes
132+
/// clear that the data isn't not needed.
133+
Stream<T> listenAndBuffer() {
134+
final controller = StreamController<T>(sync: true);
135+
final subscription = listen(controller.add,
136+
onError: controller.addError, onDone: controller.close);
137+
controller
138+
..onPause = subscription.pause
139+
..onResume = subscription.resume
140+
..onCancel = subscription.cancel;
141+
return controller.stream;
142+
}
143+
144+
/// Call [each] for each item in this stream with [maxParallel] invocations.
145+
///
146+
/// This method will invoke [each] for each item in this stream, and wait for
147+
/// all futures from [each] to be resolved. [parallelForEach] will call [each]
148+
/// in parallel, but never more then [maxParallel].
149+
///
150+
/// If [each] throws and [onError] rethrows (default behavior), then
151+
/// [parallelForEach] will wait for ongoing [each] invocations to finish,
152+
/// before throw the first error.
153+
///
154+
/// If [onError] does not throw, then iteration will not be interrupted and
155+
/// errors from [each] will be ignored.
156+
///
157+
/// ```dart
158+
/// // Count size of all files in the current folder
159+
/// var folderSize = 0;
160+
/// // Use parallelForEach to read at-most 5 files at the same time.
161+
/// await Directory.current.list().parallelForEach(5, (item) async {
162+
/// if (item is File) {
163+
/// final bytes = await item.readAsBytes();
164+
/// folderSize += bytes.length;
165+
/// }
166+
/// });
167+
/// print('Folder size: $folderSize');
168+
/// ```
169+
Future<void> parallelForEach(
170+
int maxParallel,
171+
FutureOr<void> Function(T item) each, {
172+
FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error,
173+
}) async {
174+
// Track the first error, so we rethrow when we're done.
175+
Object? firstError;
176+
StackTrace? firstStackTrace;
177+
178+
// Track number of running items.
179+
var running = 0;
180+
final itemDone = Notifier();
181+
182+
try {
183+
var doBreak = false;
184+
await for (final item in this) {
185+
// For each item we increment [running] and call [each]
186+
running += 1;
187+
unawaited(() async {
188+
try {
189+
await each(item);
190+
} catch (e, st) {
191+
try {
192+
// If [onError] doesn't throw, we'll just continue.
193+
await onError(e, st);
194+
} catch (e, st) {
195+
doBreak = true;
196+
if (firstError == null) {
197+
firstError = e;
198+
firstStackTrace = st;
199+
}
200+
}
201+
} finally {
202+
// When [each] is done, we decrement [running] and notify
203+
running -= 1;
204+
itemDone.notify();
205+
}
206+
}());
207+
208+
if (running >= maxParallel) {
209+
await itemDone.wait;
210+
}
211+
if (doBreak) {
212+
break;
213+
}
214+
}
215+
} finally {
216+
// Wait for all items to be finished
217+
while (running > 0) {
218+
await itemDone.wait;
219+
}
220+
}
221+
222+
// If an error happened, then we rethrow the first one.
223+
final firstError_ = firstError;
224+
final firstStackTrace_ = firstStackTrace;
225+
if (firstError_ != null && firstStackTrace_ != null) {
226+
Error.throwWithStackTrace(firstError_, firstStackTrace_);
227+
}
228+
}
229+
}

0 commit comments

Comments
 (0)