|
| 1 | +# Parallel Linq |
| 2 | + |
| 3 | +*Parallel Linq* is an extension that processes your queries in parallel while keeping the Linq style. |
| 4 | +Just like the `Parallel` class, *Parallel Linq* is built on `Task`, and its operations are blocking too. |
| 5 | + |
| 6 | +- `ParallelEnumerable.AsParallel`: converts an enumerable to a `ParallelQuery<T>` |
| 7 | +- Parallel counterparts of Linq operators such as `Select`, `Sum`... |
| 8 | +- `ParallelEnumerable.AsSequential`: converts a `ParallelQuery<T>` back to a normal `IEnumerable<T>` |
| 9 | +- `ParallelEnumerable.WithCancellation`: passes a cancellation token |
| 10 | +- `ParallelEnumerable.WithMergeOptions`: controls how the results of the scheduled iterations are buffered |
| 11 | + |
| 12 | +## `ParallelQuery` is `IEnumerable` |
| 13 | + |
| 14 | +`ParallelQuery<T>` is a simple wrapper around `IEnumerable<T>`; some members are overridden in the class itself, and the rest are implemented as extension methods in `ParallelEnumerable`. |
| 15 | +When the compile-time type is `ParallelQuery<T>`, the compiler binds Linq operators to the `ParallelEnumerable` extensions instead of the `Enumerable` ones. |
| 16 | + |
| 17 | +```cs |
| 18 | +public class ParallelQuery<TSource> : ParallelQuery, IEnumerable<TSource> { /* ... */ } |
| 19 | +``` |
| 20 | + |
| 21 | +### `AsParallel` & `AsSequential` & `AsEnumerable` |
| 22 | + |
| 23 | +- `ParallelEnumerable.AsParallel()` converts an enumerable to `ParallelQuery` |
| 24 | +- `ParallelEnumerable.AsSequential()` is an extension dedicated to `ParallelQuery<T>` |
| 25 | +  - does not change the runtime type of the source |
| 26 | +  - but changes the compile-time type to `IEnumerable<T>`, so the compiler picks the general implementations from the `Enumerable` extensions |
| 27 | +  - all subsequent operations become sequential instead of parallel |
| 28 | +- `Enumerable.AsEnumerable()` is a common extension on `IEnumerable<T>`; `ParallelEnumerable.AsEnumerable()` exists to unwrap the backing enumerable when working with `ParallelQuery<T>` |
| 29 | +  - identical to `ParallelEnumerable.AsSequential()` |
| 30 | + |
| 31 | +> [!NOTE] |
| 32 | +> All of them use deferred execution |
| 33 | + |
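The binding rules above can be read directly from the static types. The following is a minimal sketch, assuming .NET 6 or later with top-level statements; the variable names (`parallel`, `doubled`, `filtered`) are illustrative only.

```cs
using System;
using System.Collections.Generic;
using System.Linq;

ParallelQuery<int> parallel = Enumerable.Range(1, 10).AsParallel();

// Compile-time type is ParallelQuery<int>, so this binds to ParallelEnumerable.Select
ParallelQuery<int> doubled = parallel.Select(x => x * 2);

// AsSequential() hands back an IEnumerable<int>,
// so this Where binds to Enumerable.Where and runs sequentially
IEnumerable<int> filtered = doubled.AsSequential().Where(x => x % 4 == 0);

Console.WriteLine(filtered is ParallelQuery<int>); // False
Console.WriteLine(filtered.Sum());                 // 60 (4 + 8 + 12 + 16 + 20)
```

Because `filtered` is typed as `IEnumerable<int>`, the final `Sum` also binds to `Enumerable.Sum`. The sum is used instead of printing the items because the parallel `Select` does not guarantee their order.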
| 34 | +## Preserve Ordering |
| 35 | + |
| 36 | +`AsOrdered` makes sure the subsequent parallel operations preserve the order of the original enumerable. |
| 37 | + |
| 38 | +```cs |
| 39 | +var seq = Enumerable.Range(1, 100); |
| 40 | +var ordered = Enumerable.Range(1, 100) |
| 41 | +    .AsParallel() |
| 42 | +    .AsOrdered() // [!code highlight] |
| 43 | +    .Select(x => x); |
| 44 | + |
| 45 | +Console.WriteLine(seq.SequenceEqual(ordered)); // always true // [!code highlight] |
| 46 | +``` |
| 47 | + |
| 48 | +However, preserving order has a cost, so once ordering no longer matters you can turn it off with `ParallelEnumerable.AsUnordered`. |
| 49 | + |
| 50 | +```cs |
| 51 | +var ordered = Enumerable.Range(1, 100) |
| 52 | +    .AsParallel() |
| 53 | +    .AsOrdered() // [!code highlight] |
| 54 | +    .Select(x => x) |
| 55 | +    .AsUnordered(); // cancel the ordering preservation // [!code highlight] |
| 56 | +``` |
| 57 | + |
| 58 | +> [!NOTE] |
| 59 | +> See [Order Preservation](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/order-preservation-in-plinq#query-operators-and-ordering) |
| 60 | + |
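The effect of ordering is easiest to observe with an operator whose result depends on position, such as `Take`. This is a minimal sketch, assuming .NET 6 or later with top-level statements; `firstEvens` and `someEvens` are illustrative names.

```cs
using System;
using System.Linq;

// With AsOrdered, Take(3) returns the first three matches in source order
int[] firstEvens = Enumerable.Range(1, 100)
    .AsParallel()
    .AsOrdered()
    .Where(x => x % 2 == 0)
    .Take(3)
    .ToArray();

Console.WriteLine(string.Join(", ", firstEvens)); // 2, 4, 6

// Without AsOrdered, Take(3) may return any three matches, in any order
int[] someEvens = Enumerable.Range(1, 100)
    .AsParallel()
    .Where(x => x % 2 == 0)
    .Take(3)
    .ToArray();

Console.WriteLine(someEvens.Length); // 3
```

Only the length and the evenness of `someEvens` are guaranteed; which three items it holds can differ from run to run.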
| 61 | +## Cancellation & Exception |
| 62 | + |
| 63 | +It's basically a functional version of the parallel loop, so exception handling and cancellation work the same way as with the `Parallel` class. |
| 64 | + |
| 65 | +- use `ParallelEnumerable.WithCancellation` to specify a cancellation token |
| 66 | +- cancellation is special: catch it as `OperationCanceledException` |
| 67 | +- other exceptions from all iterations are wrapped in `AggregateException` |
| 68 | +- **remember to evaluate the query**, otherwise the cancellation or exception will never be triggered |
| 69 | + |
| 70 | +```cs |
| 71 | +var parallelSeq = ParallelEnumerable.Range(1, 100); |
| 72 | +var cts = new CancellationTokenSource(2000); |
| 73 | + |
| 74 | +var query = parallelSeq |
| 75 | +    .WithCancellation(cts.Token) |
| 76 | +    .Select(x => { |
| 77 | +        Thread.Sleep(2000); |
| 78 | +        cts.Token.ThrowIfCancellationRequested(); // [!code highlight] |
| 79 | +        if (int.IsOddInteger(x)) throw new Exception("a normal exception was thrown"); |
| 80 | +        return x * x; |
| 81 | +    }); |
| 82 | + |
| 83 | +try { |
| 84 | +    // you must consume query // [!code highlight] |
| 85 | +    _ = query.ToList(); // [!code highlight] |
| 86 | +} catch (AggregateException ex) { |
| 87 | +    ex.Handle(iex => { |
| 88 | +        switch (iex) { |
| 89 | +            case Exception: |
| 90 | +                Console.WriteLine(iex.Message); // message of the inner exception |
| 91 | +                return true; |
| 92 | +            default: |
| 93 | +                return false; |
| 94 | +        } |
| 95 | +    }); |
| 96 | +} catch (OperationCanceledException) { |
| 97 | +    Console.WriteLine($"{nameof(OperationCanceledException)} was thrown"); |
| 98 | +} |
| 99 | +``` |
| 100 | + |
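The point about evaluating the query can be checked with a counter: the delegate does not run, and therefore cannot throw, until the query is consumed. This is a minimal sketch, assuming .NET 6 or later with top-level statements; `invoked`, `invokedBeforeConsume` and `caught` are illustrative names, and a single failing item is used so that exactly one inner exception is expected.

```cs
using System;
using System.Linq;
using System.Threading;

int invoked = 0;

var query = ParallelEnumerable.Range(1, 8)
    .Select(x => {
        Interlocked.Increment(ref invoked); // count how often the delegate runs
        if (x == 3) throw new InvalidOperationException("x == 3");
        return x;
    });

// Building the query runs nothing: the counter is still zero
int invokedBeforeConsume = invoked;
Console.WriteLine(invokedBeforeConsume); // 0

string caught = "none";
try {
    _ = query.ToList(); // evaluation starts here
} catch (AggregateException ex) {
    // the original exception is wrapped, so look at the inner one
    caught = ex.Flatten().InnerExceptions[0].GetType().Name;
}

Console.WriteLine(caught); // InvalidOperationException
```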
| 101 | +## Merge Option |
| 102 | + |
| 103 | +Parallel iterations are scheduled in groups, so buffering is enabled by default and the size of each group depends on the system. |
| 104 | + |
| 105 | +- `ParallelMergeOptions.AutoBuffered` |
| 106 | +- `ParallelMergeOptions.Default`: alias to `AutoBuffered` |
| 107 | +- `ParallelMergeOptions.FullyBuffered`: results are not available until all iterations have finished |
| 108 | +- `ParallelMergeOptions.NotBuffered`: yields each item as soon as it is available |
| 109 | + |
| 110 | +:::code-group |
| 111 | + |
| 112 | +```cs[FullyBuffered] |
| 113 | +var query = ParallelEnumerable.Range(1, 100) |
| 114 | +    .WithMergeOptions(ParallelMergeOptions.FullyBuffered) // [!code highlight] |
| 115 | +    .Select(x => { |
| 116 | +        Thread.Sleep(Random.Shared.Next(100)); |
| 117 | +        Console.WriteLine("produced"); |
| 118 | +        return x; |
| 119 | +    }); |
| 120 | + |
| 121 | +foreach (var _ in query) { |
| 122 | +    Console.WriteLine("consumed"); |
| 123 | +} |
| 124 | + |
| 125 | +// with FullyBuffered, consuming only starts after every item was produced |
| 126 | +// produced |
| 127 | +// produced |
| 128 | +// produced |
| 129 | +// ... |
| 130 | +// consumed |
| 131 | +// consumed |
| 132 | +// consumed |
| 133 | +``` |
| 134 | + |
| 135 | +```cs[NotBuffered] |
| 136 | +var query = ParallelEnumerable.Range(1, 100) |
| 137 | +    .WithMergeOptions(ParallelMergeOptions.NotBuffered) // [!code highlight] |
| 138 | +    .Select(x => { |
| 139 | +        Thread.Sleep(Random.Shared.Next(100)); |
| 140 | +        Console.WriteLine("produced"); |
| 141 | +        return x; |
| 142 | +    }); |
| 143 | + |
| 144 | +foreach (var _ in query) { |
| 145 | +    Console.WriteLine("consumed"); |
| 146 | +} |
| 147 | + |
| 148 | +// with NotBuffered, consuming starts as soon as an item is available |
| 149 | +// produced |
| 150 | +// consumed |
| 151 | +// produced |
| 152 | +// consumed |
| 153 | +// produced |
| 154 | +// ... |
| 155 | +// consumed |
| 156 | +``` |
| 157 | + |
| 158 | +## Performance Enhancement |
| 159 | + |
| 160 | +### Local Storage |
| 161 | + |
| 162 | +`ParallelEnumerable.Aggregate` plays the role of local storage; the following example uses its most flexible overload. |
| 163 | + |
| 164 | +```cs |
| 165 | +var size = Directory.EnumerateFiles(@"c:/Users//User/Projects/nix-config", "*", SearchOption.AllDirectories) |
| 166 | +    .AsParallel() |
| 167 | +    .Aggregate( // [!code highlight] |
| 168 | +        seed: 0L, // [!code highlight] |
| 169 | +        updateAccumulatorFunc: (localSum, curr) => localSum + new FileInfo(curr).Length, // iteration // [!code highlight] |
| 170 | +        combineAccumulatorsFunc: (sum, localSum) => sum + localSum, // add up when each group was finished // [!code highlight] |
| 171 | +        resultSelector: i => i / 1024D // post action to transform the result // [!code highlight] |
| 172 | +    ); // [!code highlight] |
| 173 | + |
| 174 | +Console.WriteLine($"size in kb: {size}"); // [!code highlight] |
| 175 | +``` |
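To check the roles of the four arguments without touching the file system, the same overload can be applied to a sum with a known answer. This is a minimal sketch, assuming .NET 6 or later with top-level statements; `meanOfSquares` is an illustrative name, and the input is the fixed range 1 to 100.

```cs
using System;
using System.Linq;

double meanOfSquares = ParallelEnumerable.Range(1, 100)
    .Aggregate(
        seed: 0L,
        // runs per item, accumulating into a partition-local sum
        updateAccumulatorFunc: (localSum, x) => localSum + (long)x * x,
        // merges the partition-local sums into the overall sum
        combineAccumulatorsFunc: (sum, localSum) => sum + localSum,
        // final transformation: 338350 / 100.0
        resultSelector: sum => sum / 100.0
    );

Console.WriteLine(meanOfSquares); // 3383.5
```

The sum of squares from 1 to 100 is 100 * 101 * 201 / 6 = 338350, which gives a fixed value to compare against regardless of how the range is partitioned.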