Skip to content

Commit 3b2581e

Browse files
author
Oren (electricessence)
committed
Reverted default channel options for AllowSynchronousContinuations to false.
Exposed channelOptions for some methods.
1 parent 1704458 commit 3b2581e

10 files changed

+148
-62
lines changed

Open.ChannelExtensions/BufferingChannelReader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ abstract class BufferingChannelReader<TIn, TOut> : ChannelReader<TOut>
1010
{
1111
protected ChannelReader<TIn>? Source;
1212
protected readonly Channel<TOut>? Buffer;
13-
public BufferingChannelReader(ChannelReader<TIn> source, bool singleReader)
13+
public BufferingChannelReader(ChannelReader<TIn> source, bool singleReader, bool syncCont = false)
1414
{
1515
Source = source ?? throw new ArgumentNullException(nameof(source));
1616
Contract.EndContractBlock();
@@ -21,7 +21,7 @@ public BufferingChannelReader(ChannelReader<TIn> source, bool singleReader)
2121
}
2222
else
2323
{
24-
Buffer = Extensions.CreateChannel<TOut>(-1, singleReader);
24+
Buffer = Extensions.CreateChannel<TOut>(-1, singleReader, syncCont);
2525

2626
source.Completion.ContinueWith(t =>
2727
{

Open.ChannelExtensions/Documentation.xml

Lines changed: 25 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Open.ChannelExtensions/Extensions.Batch.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class BatchingChannelReader<T> : BufferingChannelReader<T, List<T>>
1212
private readonly int _batchSize;
1313
private List<T>? _current;
1414

15-
public BatchingChannelReader(ChannelReader<T> source, int batchSize, bool singleReader) : base(source, singleReader)
15+
public BatchingChannelReader(ChannelReader<T> source, int batchSize, bool singleReader, bool syncCont = false) : base(source, singleReader, syncCont)
1616
{
1717
if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Must be at least 1.");
1818
Contract.EndContractBlock();
@@ -69,8 +69,9 @@ protected override bool TryPipeItems()
6969
/// <param name="source">The channel to read from.</param>
7070
/// <param name="batchSize">The maximum size of each batch.</param>
7171
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
72+
/// <param name="allowSynchronousContinuations">True can reduce the amount of scheduling and markedly improve performance, but may produce unexpected or even undesirable behavior.</param>
7273
/// <returns>A channel reader containing the batches.</returns>
73-
public static ChannelReader<List<T>> Batch<T>(this ChannelReader<T> source, int batchSize, bool singleReader = false)
74-
=> new BatchingChannelReader<T>(source ?? throw new ArgumentNullException(nameof(source)), batchSize, singleReader);
74+
public static ChannelReader<List<T>> Batch<T>(this ChannelReader<T> source, int batchSize, bool singleReader = false, bool allowSynchronousContinuations = false)
75+
=> new BatchingChannelReader<T>(source ?? throw new ArgumentNullException(nameof(source)), batchSize, singleReader, allowSynchronousContinuations);
7576
}
7677
}

Open.ChannelExtensions/Extensions.Join.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,11 @@ public static ChannelReader<T> Join<T>(this ChannelReader<T[]> source, bool sing
9494
/// <typeparam name="T">The result type.</typeparam>
9595
/// <param name="source">The source reader.</param>
9696
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
97+
/// <param name="allowSynchronousContinuations">True can reduce the amount of scheduling and markedly improve performance, but may produce unexpected or even undesirable behavior.</param>
9798
/// <returns>A channel reader containing the joined results.</returns>
98-
public static ChannelReader<T> Join<T>(this ChannelReader<IAsyncEnumerable<T>> source, bool singleReader = false)
99+
public static ChannelReader<T> Join<T>(this ChannelReader<IAsyncEnumerable<T>> source, bool singleReader = false, bool allowSynchronousContinuations = false)
99100
{
100-
var buffer = CreateChannel<T>(1, singleReader);
101+
var buffer = CreateChannel<T>(1, singleReader, allowSynchronousContinuations);
101102
var writer = buffer.Writer;
102103

103104
_ = JoinCore();

Open.ChannelExtensions/Extensions.Pipe.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ async ValueTask PipeToCore()
6666
/// <typeparam name="TIn">The type contained by the source channel.</typeparam>
6767
/// <typeparam name="TOut">The outgoing type from the resultant channel.</typeparam>
6868
/// <param name="source">The source channel.</param>
69-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
69+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
7070
/// <param name="transform">The transform function to apply the source entries before passing on to the output.</param>
7171
/// <param name="capacity">The width of the pipe: how many entries to buffer while waiting to be read from.</param>
7272
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
@@ -108,7 +108,7 @@ async ValueTask ValueNotReady(ValueTask<TOut> value, CancellationToken token)
108108
/// <typeparam name="TRead">The type contained by the source channel.</typeparam>
109109
/// <typeparam name="TOut">The outgoing type from the resultant channel.</typeparam>
110110
/// <param name="source">The source channel.</param>
111-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
111+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
112112
/// <param name="transform">The transform function to apply the source entries before passing on to the output.</param>
113113
/// <param name="capacity">The width of the pipe: how many entries to buffer while waiting to be read from.</param>
114114
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
@@ -130,7 +130,7 @@ public static ChannelReader<TOut> PipeAsync<TWrite, TRead, TOut>(this Channel<TW
130130
/// <typeparam name="TIn">The type contained by the source channel.</typeparam>
131131
/// <typeparam name="TOut">The outgoing type from the resultant channel.</typeparam>
132132
/// <param name="source">The source channel.</param>
133-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
133+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
134134
/// <param name="transform">The transform function to apply the source entries before passing on to the output.</param>
135135
/// <param name="capacity">The width of the pipe: how many entries to buffer while waiting to be read from.</param>
136136
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
@@ -148,7 +148,7 @@ public static ChannelReader<TOut> TaskPipeAsync<TIn, TOut>(this ChannelReader<TI
148148
/// <typeparam name="TRead">The type contained by the source channel.</typeparam>
149149
/// <typeparam name="TOut">The outgoing type from the resultant channel.</typeparam>
150150
/// <param name="source">The source channel.</param>
151-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
151+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
152152
/// <param name="transform">The transform function to apply the source entries before passing on to the output.</param>
153153
/// <param name="capacity">The width of the pipe: how many entries to buffer while waiting to be read from.</param>
154154
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
@@ -170,7 +170,7 @@ public static ChannelReader<TOut> TaskPipeAsync<TWrite, TRead, TOut>(this Channe
170170
/// <typeparam name="TIn">The type contained by the source channel.</typeparam>
171171
/// <typeparam name="TOut">The outgoing type from the resultant channel.</typeparam>
172172
/// <param name="source">The source channel.</param>
173-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
173+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
174174
/// <param name="transform">The transform function to apply the source entries before passing on to the output.</param>
175175
/// <param name="capacity">The width of the pipe: how many entries to buffer while waiting to be read from.</param>
176176
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
@@ -188,7 +188,7 @@ public static ChannelReader<TOut> Pipe<TIn, TOut>(this ChannelReader<TIn> source
188188
/// <typeparam name="TRead">The type contained by the source channel.</typeparam>
189189
/// <typeparam name="TOut">The outgoing type from the resultant channel.</typeparam>
190190
/// <param name="source">The source channel.</param>
191-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
191+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
192192
/// <param name="transform">The transform function to apply the source entries before passing on to the output.</param>
193193
/// <param name="capacity">The width of the pipe: how many entries to buffer while waiting to be read from.</param>
194194
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>

Open.ChannelExtensions/Extensions.ReadConcurrently.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public static partial class Extensions
1414
/// </summary>
1515
/// <typeparam name="T">The item type.</typeparam>
1616
/// <param name="reader">The channel reader to read from.</param>
17-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
17+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
1818
/// <param name="receiver">The async receiver function.</param>
1919
/// <param name="cancellationToken">An optional cancellation token.</param>
2020
/// <returns>A task that completes when no more reading is to be done.</returns>
@@ -54,7 +54,7 @@ public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reade
5454
/// </summary>
5555
/// <typeparam name="T">The item type.</typeparam>
5656
/// <param name="reader">The channel reader to read from.</param>
57-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
57+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
5858
/// <param name="cancellationToken">The cancellation token.</param>
5959
/// <param name="receiver">The async receiver function.</param>
6060
/// <returns>A task that completes when no more reading is to be done.</returns>
@@ -70,7 +70,7 @@ public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reade
7070
/// </summary>
7171
/// <typeparam name="T">The item type.</typeparam>
7272
/// <param name="reader">The channel reader to read from.</param>
73-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
73+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
7474
/// <param name="receiver">The async receiver function.</param>
7575
/// <param name="cancellationToken">An optional cancellation token.</param>
7676
/// <returns>A task that completes when no more reading is to be done.</returns>
@@ -86,7 +86,7 @@ public static Task<long> TaskReadAllConcurrentlyAsync<T>(this ChannelReader<T> r
8686
/// <typeparam name="TWrite">The item type of the writer.</typeparam>
8787
/// <typeparam name="TRead">The item type of the reader.</typeparam>
8888
/// <param name="channel">The channel to read from.</param>
89-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
89+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
9090
/// <param name="receiver">The async receiver function.</param>
9191
/// <param name="cancellationToken">An optional cancellation token.</param>
9292
/// <returns>A task that completes when no more reading is to be done.</returns>
@@ -107,7 +107,7 @@ public static Task<long> ReadAllConcurrentlyAsync<TWrite, TRead>(this Channel<TW
107107
/// <typeparam name="TWrite">The item type of the writer.</typeparam>
108108
/// <typeparam name="TRead">The item type of the reader.</typeparam>
109109
/// <param name="channel">The channel to read from.</param>
110-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
110+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
111111
/// <param name="cancellationToken">The cancellation token.</param>
112112
/// <param name="receiver">The async receiver function.</param>
113113
/// <returns>A task that completes when no more reading is to be done.</returns>
@@ -124,7 +124,7 @@ public static Task<long> ReadAllConcurrentlyAsync<TWrite, TRead>(this Channel<TW
124124
/// <typeparam name="TWrite">The item type of the writer.</typeparam>
125125
/// <typeparam name="TRead">The item type of the reader.</typeparam>
126126
/// <param name="channel">The channel to read from.</param>
127-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
127+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
128128
/// <param name="receiver">The async receiver function.</param>
129129
/// <param name="cancellationToken">An optional cancellation token.</param>
130130
/// <returns>A task that completes when no more reading is to be done.</returns>
@@ -144,7 +144,7 @@ public static Task<long> TaskReadAllConcurrentlyAsync<TWrite, TRead>(this Channe
144144
/// </summary>
145145
/// <typeparam name="T">The item type.</typeparam>
146146
/// <param name="reader">The channel reader to read from.</param>
147-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
147+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
148148
/// <param name="receiver">The receiver function.</param>
149149
/// <param name="cancellationToken">An optional cancellation token.</param>
150150
/// <returns>A task that completes when no more reading is to be done.</returns>
@@ -165,7 +165,7 @@ public static Task<long> ReadAllConcurrently<T>(this ChannelReader<T> reader,
165165
/// </summary>
166166
/// <typeparam name="T">The item type.</typeparam>
167167
/// <param name="reader">The channel reader to read from.</param>
168-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
168+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
169169
/// <param name="cancellationToken">The cancellation token.</param>
170170
/// <param name="receiver">The receiver function.</param>
171171
/// <returns>A task that completes when no more reading is to be done.</returns>
@@ -182,7 +182,7 @@ public static Task<long> ReadAllConcurrently<T>(this ChannelReader<T> reader,
182182
/// <typeparam name="TWrite">The item type of the writer.</typeparam>
183183
/// <typeparam name="TRead">The item type of the reader.</typeparam>
184184
/// <param name="channel">The channel to read from.</param>
185-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
185+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
186186
/// <param name="receiver">The receiver function.</param>
187187
/// <param name="cancellationToken">An optional cancellation token.</param>
188188
/// <returns>A task that completes when no more reading is to be done.</returns>
@@ -203,7 +203,7 @@ public static Task<long> ReadAllConcurrently<TWrite, TRead>(this Channel<TWrite,
203203
/// <typeparam name="TWrite">The item type of the writer.</typeparam>
204204
/// <typeparam name="TRead">The item type of the reader.</typeparam>
205205
/// <param name="channel">The channel to read from.</param>
206-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
206+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
207207
/// <param name="cancellationToken">The cancellation token.</param>
208208
/// <param name="receiver">The receiver function.</param>
209209
/// <returns>A task that completes when no more reading is to be done.</returns>

Open.ChannelExtensions/Extensions.Source.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public static ChannelReader<TRead> Source<TWrite, TRead>(this Channel<TWrite, TR
152152
/// <typeparam name="TWrite">The input type of the channel.</typeparam>
153153
/// <typeparam name="TRead">The output type of the channel.</typeparam>
154154
/// <param name="target">The channel to write to.</param>
155-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
155+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
156156
/// <param name="source">The asynchronous source data to use.</param>
157157
/// <param name="cancellationToken">An optional cancellation token.</param>
158158
/// <returns>The channel reader.</returns>
@@ -175,7 +175,7 @@ public static ChannelReader<TRead> SourceAsync<TWrite, TRead>(this Channel<TWrit
175175
/// <typeparam name="TWrite">The input type of the channel.</typeparam>
176176
/// <typeparam name="TRead">The output type of the channel.</typeparam>
177177
/// <param name="target">The channel to write to.</param>
178-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
178+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
179179
/// <param name="source">The asynchronous source data to use.</param>
180180
/// <param name="cancellationToken">An optional cancellation token.</param>
181181
/// <returns>The channel reader.</returns>
@@ -201,7 +201,7 @@ public static ChannelReader<TRead> SourceAsync<TWrite, TRead>(this Channel<TWrit
201201
/// <typeparam name="TWrite">The input type of the channel.</typeparam>
202202
/// <typeparam name="TRead">The output type of the channel.</typeparam>
203203
/// <param name="target">The channel to write to.</param>
204-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
204+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
205205
/// <param name="source">The asynchronous source data to use.</param>
206206
/// <param name="cancellationToken">An optional cancellation token.</param>
207207
/// <returns>The channel reader.</returns>

0 commit comments

Comments
 (0)