Skip to content

Commit 46a112c

Browse files
Updated for .NET Standard 2.1
1 parent 96b3e18 commit 46a112c

File tree

5 files changed

+73
-40
lines changed

5 files changed

+73
-40
lines changed

Open.ChannelExtensions/Extensions.Source.cs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -136,36 +136,34 @@ public static ChannelReader<TRead> SourceAsync<TWrite, TRead>(this Channel<TWrit
136136
/// <summary>
137137
/// Writes all entries from the source to the channel. Calls complete when finished.
138138
/// </summary>
139-
/// <typeparam name="TWrite">The input type of the channel.</typeparam>
140-
/// <typeparam name="TRead">The output type of the channel.</typeparam>
139+
/// <typeparam name="T">The output type of the channel.</typeparam>
141140
/// <param name="target">The channel to write to.</param>
142-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
143141
/// <param name="source">The source data to use.</param>
144142
/// <param name="cancellationToken">An optional cancellation token.</param>
145143
/// <returns>The channel reader.</returns>
146-
public static ChannelReader<TRead> Source<TWrite, TRead>(this Channel<TWrite, TRead> target,
147-
int maxConcurrency, IEnumerable<TWrite> source, CancellationToken cancellationToken = default)
144+
public static ChannelReader<T> Source<T>(this Channel<string, T> target,
145+
TextReader source, CancellationToken cancellationToken = default)
148146
{
149-
if (maxConcurrency == 1)
150-
return target.Source(source, cancellationToken);
151-
152-
target.Writer.WriteAllConcurrently(maxConcurrency, source, true, cancellationToken).ConfigureAwait(false);
147+
target.Writer.WriteAllLines(source, true, cancellationToken).ConfigureAwait(false);
153148
return target.Reader;
154149
}
155150

151+
#if NETSTANDARD2_1
156152
/// <summary>
157-
/// Writes all entries from the source to the channel. Calls complete when finished.
153+
/// Executes all entries from the source and passes their result to the channel. Calls complete when finished.
158154
/// </summary>
159-
/// <typeparam name="T">The output type of the channel.</typeparam>
155+
/// <typeparam name="TWrite">The input type of the channel.</typeparam>
156+
/// <typeparam name="TRead">The output type of the channel.</typeparam>
160157
/// <param name="target">The channel to write to.</param>
161-
/// <param name="source">The source data to use.</param>
158+
/// <param name="source">The asynchronous source data to use.</param>
162159
/// <param name="cancellationToken">An optional cancellation token.</param>
163160
/// <returns>The channel reader.</returns>
164-
public static ChannelReader<T> Source<T>(this Channel<string, T> target,
165-
TextReader source, CancellationToken cancellationToken = default)
161+
public static ChannelReader<TRead> Source<TWrite, TRead>(this Channel<TWrite, TRead> target,
162+
IAsyncEnumerable<TWrite> source, CancellationToken cancellationToken = default)
166163
{
167-
target.Writer.WriteAllLines(source, true, cancellationToken).ConfigureAwait(false);
164+
target.Writer.WriteAllAsync(source, true, cancellationToken).ConfigureAwait(false);
168165
return target.Reader;
169166
}
167+
#endif
170168
}
171169
}

Open.ChannelExtensions/Extensions.Write.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,5 +131,40 @@ public static async ValueTask<long> WriteAllLines(this ChannelWriter<string> tar
131131

132132
return count;
133133
}
134+
135+
#if NETSTANDARD2_1
136+
/// <summary>
137+
/// Asynchronously writes all entries from the source to the channel.
138+
/// </summary>
139+
/// <typeparam name="T">The input type of the channel.</typeparam>
140+
/// <param name="target">The channel to write to.</param>
141+
/// <param name="source">The asynchronous source data to use.</param>
142+
/// <param name="complete">If true, will call .Complete() if all the results have successfully been written (or the source is emtpy).</param>
143+
/// <param name="cancellationToken">An optional cancellation token.</param>
144+
/// <returns>A task containing the count of items written that completes when all the data has been written to the channel writer.
145+
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
146+
public static async ValueTask<long> WriteAllAsync<T>(this ChannelWriter<T> target,
147+
IAsyncEnumerable<T> source, bool complete = false, CancellationToken cancellationToken = default)
148+
{
149+
await target.WaitToWriteAndThrowIfClosedAsync(
150+
"The target channel was closed before writing could begin.",
151+
cancellationToken);
152+
153+
long count = 0;
154+
var next = new ValueTask();
155+
await foreach (var value in source)
156+
{
157+
await next.ConfigureAwait(false);
158+
count++;
159+
next = target.WriteAsync(value, cancellationToken);
160+
}
161+
await next.ConfigureAwait(false);
162+
163+
if (complete)
164+
target.Complete();
165+
166+
return count;
167+
}
168+
#endif
134169
}
135170
}

Open.ChannelExtensions/Extensions.WriteConcurrently.cs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -123,21 +123,5 @@ public static Task<long> WriteAllConcurrentlyAsync<T>(this ChannelWriter<T> targ
123123
public static Task<long> WriteAllConcurrentlyAsync<T>(this ChannelWriter<T> target,
124124
int maxConcurrency, IEnumerable<Func<T>> source, bool complete = false, CancellationToken cancellationToken = default)
125125
=> WriteAllConcurrentlyAsync(target, maxConcurrency, source.Select(e => new ValueTask<T>(e())), complete, cancellationToken);
126-
127-
/// <summary>
128-
/// Asynchronously writes all entries from the source to the channel.
129-
/// </summary>
130-
/// <typeparam name="T">The input type of the channel.</typeparam>
131-
/// <param name="target">The channel to write to.</param>
132-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
133-
/// <param name="source">The source data to use.</param>
134-
/// <param name="complete">If true, will call .Complete() if all the results have successfully been written (or the source is emtpy).</param>
135-
/// <param name="cancellationToken">An optional cancellation token.</param>
136-
/// <returns>A task containing the count of items written that completes when all the data has been written to the channel writer.
137-
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
138-
public static Task<long> WriteAllConcurrently<T>(this ChannelWriter<T> target,
139-
int maxConcurrency, IEnumerable<T> source, bool complete = false, CancellationToken cancellationToken = default)
140-
=> WriteAllConcurrentlyAsync(target, maxConcurrency, source.Select(e => new ValueTask<T>(e)), complete, cancellationToken);
141-
142126
}
143127
}

Open.ChannelExtensions/Extensions._.cs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,29 @@ public static ChannelReader<string> ToChannel(this TextReader source,
9797
/// <typeparam name="T">The input type of the channel.</typeparam>
9898
/// <param name="source">The source data to use.</param>
9999
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
100-
/// <param name="maxConcurrency">The maximum number of concurrent operations.</param>
101100
/// <param name="cancellationToken">An optional cancellation token.</param>
102101
/// <returns>The channel reader containing the results.</returns>
103102
public static ChannelReader<T> ToChannel<T>(this IEnumerable<T> source,
104-
int capacity = -1, bool singleReader = false, int maxConcurrency = 1,
103+
int capacity = -1, bool singleReader = false,
105104
CancellationToken cancellationToken = default)
106105
=> CreateChannel<T>(capacity, singleReader)
107-
.Source(maxConcurrency, source, cancellationToken);
106+
.Source(source, cancellationToken);
107+
108+
#if NETSTANDARD2_1
109+
/// <summary>
110+
/// Writes all entries from the source to a channel and calls complete when finished.
111+
/// </summary>
112+
/// <typeparam name="T">The input type of the channel.</typeparam>
113+
/// <param name="source">The source data to use.</param>
114+
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
115+
/// <param name="cancellationToken">An optional cancellation token.</param>
116+
/// <returns>The channel reader containing the results.</returns>
117+
public static ChannelReader<T> ToChannel<T>(this IAsyncEnumerable<T> source,
118+
int capacity = -1, bool singleReader = false,
119+
CancellationToken cancellationToken = default)
120+
=> CreateChannel<T>(capacity, singleReader)
121+
.Source(source, cancellationToken);
122+
#endif
108123

109124
/// <summary>
110125
/// Asynchronously executes all entries and writes their results to a channel.

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
4-
<TargetFrameworks>netstandard2.0</TargetFrameworks>
4+
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
55
<LangVersion>latest</LangVersion>
6+
<NullableReferenceTypes>true</NullableReferenceTypes>
67
<Authors>electricessence</Authors>
78
<Description>
89
A set of extensions for optimizing/simplifying System.Threading.Channels usage.
@@ -18,15 +19,15 @@
1819
<RepositoryType>git</RepositoryType>
1920
<PackageTags>channels dotnet threading tasks extensions</PackageTags>
2021
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
21-
<Version>2.5.0</Version>
22+
<Version>2.6.0</Version>
2223
<PackageLicenseExpression>MIT</PackageLicenseExpression>
23-
<AssemblyVersion>2.5.0.0</AssemblyVersion>
24-
<FileVersion>2.5.0.0</FileVersion>
25-
<PackageReleaseNotes></PackageReleaseNotes>
24+
<AssemblyVersion>2.6.0.0</AssemblyVersion>
25+
<FileVersion>2.6.0.0</FileVersion>
26+
<PackageReleaseNotes>Support for .NET Standard 2.1 and IAsyncEnumerable.</PackageReleaseNotes>
2627
</PropertyGroup>
2728

2829
<ItemGroup>
29-
<PackageReference Include="System.Threading.Channels" Version="4.5.0" />
30+
<PackageReference Include="System.Threading.Channels" Version="4.6.0" />
3031
</ItemGroup>
3132

3233
</Project>

0 commit comments

Comments
 (0)