Skip to content

Commit 8a7c497

Browse files
committed
add AsyncEnumerable support to thread safe sockets
1 parent 4c7b847 commit 8a7c497

File tree

5 files changed

+196
-2
lines changed

5 files changed

+196
-2
lines changed

.github/workflows/CI.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ jobs:
99
- run: dotnet restore src/NetMQ.sln
1010
- name: build
1111
run: dotnet build src/NetMQ.sln /p:Configuration=Release /verbosity:minimal
12-
- name: test
12+
- name: test netcoreapp3.1
13+
run: dotnet test -v n -p:ParallelizeTestCollections=false --configuration Release --no-build -f netcoreapp3.1 src/NetMQ.Tests/NetMQ.Tests.csproj
14+
- name: test netcoreapp2.1
1315
run: dotnet test -v n -p:ParallelizeTestCollections=false --configuration Release --no-build -f netcoreapp2.1 src/NetMQ.Tests/NetMQ.Tests.csproj
1416
windows:
1517
runs-on: windows-latest
@@ -24,6 +26,8 @@ jobs:
2426
run: dotnet build src/NetMQ.sln /p:Configuration=Release /verbosity:minimal
2527
- name: test netcoreapp2.1
2628
run: dotnet test -v n -p:ParallelizeTestCollections=false --configuration Release --no-build -f netcoreapp2.1 src\NetMQ.Tests\NetMQ.Tests.csproj
29+
- name: test netcoreapp3.1
30+
run: dotnet test -v n -p:ParallelizeTestCollections=false --configuration Release --no-build -f netcoreapp3.1 src\NetMQ.Tests\NetMQ.Tests.csproj
2731
- name: test net47
2832
run: dotnet test -v n -p:ParallelizeTestCollections=false --configuration Release --no-build -f net47 src\NetMQ.Tests\NetMQ.Tests.csproj
2933
- name: coverage

src/NetMQ.Tests/ClientServer.cs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,19 @@
44
using NetMQ;
55
using NetMQ.Sockets;
66
using Xunit;
7+
using Xunit.Abstractions;
78

89
namespace NetMQ.Tests
910
{
1011
public class ClientServer
1112
{
13+
private readonly ITestOutputHelper m_testOutputHelper;
14+
15+
public ClientServer(ITestOutputHelper testOutputHelper)
16+
{
17+
m_testOutputHelper = testOutputHelper;
18+
}
19+
1220
[Fact]
1321
public void Inproc()
1422
{
@@ -73,5 +81,83 @@ public async void AsyncWithCancellationToken()
7381

7482
await Assert.ThrowsAsync<OperationCanceledException>(async () => await server.ReceiveStringAsync(source.Token));
7583
}
84+
85+
#if NETCOREAPP3_1
86+
87+
[Fact(Timeout = 120)]
88+
public async void AsyncEnumerableCanceled()
89+
{
90+
using CancellationTokenSource source = new CancellationTokenSource();
91+
using var server = new ServerSocket();
92+
93+
source.CancelAfter(100);
94+
95+
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
96+
{
97+
await foreach (var msg in server.ReceiveStringAsyncEnumerable(source.Token))
98+
return;
99+
});
100+
}
101+
102+
[Fact(Timeout = 1000)]
103+
public void AsyncEnumerable()
104+
{
105+
using var server = new ServerSocket();
106+
int port = server.BindRandomPort("tcp://*");
107+
108+
using var client = new ClientSocket();
109+
client.Connect($"tcp://127.0.0.1:{port}");
110+
111+
int totalCount = 0;
112+
113+
var t1 = Task.Run(async () =>
114+
{
115+
int count = 0;
116+
117+
await foreach (var (_, msg) in server.ReceiveStringAsyncEnumerable())
118+
{
119+
count++;
120+
Interlocked.Increment(ref totalCount);
121+
122+
if (msg == "1")
123+
{
124+
m_testOutputHelper.WriteLine($"T1 read {count} messages");
125+
return;
126+
}
127+
}
128+
});
129+
130+
var t2 = Task.Run(async () => {
131+
int count = 0;
132+
133+
await foreach (var (_, msg) in server.ReceiveStringAsyncEnumerable())
134+
{
135+
count++;
136+
Interlocked.Increment(ref totalCount);
137+
138+
if (msg == "1")
139+
{
140+
m_testOutputHelper.WriteLine($"T2 read {count} messages");
141+
return;
142+
}
143+
}
144+
});
145+
146+
for (int i = 0; i < 15000; i++)
147+
{
148+
client.Send("0");
149+
}
150+
151+
// Send the end message to both of the threads
152+
client.Send("1");
153+
client.Send("1");
154+
155+
t1.Wait();
156+
t2.Wait();
157+
158+
Assert.Equal(15002, totalCount);
159+
}
160+
161+
#endif
76162
}
77163
}

src/NetMQ.Tests/NetMQ.Tests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
99
<PackageTargetFallback Condition=" '$(TargetFramework)' == 'netcoreapp1.0' ">$(PackageTargetFallback);netcoreapp1.0;portable-net45+win8</PackageTargetFallback>
1010
<IsTestProject>true</IsTestProject>
11-
<TargetFrameworks>netcoreapp2.1;net47</TargetFrameworks>
11+
<TargetFrameworks>netcoreapp3.1;netcoreapp2.1;net47</TargetFrameworks>
1212
<LangVersion>8</LangVersion>
1313
</PropertyGroup>
1414

src/NetMQ/ReceiveThreadSafeSocketExtensions.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#nullable enable
22

33
using System;
4+
using System.Collections.Generic;
45
using System.Diagnostics.CodeAnalysis;
6+
using System.Runtime.CompilerServices;
57
using System.Text;
68
using System.Threading;
79
using System.Threading.Tasks;
@@ -117,6 +119,31 @@ public static ValueTask<byte[]> ReceiveBytesAsync(this IThreadSafeInSocket socke
117119

118120
#endregion
119121

122+
#region AsyncEnumerable
123+
124+
#if NETSTANDARD2_1
125+
126+
/// <summary>
127+
/// Provides a consuming IAsyncEnumerable for receiving messages from the socket.
128+
/// </summary>
129+
/// <param name="socket">The socket to receive from.</param>
130+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
131+
/// <returns>An IAsyncEnumerable that receive and returns messages from the socket.</returns>
132+
/// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
133+
public static async IAsyncEnumerable<byte[]> ReceiveBytesAsyncEnumerable(
134+
this IThreadSafeInSocket socket,
135+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
136+
{
137+
while (true)
138+
{
139+
yield return await socket.ReceiveBytesAsync(cancellationToken);
140+
}
141+
}
142+
143+
#endif
144+
145+
#endregion
146+
120147
#endregion
121148

122149
#region Receiving string
@@ -273,6 +300,31 @@ public static ValueTask<string> ReceiveStringAsync(this IThreadSafeInSocket sock
273300
cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
274301
}
275302

303+
#endregion
304+
305+
#region AsyncEnumerable
306+
307+
#if NETSTANDARD2_1
308+
309+
/// <summary>
310+
/// Provides a consuming IAsyncEnumerable for receiving messages from the socket.
311+
/// </summary>
312+
/// <param name="socket">The socket to receive from.</param>
313+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
314+
/// <returns>An IAsyncEnumerable that receive and returns messages from the socket.</returns>
315+
/// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
316+
public static async IAsyncEnumerable<string> ReceiveStringAsyncEnumerable(
317+
this IThreadSafeInSocket socket,
318+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
319+
{
320+
while (true)
321+
{
322+
yield return await socket.ReceiveStringAsync(cancellationToken);
323+
}
324+
}
325+
326+
#endif
327+
276328
#endregion
277329

278330
#endregion

src/NetMQ/RoutingIdSocketExtensions.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#nullable enable
22

33
using System;
4+
using System.Collections.Generic;
45
using System.Diagnostics.CodeAnalysis;
6+
using System.Runtime.CompilerServices;
57
using System.Text;
68
using System.Threading;
79
using System.Threading.Tasks;
@@ -368,6 +370,31 @@ public static bool TryReceiveBytes(this IRoutingIdSocket socket, TimeSpan timeou
368370
cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
369371
}
370372

373+
#endregion
374+
375+
#region AsyncEnumerable
376+
377+
#if NETSTANDARD2_1
378+
379+
/// <summary>
380+
/// Provides a consuming IAsyncEnumerable for receiving messages from the socket.
381+
/// </summary>
382+
/// <param name="socket">The socket to receive from.</param>
383+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
384+
/// <returns>An IAsyncEnumerable that receive and returns messages from the socket.</returns>
385+
/// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
386+
public static async IAsyncEnumerable<(uint, byte[])> ReceiveBytesAsyncEnumerable(
387+
this IRoutingIdSocket socket,
388+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
389+
{
390+
while (true)
391+
{
392+
yield return await socket.ReceiveBytesAsync(cancellationToken);
393+
}
394+
}
395+
396+
#endif
397+
371398
#endregion
372399

373400
#endregion
@@ -535,6 +562,31 @@ public static bool TryReceiveString(this IRoutingIdSocket socket, TimeSpan timeo
535562
cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
536563
}
537564

565+
#endregion
566+
567+
#region AsyncEnumerable
568+
569+
#if NETSTANDARD2_1
570+
571+
/// <summary>
572+
/// Provides a consuming IAsyncEnumerable for receiving messages from the socket.
573+
/// </summary>
574+
/// <param name="socket">The socket to receive from.</param>
575+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
576+
/// <returns>An IAsyncEnumerable that receive and returns messages from the socket.</returns>
577+
/// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
578+
public static async IAsyncEnumerable<(uint, string)> ReceiveStringAsyncEnumerable(
579+
this IRoutingIdSocket socket,
580+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
581+
{
582+
while (true)
583+
{
584+
yield return await socket.ReceiveStringAsync(cancellationToken);
585+
}
586+
}
587+
588+
#endif
589+
538590
#endregion
539591

540592
#endregion

0 commit comments

Comments
 (0)