Commit 834eb27
# Parallel Loop

The `Parallel` static class provides `Task`-based utilities for parallel enumeration; every parallel operation is backed by `Task`s.

- `Parallel.For`: range-based parallel enumeration, a parallel analogue of the `for` statement
- `Parallel.ForEach`: parallel enumeration over `IEnumerable<T>` and `IAsyncEnumerable<T>`
- async counterparts of `For` and `ForEach` (e.g. `Parallel.ForEachAsync`)
- optionally run with a `ParallelOptions` to specify the cancellation token, degree of parallelism, and task scheduler
- access the state of the entire loop through a `ParallelLoopState` parameter in the callback

Additionally, an `Invoke` method exists to run actions in parallel.

- `Parallel.Invoke`: invokes multiple actions in parallel

> [!NOTE]
> Every non-async method on `Parallel` is a blocking operation: it blocks the calling thread until all tasks have terminated.
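As a sketch of the async counterpart combined with `ParallelOptions` (the degree of parallelism and the sample data below are arbitrary choices):

```cs
long sumOfSquares = 0;

await Parallel.ForEachAsync(
    Enumerable.Range(1, 100),
    new ParallelOptions { MaxDegreeOfParallelism = 4 }, // at most 4 concurrent workers
    async (n, cancellationToken) => {
        await Task.Yield(); // stand-in for real async work
        Interlocked.Add(ref sumOfSquares, n * n);
    }
);

Console.WriteLine(sumOfSquares); // 338350, regardless of completion order
```

Unlike the synchronous methods, `ForEachAsync` returns a `Task` and does not block the calling thread.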
## For

```cs
var files = Directory.GetFiles(@"C:/Users/User/Projects/nix-config", "*", SearchOption.AllDirectories);

long totalSize = 0;

Parallel.For(0, files.Length, idx => {
    FileInfo info = new(files[idx]);
    Interlocked.Add(ref totalSize, info.Length); // [!code highlight]
});

Console.WriteLine(totalSize);
```
## ForEach

```cs
string[] files = Directory.GetFiles(@"~/projects/", "*", SearchOption.AllDirectories);

long totalSize = 0;

Parallel.ForEach(files, f => {
    FileInfo info = new(f);
    Interlocked.Add(ref totalSize, info.Length); // [!code highlight]
});

Console.WriteLine(totalSize);
```
### Enumerate by Step

`Parallel.For` does not provide an overload that skips a given count on each iteration, but the same effect can be achieved with an iterator method.

```cs
int[] numbers = [.. Enumerable.Range(1, 10)];

Parallel.ForEach(Range(1, numbers.Length, 2), idx => {
    _ = numbers[idx]; // [!code highlight]
});

static IEnumerable<int> Range(int start, int end, int step) { // [!code highlight]
    for (int i = start; i < end; i += step) { // [!code highlight]
        yield return i; // [!code highlight]
    } // [!code highlight]
} // [!code highlight]
```
## Break Parallel Loop

Parallel loop methods provide overloads that pass an extra `ParallelLoopState` parameter to the callback, describing the state of the iterations.
The state can control the termination of the loop, but in a different manner than a sequential loop, since the iterations run in parallel.
An iteration starts only when the scheduler has room to activate its task; the remaining iterations still have to wait.

- `ParallelLoopState.Stop()`:
  - Any iteration that hasn't started yet will not be scheduled.
  - Any iteration that is already running will continue to completion.
  - **Does not terminate the current iteration**

- `ParallelLoopState.Break()`:
  - Any iteration that hasn't started yet (*except the ones with an index less than the current index*) will not be scheduled.
  - Any iteration that is already running will continue to completion.
  - **Does not terminate the current iteration**
```cs
Parallel.ForEach(
    Enumerable.Range(1, 2_000_000), // would all that many iterations be started? // [!code highlight]
    (n, state) => {
        Console.WriteLine(n);
        // let's break on a condition that hits really quickly
        // so you would see that only a few iterations were started
        if (int.IsOddInteger(n)) { // [!code highlight]
            state.Stop(); // [!code highlight]
        } // [!code highlight]
    }
);
```

It's harder to exemplify what `Break` does in a concurrent context.
```cs
Parallel.ForEach(
    Enumerable.Range(1, 2_000_000), // would all that many iterations be started? // [!code highlight]
    (n, state) => {
        // let's break on a condition that hits really quickly
        // so you would see that only a few iterations were started
        if (n == 123) {
            state.Break(); // [!code highlight]
        }

        Console.WriteLine(n); // still prints 123 after Break() // [!code highlight]
    }
);
```

You can verify that `Break` does not terminate the current iteration by filtering the output:

```ps1
dotnet run | sls \b123\b
```
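The loop's return value also records what happened: `Parallel.For`/`ForEach` return a `ParallelLoopResult` whose `LowestBreakIteration` is set once `Break()` has been called. A minimal sketch (the break index `50` is an arbitrary choice):

```cs
ParallelLoopResult result = Parallel.For(0, 100_000, (i, state) => {
    if (i == 50)
        state.Break(); // iterations with index below 50 are still guaranteed to run
});

Console.WriteLine(result.IsCompleted);          // typically False: the loop was broken early
Console.WriteLine(result.LowestBreakIteration); // 50, the lowest index that called Break()
```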
> [!NOTE]
> `ShouldExitCurrentIteration` becomes true after `Stop()`, `Break()`, or any thrown exception.

> [!TIP]
> Additionally, you can use `IsStopped` and `IsExceptional` in other running iterations to coordinate when `Stop()` was called or an exception was thrown from any iteration.
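A long-running iteration can poll these flags to bail out cooperatively. A minimal sketch (the loop bounds and the busy inner loop are placeholders for real work):

```cs
long bailedOut = 0;

Parallel.For(0, 100, (i, state) => {
    if (i == 0)
        state.Stop();

    // a long-running body should poll the flag and return early
    for (int step = 0; step < 1_000_000; step++) {
        if (state.ShouldExitCurrentIteration) { // true after Stop(), Break(), or an exception
            Interlocked.Increment(ref bailedOut);
            return;
        }
    }
});

Console.WriteLine(bailedOut); // at least 1: the iteration that called Stop() also exits early
```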
## Exception Handling

An exception thrown from any iteration prevents all not-yet-started iterations from being scheduled and terminates the loop **as soon as all currently running iterations finish.**

Since the `Parallel` utilities are synchronous and blocking, an `AggregateException` can be caught around the call. Each iteration may contribute an exception to `AggregateException.InnerExceptions`.
```cs
try {
    Parallel.For(1, 10_000_000, (n, state) => {
        Console.WriteLine(n);

        if (int.IsOddInteger(n))
            throw new Exception(); // multiple threads may throw this
    });
} catch (AggregateException ex) {
    ex.Handle(iex => {
        Console.WriteLine(iex.Message); // written multiple times, once per collected exception
        return true;
    });
}

// 9166664
// 833334
// 9999997
// 4166666
// Exception of type 'System.Exception' was thrown.
// Exception of type 'System.Exception' was thrown.
// Exception of type 'System.Exception' was thrown.
// Exception of type 'System.Exception' was thrown.
// Exception of type 'System.Exception' was thrown.
```
### Cancellation is Unique

Cancellation of a parallel loop is special because it is dedicated to cancelling the entire loop, not a specific running thread.
The cancellation is triggered once and **terminates all iterations, no matter whether they're running or not**.
This expectation is why the runtime propagates the `OperationCanceledException` thrown by `token.ThrowIfCancellationRequested()` **directly**, instead of wrapping it inside an `AggregateException`, when the **cancellation succeeds**.

> [!NOTE]
> Only a successful cancellation propagates `OperationCanceledException` directly; otherwise it is wrapped inside an `AggregateException`.
```cs
CancellationTokenSource cts = new(millisecondsDelay: 2000);

try {
    Parallel.For(
        0,
        10,
        new ParallelOptions() { CancellationToken = cts.Token },
        _ => { // [!code highlight]
            while (true) // [!code highlight]
                cts.Token.ThrowIfCancellationRequested(); // [!code highlight]
        }
    ); // [!code highlight]
} catch (AggregateException ex) {
    ex.Handle(iex => {
        if (iex is OperationCanceledException) {
            // not reachable
            Console.WriteLine($"{nameof(OperationCanceledException)} was caught by {nameof(AggregateException)}");
            return true;
        }
        return false;
    });
} catch (OperationCanceledException) { // [!code highlight]
    // hits here since the cancellation succeeds // [!code highlight]
    Console.WriteLine($"{nameof(OperationCanceledException)} was propagated directly"); // [!code highlight]
} // [!code highlight]
```
## Performance Enhancement

### Thread-Local Storage

If each **worker thread** (the thread that manages a batch of iterations) can compute a partial result and only add that partial result to the target variable at the end, it is much more efficient than having every thread contend on a single variable.
This approach is called **Thread-Local Storage**: a dedicated storage target for each worker thread.
The design is quite similar to `Enumerable.Aggregate`, which folds a calculation over a given initial value on each iteration.
```cs
string[] files = Directory.GetFiles(@"C:/Users/User/Projects/nix-config", "*", SearchOption.AllDirectories);

long size = 0L;
// calculate total file size using thread-local storage
// to be more efficient
Parallel.ForEach(
    source: files,
    localInit: () => 0L, // initial value for the thread-local storage // [!code highlight]
    body: (f, state, sum) => { // just like Aggregate but with extra state // [!code highlight]
        return sum + new FileInfo(f).Length; // [!code highlight]
    }, // [!code highlight]
    // add up to the target variable when all iterations of a worker thread have finished
    localFinally: sum => Interlocked.Add(ref size, sum) // [!code highlight]
);

Console.WriteLine(size);
```
### Partitioning

Partitioning is a trade-off solution for when **invoking the callback delegate in a parallel loop is way too expensive** and **the operation inside the delegate body is relatively fast**.
Items from the source are partitioned into **ranges** of a specified size, and each range is processed **within the same thread** (because each operation is fast enough); this reduces the cost of invoking the delegate callback by reducing the number of threads started by the loop.

> [!NOTE]
> `Partitioner` requires collections **with an indexer** to work with; it's the only way to represent a range.
```cs
// calculating the sum of a large array is a good example for partitioning,
// for it has a simple add-up operation
// and we want to avoid a callback on each iteration
// optionally you could avoid resource contention with Thread-Local Storage

int[] source = Enumerable.Range(1, 1000 * 1000).ToArray();

var partition = Partitioner.Create(0, source.Length); // auto slice ranges from source // [!code highlight]

long sumOfArray = 0L;

Parallel.ForEach(
    partition, // iterate on ranges instead // [!code highlight]
    () => 0L,
    (range, _, sum) => {
        var (start, end) = range; // unpack the tuple // [!code highlight]
        for (int i = start; i < end; i++) {
            sum = checked(sum + source[i]);
        }
        return sum;
    },
    sum => Interlocked.Add(ref sumOfArray, sum)
);

Console.WriteLine(sumOfArray);

// you can't directly sum this using LINQ // [!code error]
// because Sum() returns int, which overflows for such a large total // [!code error]
Console.WriteLine(source.Sum()); // System.OverflowException // [!code error]
```
## Invoke

`Parallel.Invoke` is not really a loop, but I can't find a more appropriate place to introduce it.
It simply runs multiple actions in parallel as a blocking operation; no async counterpart exists.
```cs
// blocking operation
Parallel.Invoke(
    () => Console.WriteLine(1),
    () => Console.WriteLine(2),
    () => Console.WriteLine(3),
    () => Console.WriteLine(4)
); // order is not guaranteed
```
