Skip to content

Commit f43d7a6

Browse files
More task scheduling improvements.
1 parent 9c56376 commit f43d7a6

File tree

5 files changed

+127
-13
lines changed

5 files changed

+127
-13
lines changed

Open.ChannelExtensions.Tests/Open.ChannelExtensions.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
<TargetFramework>net9.0</TargetFramework>
66
<LangVersion>latest</LangVersion>
77
<IsPackable>false</IsPackable>
8+
<RunSettingsFilePath>D:\Users\essence\Development\- GitHub\- Open-NET-Libraries\Open.ChannelExtensions\Open.ChannelExtensions.Tests\bin\Debug\net9.0\fine-code-coverage\coverage-tool-output\Open.ChannelExtensions.Tests-fcc-mscodecoverage-generated.runsettings</RunSettingsFilePath>
89
</PropertyGroup>
910

1011
<ItemGroup>
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
namespace Open.ChannelExtensions.Tests;
2+
3+
public static class TaskSchedulerTests
4+
{
5+
[Fact]
6+
public static async Task ReadAllConcurrentlyAsync_UsesCurrentScheduler()
7+
{
8+
// Arrange
9+
var channel = Channel.CreateUnbounded<int>();
10+
var testScheduler = new TestTaskScheduler();
11+
const int maxConcurrency = 3;
12+
const int itemsToProcess = 10000;
13+
14+
// Populate the channel with test data
15+
for (int i = 0; i < itemsToProcess; i++)
16+
{
17+
await channel.Writer.WriteAsync(i);
18+
}
19+
channel.Writer.Complete();
20+
21+
// Act
22+
var itemsProcessed = await channel.Reader
23+
.ReadAllConcurrentlyAsync(
24+
maxConcurrency,
25+
testScheduler,
26+
_ =>
27+
{
28+
Assert.True(testScheduler == TaskScheduler.Current, "The custom scheduler was not used.");
29+
return ValueTask.CompletedTask;
30+
});
31+
32+
// Assert
33+
Assert.Equal(itemsToProcess, itemsProcessed);
34+
}
35+
36+
private class TestTaskScheduler : TaskScheduler
37+
{
38+
protected override IEnumerable<Task> GetScheduledTasks() => null;
39+
40+
protected override void QueueTask(Task task)
41+
=> Task.Factory.StartNew(() => TryExecuteTask(task),
42+
CancellationToken.None,
43+
TaskCreationOptions.None,
44+
TaskScheduler.Default);
45+
46+
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
47+
=> TryExecuteTask(task);
48+
}
49+
}

Open.ChannelExtensions/Extensions.Pipe.cs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,49 @@ public static async ValueTask<long> PipeTo<T>(this ChannelReader<T> source,
5454
/// <typeparam name="T">The type contained by the source channel and written to the target..</typeparam>
5555
/// <param name="source">The source channel.</param>
5656
/// <param name="target">The target channel.</param>
57+
/// <param name="scheduler">The task scheduler to use for the operation.</param>
58+
/// <param name="taskCreationOptions">The options to use for starting the task.</param>
5759
/// <param name="cancellationToken">An optional cancellation token.</param>
5860
/// <returns>The channel reader of the target.</returns>
5961
public static ChannelReader<T> PipeTo<T>(this ChannelReader<T> source,
6062
Channel<T> target,
63+
TaskScheduler scheduler,
64+
TaskCreationOptions taskCreationOptions = TaskCreationOptions.None,
6165
CancellationToken cancellationToken = default)
6266
{
6367
if (source is null) throw new ArgumentNullException(nameof(source));
6468
if (target is null) throw new ArgumentNullException(nameof(target));
6569
Contract.EndContractBlock();
6670

67-
Task.Run(() => PipeTo(source, target.Writer, true, cancellationToken));
71+
_ = Task.Factory.StartNew(
72+
() => PipeTo(source, target.Writer, true, cancellationToken),
73+
cancellationToken,
74+
taskCreationOptions,
75+
scheduler);
6876

6977
return target.Reader;
7078
}
7179

80+
/// <inheritdoc cref="PipeTo{T}(ChannelReader{T}, Channel{T}, TaskScheduler, TaskCreationOptions, CancellationToken)"/>
81+
public static ChannelReader<T> PipeTo<T>(this ChannelReader<T> source,
82+
Channel<T> target,
83+
TaskScheduler scheduler,
84+
CancellationToken cancellationToken)
85+
=> PipeTo<T>(source, target, scheduler, TaskCreationOptions.None, cancellationToken);
86+
87+
/// <inheritdoc cref="PipeTo{T}(ChannelReader{T}, Channel{T}, TaskScheduler, TaskCreationOptions, CancellationToken)"/>
88+
public static ChannelReader<T> PipeTo<T>(this ChannelReader<T> source,
89+
Channel<T> target,
90+
CancellationToken cancellationToken)
91+
=> PipeTo<T>(source, target, TaskScheduler.Current, TaskCreationOptions.None, cancellationToken);
92+
93+
/// <inheritdoc cref="PipeTo{T}(ChannelReader{T}, Channel{T}, TaskScheduler, TaskCreationOptions, CancellationToken)"/>
94+
public static ChannelReader<T> PipeTo<T>(this ChannelReader<T> source,
95+
Channel<T> target,
96+
TaskCreationOptions taskCreationOptions = TaskCreationOptions.None,
97+
CancellationToken cancellationToken = default)
98+
=> PipeTo<T>(source, target, TaskScheduler.Current, taskCreationOptions, cancellationToken);
99+
72100
/// <summary>
73101
/// Reads all entries concurrently and applies the values to the provided transform function before buffering the results into another channel for consumption.
74102
/// </summary>

Open.ChannelExtensions/Extensions.ReadConcurrently.cs

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,34 @@ public static partial class Extensions
88
/// <typeparam name="T">The item type.</typeparam>
99
/// <param name="reader">The channel reader to read from.</param>
1010
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
11+
/// <param name="taskCreationOptions">The task creation options to use.</param>
12+
/// <param name="scheduler">The task scheduler to use.</param>
1113
/// <param name="receiver">The async receiver function.</param>
12-
/// <param name="cancellationToken">An optional cancellation token.</param>
14+
/// <param name="cancellationToken">The cancellation token.</param>
1315
/// <returns>A task that completes when no more reading is to be done.</returns>
16+
public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reader,
17+
int maxConcurrency,
18+
TaskScheduler scheduler,
19+
TaskCreationOptions taskCreationOptions,
20+
Func<T, ValueTask> receiver,
21+
CancellationToken cancellationToken = default)
22+
=> Task.Factory
23+
.StartNew(
24+
() => ReadAllConcurrentlyAsync(reader, maxConcurrency, receiver, cancellationToken),
25+
cancellationToken,
26+
taskCreationOptions,
27+
scheduler)
28+
.Unwrap();
29+
30+
/// <inheritdoc cref="ReadAllConcurrentlyAsync{T}(ChannelReader{T}, int, TaskScheduler, TaskCreationOptions, Func{T, ValueTask}, CancellationToken)"/>
31+
public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reader,
32+
int maxConcurrency,
33+
TaskScheduler scheduler,
34+
Func<T, ValueTask> receiver,
35+
CancellationToken cancellationToken = default)
36+
=> ReadAllConcurrentlyAsync(reader, maxConcurrency, scheduler, TaskCreationOptions.None, receiver, cancellationToken);
37+
38+
/// <inheritdoc cref="ReadAllConcurrentlyAsync{T}(ChannelReader{T}, int, TaskScheduler, TaskCreationOptions, Func{T, ValueTask}, CancellationToken)"/>
1439
public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reader,
1540
int maxConcurrency,
1641
Func<T, ValueTask> receiver,
@@ -37,7 +62,7 @@ static async Task<long> ReadAllConcurrentlyAsyncCore(
3762
var readers = new Task<long>[maxConcurrency];
3863
var scheduler = TaskScheduler.Current;
3964
for (int r = 0; r < maxConcurrency; ++r)
40-
readers[r] = Read();
65+
readers[r] = Task.Factory.StartNew(Read, cancellationToken, TaskCreationOptions.None, scheduler).Unwrap();
4166

4267
// This produces the most accurate/reliable exception and cancellation results.
4368
return await Task
@@ -73,22 +98,33 @@ static async Task<long> SumAsync(Task<long[]> counts)
7398
}
7499
}
75100

76-
/// <summary>
77-
/// Reads items from the channel and passes them to the receiver.
78-
/// </summary>
79-
/// <typeparam name="T">The item type.</typeparam>
80-
/// <param name="reader">The channel reader to read from.</param>
81-
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
82-
/// <param name="cancellationToken">The cancellation token.</param>
83-
/// <param name="receiver">The async receiver function.</param>
84-
/// <returns>A task that completes when no more reading is to be done.</returns>
101+
/// <inheritdoc cref="ReadAllConcurrentlyAsync{T}(ChannelReader{T}, int, TaskScheduler, TaskCreationOptions, Func{T, ValueTask}, CancellationToken)"/>
85102
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
86103
public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reader,
87104
int maxConcurrency,
88105
CancellationToken cancellationToken,
89106
Func<T, ValueTask> receiver)
90107
=> ReadAllConcurrentlyAsync(reader, maxConcurrency, receiver, cancellationToken);
91108

109+
/// <inheritdoc cref="ReadAllConcurrentlyAsync{T}(ChannelReader{T}, int, TaskScheduler, TaskCreationOptions, Func{T, ValueTask}, CancellationToken)"/>
110+
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
111+
public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reader,
112+
int maxConcurrency,
113+
TaskScheduler scheduler,
114+
TaskCreationOptions taskCreationOptions,
115+
CancellationToken cancellationToken,
116+
Func<T, ValueTask> receiver)
117+
=> ReadAllConcurrentlyAsync(reader, maxConcurrency, scheduler, taskCreationOptions, receiver, cancellationToken);
118+
119+
/// <inheritdoc cref="ReadAllConcurrentlyAsync{T}(ChannelReader{T}, int, TaskScheduler, TaskCreationOptions, Func{T, ValueTask}, CancellationToken)"/>
120+
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
121+
public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reader,
122+
int maxConcurrency,
123+
TaskScheduler scheduler,
124+
CancellationToken cancellationToken,
125+
Func<T, ValueTask> receiver)
126+
=> ReadAllConcurrentlyAsync(reader, maxConcurrency, scheduler, TaskCreationOptions.None, receiver, cancellationToken);
127+
92128
/// <summary>
93129
/// Reads items from the channel and passes them to the receiver.
94130
/// </summary>

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<RepositoryType>git</RepositoryType>
2323
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2424
<GenerateDocumentationFile>true</GenerateDocumentationFile>
25-
<Version>9.0.0</Version>
25+
<Version>9.1.0</Version>
2626
<PackageReleaseNotes>Added .Merge, .PipeAsync, and .PropagateCompletion extensions.</PackageReleaseNotes>
2727
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2828
<PublishRepositoryUrl>true</PublishRepositoryUrl>

0 commit comments

Comments
 (0)