Skip to content

Commit 96b3e18

Browse files
author
Oren (electricessence)
committed
Final tweaks before .NET 3
1 parent 1ae8e49 commit 96b3e18

File tree

4 files changed

+25
-34
lines changed

4 files changed

+25
-34
lines changed

Open.ChannelExtensions/Extensions.Pipe.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public static ChannelReader<TOut> PipeAsync<TWrite, TRead, TOut>(this Channel<TW
9292
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
9393
/// <param name="cancellationToken">An optional cancellation token.</param>
9494
/// <returns>The channel reader containing the output.</returns>
95-
public static ChannelReader<TOut> PipeAsync<TIn, TOut>(this ChannelReader<TIn> source,
95+
public static ChannelReader<TOut> TaskPipeAsync<TIn, TOut>(this ChannelReader<TIn> source,
9696
int maxConcurrency, Func<TIn, Task<TOut>> transform, int capacity = -1, bool singleReader = false,
9797
CancellationToken cancellationToken = default)
9898
=> source.PipeAsync(maxConcurrency, e => new ValueTask<TOut>(transform(e)), capacity, singleReader, cancellationToken);
@@ -110,10 +110,10 @@ public static ChannelReader<TOut> PipeAsync<TIn, TOut>(this ChannelReader<TIn> s
110110
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
111111
/// <param name="cancellationToken">An optional cancellation token.</param>
112112
/// <returns>The channel reader containing the output.</returns>
113-
public static ChannelReader<TOut> PipeAsync<TWrite, TRead, TOut>(this Channel<TWrite, TRead> source,
113+
public static ChannelReader<TOut> TaskPipeAsync<TWrite, TRead, TOut>(this Channel<TWrite, TRead> source,
114114
int maxConcurrency, Func<TRead, Task<TOut>> transform, int capacity = -1, bool singleReader = false,
115115
CancellationToken cancellationToken = default)
116-
=> source.Reader.PipeAsync(maxConcurrency, transform, capacity, singleReader, cancellationToken);
116+
=> source.Reader.TaskPipeAsync(maxConcurrency, transform, capacity, singleReader, cancellationToken);
117117

118118
/// <summary>
119119
/// Reads all entries and applies the values to the provided transform function before buffering the results into another channel for consumption.
@@ -194,7 +194,7 @@ public static ChannelReader<TOut> PipeAsync<TWrite, TRead, TOut>(this Channel<TW
194194
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
195195
/// <param name="cancellationToken">An optional cancellation token.</param>
196196
/// <returns>The channel reader containing the output.</returns>
197-
public static ChannelReader<TOut> PipeAsync<TIn, TOut>(this ChannelReader<TIn> source,
197+
public static ChannelReader<TOut> TaskPipeAsync<TIn, TOut>(this ChannelReader<TIn> source,
198198
Func<TIn, Task<TOut>> transform, int capacity = -1, bool singleReader = false,
199199
CancellationToken cancellationToken = default)
200200
=> source.PipeAsync(e => new ValueTask<TOut>(transform(e)), capacity, singleReader, cancellationToken);
@@ -211,10 +211,10 @@ public static ChannelReader<TOut> PipeAsync<TIn, TOut>(this ChannelReader<TIn> s
211211
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
212212
/// <param name="cancellationToken">An optional cancellation token.</param>
213213
/// <returns>The channel reader containing the output.</returns>
214-
public static ChannelReader<TOut> PipeAsync<TWrite, TRead, TOut>(this Channel<TWrite, TRead> source,
214+
public static ChannelReader<TOut> TaskPipeAsync<TWrite, TRead, TOut>(this Channel<TWrite, TRead> source,
215215
Func<TRead, Task<TOut>> transform, int capacity = -1, bool singleReader = false,
216216
CancellationToken cancellationToken = default)
217-
=> source.Reader.PipeAsync(transform, capacity, singleReader, cancellationToken);
217+
=> source.Reader.TaskPipeAsync(transform, capacity, singleReader, cancellationToken);
218218

219219
/// <summary>
220220
/// Reads all entries and applies the values to the provided transform function before buffering the results into another channel for consumption.

Open.ChannelExtensions/Extensions.ReadConcurrently.cs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,8 @@ public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reade
4040
return Task
4141
.WhenAll(readers)
4242
.ContinueWith(
43-
t =>
44-
{
45-
if (t.IsFaulted)
46-
return Task.FromException<long>(t.Exception);
47-
if (t.IsCanceled)
48-
return Task.FromCanceled<long>(cancellationToken);
49-
return Task.FromResult(t.Result.Sum());
50-
},
51-
TaskContinuationOptions.ExecuteSynchronously)
52-
.Unwrap();
43+
t => t.Result.Sum(),
44+
TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);
5345

5446
ValueTask ParallelReceiver(T item, long i) => receiver(item);
5547
}

Open.ChannelExtensions/Extensions.WriteConcurrently.cs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,14 @@ public static Task<long> WriteAllConcurrentlyAsync<T>(this ChannelWriter<T> targ
4747
.ContinueWith(t =>
4848
{
4949
if (complete)
50-
target.Complete();
50+
target.Complete(t.Exception);
5151

52-
if (t.IsFaulted)
53-
return Task.FromException<long>(t.Exception);
54-
55-
if (t.IsCanceled)
56-
return Task.FromCanceled<long>(cancellationToken);
57-
58-
return Task.FromResult(t.Result.Sum());
52+
return t;
5953
}, TaskContinuationOptions.ExecuteSynchronously)
60-
.Unwrap();
54+
.Unwrap()
55+
.ContinueWith(
56+
t => t.Result.Sum(),
57+
TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);
6158

6259
// returns false if there's no more (wasn't cancelled).
6360
async Task<long> WriteAllAsyncCore()

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,27 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
4-
<TargetFramework>netstandard2.0</TargetFramework>
4+
<TargetFrameworks>netstandard2.0</TargetFrameworks>
55
<LangVersion>latest</LangVersion>
66
<Authors>electricessence</Authors>
7-
<Description>A set of extensions for optimizing/simplifying System.Threading.Channels usage.
7+
<Description>
8+
A set of extensions for optimizing/simplifying System.Threading.Channels usage.
89

9-
Includes:
10-
ReadUntilCancelled, ReadAll, ReadAllConcurrently, WriteAll, WriteAllConcurrently, and Pipe operations.
10+
Includes:
11+
ReadUntilCancelled, ReadAll, ReadAllConcurrently, WriteAll, WriteAllConcurrently, and Pipe operations.
1112

12-
Part of the "Open" set of libraries.</Description>
13+
Part of the "Open" set of libraries.
14+
</Description>
1315
<PackageProjectUrl>https://github.com/electricessence/Open.ChannelExtensions</PackageProjectUrl>
1416
<PackageLicenseUrl></PackageLicenseUrl>
1517
<Copyright>https://github.com/electricessence/Open.ChannelExtensions/blob/master/LICENSE</Copyright>
1618
<RepositoryType>git</RepositoryType>
1719
<PackageTags>channels dotnet threading tasks extensions</PackageTags>
1820
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
19-
<Version>2.4.1</Version>
21+
<Version>2.5.0</Version>
2022
<PackageLicenseExpression>MIT</PackageLicenseExpression>
21-
<AssemblyVersion>2.4.1.0</AssemblyVersion>
22-
<FileVersion>2.4.1.0</FileVersion>
23+
<AssemblyVersion>2.5.0.0</AssemblyVersion>
24+
<FileVersion>2.5.0.0</FileVersion>
2325
<PackageReleaseNotes></PackageReleaseNotes>
2426
</PropertyGroup>
2527

0 commit comments

Comments
 (0)