|
| 1 | +# Open.ChannelExtensions |
| 2 | + |
| 3 | +[](https://www.nuget.org/packages/Open.ChannelExtensions/) |
| 4 | + |
| 5 | +A set of extensions for optimizing/simplifying System.Threading.Channels usage. |
| 6 | + |
| 7 | +[Click here for detailed documentation.](https://open-net-libraries.github.io/Open.ChannelExtensions/api/Open.ChannelExtensions.Extensions.html#methods) |
| 8 | + |
| 9 | +## Highlights |
| 10 | + |
| 11 | +### Read & Write |
| 12 | + |
| 13 | +*With optional concurrency levels.* |
| 14 | + |
| 15 | +* Reading all entries in a channel. |
| 16 | +* Writing all entries from a source to a channel. |
| 17 | +* Piping (consuming) all entries to a buffer (channel). |
| 18 | +* `.AsAsyncEnumerable()` (`IAsyncEnumerable`) support for .NET Standard 2.1+ and .NET Core 3+ |
| 19 | + |
| 20 | +### Special `ChannelReader` Operations |
| 21 | + |
| 22 | +* `Filter` |
| 23 | +* `Transform` |
| 24 | +* `Batch` |
| 25 | +* `Join` |
| 26 | + |
| 27 | +--- |
| 28 | +## Installation |
| 29 | + |
| 30 | +```nuget |
| 31 | +Install-Package Open.ChannelExtensions |
| 32 | +``` |
| 33 | +--- |
| 34 | + |
| 35 | +## Examples |
| 36 | + |
| 37 | +Being able to define an asynchronous pipeline with best practice usage using simple expressive syntax: |
| 38 | + |
| 39 | +```cs |
| 40 | +await Channel |
| 41 | + .CreateBounded<T>(10) |
| 42 | + .SourceAsync(source /* IEnumerable<Task<T>> */) |
| 43 | + .PipeAsync( |
| 44 | + maxConcurrency: 2, |
| 45 | + capacity: 5, |
| 46 | + transform: asyncTransform01) |
| 47 | + .Pipe(transform02, /* capacity */ 3) |
| 48 | + .ReadAllAsync(finalTransformedValue => { |
| 49 | + // Do something async with each final value. |
| 50 | + }); |
| 51 | +``` |
| 52 | + |
| 53 | +```cs |
| 54 | +await source /* IEnumerable<T> */ |
| 55 | + .ToChannel(boundedSize: 10, singleReader: true) |
| 56 | + .PipeAsync(asyncTransform01, /* capacity */ 5) |
| 57 | + .Pipe( |
| 58 | + maxConcurrency: 2, |
| 59 | + capacity: 3, |
| 60 | + transform: transform02) |
| 61 | + .ReadAll(finalTransformedValue => { |
| 62 | + // Do something with each final value. |
| 63 | + }); |
| 64 | +``` |
| 65 | + |
| 66 | +### Reading (until the channel is closed) |
| 67 | + |
| 68 | +#### One by one read each entry from the channel |
| 69 | + |
| 70 | +```cs |
| 71 | +await channel.ReadAll( |
| 72 | + entry => { /* Processing Code */ }); |
| 73 | +``` |
| 74 | + |
| 75 | +```cs |
| 76 | +await channel.ReadAll( |
| 77 | + (entry, index) => { /* Processing Code */ }); |
| 78 | +``` |
| 79 | + |
| 80 | +```cs |
| 81 | +await channel.ReadAllAsync( |
| 82 | + async entry => { await /* Processing Code */ }); |
| 83 | +``` |
| 84 | + |
| 85 | +```cs |
| 86 | +await channel.ReadAllAsync( |
| 87 | + async (entry, index) => { await /* Processing Code */ }); |
| 88 | +``` |
| 89 | + |
| 90 | +#### Read concurrently each entry from the channel |
| 91 | + |
| 92 | +```cs |
| 93 | +await channel.ReadAllConcurrently( |
| 94 | + maxConcurrency, |
| 95 | + entry => { /* Processing Code */ }); |
| 96 | +``` |
| 97 | + |
| 98 | +```cs |
| 99 | +await channel.ReadAllConcurrentlyAsync( |
| 100 | + maxConcurrency, |
| 101 | + async entry => { await /* Processing Code */ }); |
| 102 | +``` |
| 103 | + |
| 104 | +### Writing |
| 105 | + |
| 106 | +If `complete` is `true`, the channel will be closed when the source is empty. |
| 107 | + |
| 108 | +#### Dump a source enumeration into the channel |
| 109 | + |
| 110 | +```cs |
| 111 | +// source can be any IEnumerable<T>. |
| 112 | +await channel.WriteAll(source, complete: true); |
| 113 | +``` |
| 114 | + |
| 115 | +```cs |
| 116 | +// source can be any IEnumerable<Task<T>> or IEnumerable<ValueTask<T>>. |
| 117 | +await channel.WriteAllAsync(source, complete: true); |
| 118 | +``` |
| 119 | + |
| 120 | +#### Synchronize reading from the source and process the results concurrently |
| 121 | + |
| 122 | +```cs |
| 123 | +// source can be any IEnumerable<Task<T>> or IEnumerable<ValueTask<T>>. |
| 124 | +await channel.WriteAllConcurrentlyAsync( |
| 125 | + maxConcurrency, source, complete: true); |
| 126 | +``` |
| 127 | + |
| 128 | +### Filter & Transform |
| 129 | + |
| 130 | +```cs |
| 131 | +// Filter and transform when reading. |
| 132 | +channel.Reader |
| 133 | + .Filter(predicate) // .Where() |
| 134 | + .Transform(selector) // .Select() |
| 135 | + .ReadAllAsync(async value => {/*...*/}); |
| 136 | +``` |
| 137 | + |
| 138 | +### Batching |
| 139 | + |
| 140 | +```cs |
| 141 | +values.Reader |
| 142 | + .Batch(10 /*batch size*/) |
| 143 | + .WithTimeout(1000) // Any non-empty batches are flushed every second. |
| 144 | + .ReadAllAsync(async batch => {/*...*/}); |
| 145 | +``` |
| 146 | + |
| 147 | +### Joining |
| 148 | + |
| 149 | +```cs |
| 150 | +batches.Reader |
| 151 | + .Join() |
| 152 | + .ReadAllAsync(async value => {/*...*/}); |
| 153 | +``` |
| 154 | + |
| 155 | +### Pipelining / Transforming |
| 156 | + |
| 157 | +#### Transform and buffer entries |
| 158 | + |
| 159 | +```cs |
| 160 | +// Transform values in a source channel to new unbounded channel. |
| 161 | +var transformed = channel.Pipe( |
| 162 | + async value => /* transformation */); |
| 163 | +``` |
| 164 | + |
| 165 | +```cs |
| 166 | +// Transform values in a source channel to new unbounded channel with a max concurrency of X. |
| 167 | +const X = 4; |
| 168 | +var transformed = channel.Pipe( |
| 169 | + X, async value => /* transformation */); |
| 170 | +``` |
| 171 | + |
| 172 | +```cs |
| 173 | +// Transform values in a source channel to new bounded channel bound of N entries. |
| 174 | +const N = 5; |
| 175 | +var transformed = channel.Pipe( |
| 176 | + async value => /* transformation */, N); |
| 177 | +``` |
| 178 | + |
| 179 | +```cs |
| 180 | +// Transform values in a source channel to new bounded channel bound of N entries with a max concurrency of X. |
| 181 | +const X = 4; |
| 182 | +const N = 5; |
| 183 | +var transformed = channel.Pipe( |
| 184 | + X, async value => /* transformation */, N); |
| 185 | + |
| 186 | +// or |
| 187 | +transformed = channel.Pipe( |
| 188 | + maxConcurrency: X, |
| 189 | + capacity: N, |
| 190 | + transform: async value => /* transformation */); |
| 191 | +``` |
0 commit comments