-
Notifications
You must be signed in to change notification settings - Fork 66
Expand file tree
/
Copy pathlimit.dart
More file actions
166 lines (151 loc) · 4.76 KB
/
limit.dart
File metadata and controls
166 lines (151 loc) · 4.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import 'dart:async';
import 'dart:collection';
/// A semaphore-like mechanism to limit the number of concurrent operations.
///
/// This class allows you to limit how many operations can run in parallel,
/// queuing additional operations until a slot becomes available.
///
/// Example:
/// ```dart
/// final limit = ParallelLimit(maxConcurrency: 10);
///
/// // Option 1: Manual acquire/release
/// await limit.acquire();
/// try {
/// await doWork();
/// } finally {
/// limit.release();
/// }
///
/// // Option 2: Using run() helper
/// await limit.run(() => doWork());
/// ```
class ParallelLimit {
  /// The maximum number of permits that may be held at the same time.
  final int maxConcurrency;

  /// Number of permits currently held (acquired and not yet released).
  int _activeCount = 0;

  /// Callers suspended in [acquire], in arrival order.
  ///
  /// A [Queue] is used so that waking the oldest waiter is O(1) rather than
  /// shifting every remaining element as `List.removeAt(0)` does.
  final _waitingQueue = Queue<Completer<void>>();

  /// Creates a [ParallelLimit] that allows at most [maxConcurrency] operations
  /// to run concurrently.
  ///
  /// [maxConcurrency] must be greater than zero; this is checked with an
  /// `assert`, so it is only enforced when assertions are enabled.
  ParallelLimit({required this.maxConcurrency})
      : assert(maxConcurrency > 0, 'maxConcurrency must be greater than 0');

  /// Acquires a permit to run an operation.
  ///
  /// If fewer than [maxConcurrency] permits are held, the permit is taken
  /// synchronously and the returned future completes without waiting.
  /// Otherwise the caller is queued and the future completes once [release]
  /// hands it a permit. Waiters are served in first-in, first-out order.
  Future<void> acquire() async {
    if (_activeCount < maxConcurrency) {
      _activeCount++;
      return;
    }
    final completer = Completer<void>();
    _waitingQueue.add(completer);
    await completer.future;
  }

  /// Releases a permit, allowing the next waiting operation to proceed.
  ///
  /// Calling this without a matching [acquire] trips an assertion when
  /// assertions are enabled. When they are disabled the call is ignored, so
  /// the permit count can never go negative and later admit more than
  /// [maxConcurrency] operations.
  void release() {
    assert(_activeCount > 0, 'release() called without matching acquire()');
    // Asserts are stripped in production builds; keep the counter sane there.
    if (_activeCount <= 0) return;

    if (_waitingQueue.isNotEmpty) {
      // Hand the permit straight to the oldest waiter: one holder leaves and
      // one enters, so the active count is unchanged.
      _waitingQueue.removeFirst().complete();
      return;
    }
    _activeCount--;
  }

  /// Runs [operation] with a permit, automatically acquiring and releasing.
  ///
  /// The permit is released in a `finally` block, so it is returned even if
  /// [operation] throws. Returns whatever [operation] returns; any error it
  /// throws propagates to the caller.
  Future<T> run<T>(Future<T> Function() operation) async {
    await acquire();
    try {
      return await operation();
    } finally {
      release();
    }
  }

  /// The current number of active operations.
  int get activeCount => _activeCount;

  /// The number of operations waiting for a permit.
  int get waitingCount => _waitingQueue.length;
}
/// Manages a bounded queue of futures with automatic stream subscription control.
///
/// When [waitIfNeeded] is called and the queue has reached [maxSize], the
/// associated stream subscription is paused, all pending futures are awaited,
/// and then the subscription is resumed.
///
/// Example:
/// ```dart
/// final queue = BoundedQueue<void>(
/// maxSize: 1000,
/// subscription: streamSubscription,
/// onError: (error) {
/// // Handle queue overflow errors
/// },
/// );
///
/// // Add futures to the queue
/// queue.add(someFuture);
///
/// // Manually wait for queue to drain if needed
/// await queue.waitIfNeeded();
///
/// // Wait for all remaining futures
/// await queue.waitForAll();
/// ```
class BoundedQueue<T> {
  /// The queue length at which [waitIfNeeded] starts draining.
  final int maxSize;

  /// The subscription paused while [waitIfNeeded] drains the queue, if any.
  StreamSubscription? subscription;

  /// Called with the error when draining in [waitIfNeeded] fails.
  final void Function(Object error)? onError;

  /// Futures that have been added and not yet awaited by a drain.
  final List<Future<T>> _futures = [];

  /// Whether a [waitIfNeeded] drain is currently in progress.
  var _isWaiting = false;

  /// Creates a [BoundedQueue] with the specified [maxSize].
  ///
  /// When the queue reaches [maxSize], [subscription] will be paused while
  /// [waitIfNeeded] waits for futures to complete. If [onError] is provided,
  /// it will be called if an error occurs while waiting for futures.
  ///
  /// [maxSize] must be greater than zero; this is checked with an `assert`,
  /// so it is only enforced when assertions are enabled.
  BoundedQueue({required this.maxSize, this.subscription, this.onError})
      : assert(maxSize > 0, 'maxSize must be greater than 0');

  /// Adds a future to the queue.
  ///
  /// Adding never waits by itself; call [waitIfNeeded] to apply the bound.
  void add(Future<T> future) {
    _futures.add(future);
  }

  /// The current number of futures in the queue.
  ///
  /// Futures stay counted until a drain has finished awaiting them.
  int get length => _futures.length;

  /// Whether the queue is currently waiting for futures to complete.
  bool get isWaiting => _isWaiting;

  /// Waits for the queue to drain if it has reached [maxSize].
  ///
  /// This will pause [subscription] (if provided), wait for the futures that
  /// are queued at that moment, remove them from the queue, and then resume
  /// the subscription. Futures added while the drain is in progress are kept
  /// for a later drain rather than being discarded.
  ///
  /// Does nothing if the queue is below [maxSize] or a drain is already
  /// running.
  ///
  /// If an error occurs while waiting, [onError] will be called (if provided)
  /// and the error is rethrown. The awaited futures are removed in that case
  /// too, so one failure is not reported again by every later drain.
  Future<void> waitIfNeeded() async {
    if (_isWaiting || _futures.length < maxSize) return;

    _isWaiting = true;
    // Capture the field so the subscription that gets resumed is the one that
    // was paused, even if `subscription` is reassigned during the wait.
    final paused = subscription;
    paused?.pause();
    try {
      await _awaitQueued();
    } catch (e) {
      onError?.call(e);
      rethrow;
    } finally {
      _isWaiting = false;
      paused?.resume();
    }
  }

  /// Waits for all remaining futures in the queue to complete.
  ///
  /// This does not pause/resume the subscription - it simply waits for
  /// all pending futures, including any that are added while it is waiting,
  /// and returns once the queue is empty. If a future fails, the error
  /// propagates to the caller after the awaited futures have been removed.
  Future<void> waitForAll() async {
    while (_futures.isNotEmpty) {
      await _awaitQueued();
    }
  }

  /// Clears all futures from the queue without waiting for them.
  void clear() {
    _futures.clear();
  }

  /// Awaits the futures queued right now and removes exactly those.
  ///
  /// Removal happens on both success and failure. Only the awaited futures
  /// are removed, so anything added to the queue during the wait survives.
  Future<void> _awaitQueued() async {
    final pending = List<Future<T>>.of(_futures);
    try {
      await Future.wait(pending);
    } finally {
      // Remove by identity instead of clear(): the queue may have gained new
      // futures, or been cleared, while we were suspended.
      final awaited = Set<Future<T>>.identity()..addAll(pending);
      _futures.removeWhere(awaited.contains);
    }
  }
}