Skip to content

Commit 1e47aa0

Browse files
authored
Merge pull request #884 from somdoron/thread_safe
add AsyncEnumerable support to thread safe sockets
2 parents 4c7b847 + a7d7421 commit 1e47aa0

File tree

6 files changed

+267
-70
lines changed

6 files changed

+267
-70
lines changed

.github/workflows/CI.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ jobs:
99
- run: dotnet restore src/NetMQ.sln
1010
- name: build
1111
run: dotnet build src/NetMQ.sln /p:Configuration=Release /verbosity:minimal
12-
- name: test
12+
- name: test netcoreapp3.1
13+
run: dotnet test -v n -p:ParallelizeTestCollections=false --configuration Release --no-build -f netcoreapp3.1 src/NetMQ.Tests/NetMQ.Tests.csproj
14+
- name: test netcoreapp2.1
1315
run: dotnet test -v n -p:ParallelizeTestCollections=false --configuration Release --no-build -f netcoreapp2.1 src/NetMQ.Tests/NetMQ.Tests.csproj
1416
windows:
1517
runs-on: windows-latest
@@ -24,6 +26,8 @@ jobs:
2426
run: dotnet build src/NetMQ.sln /p:Configuration=Release /verbosity:minimal
2527
- name: test netcoreapp2.1
2628
run: dotnet test -v n -p:ParallelizeTestCollections=false --configuration Release --no-build -f netcoreapp2.1 src\NetMQ.Tests\NetMQ.Tests.csproj
29+
- name: test netcoreapp3.1
30+
run: dotnet test -v n -p:ParallelizeTestCollections=false --configuration Release --no-build -f netcoreapp3.1 src\NetMQ.Tests\NetMQ.Tests.csproj
2731
- name: test net47
2832
run: dotnet test -v n -p:ParallelizeTestCollections=false --configuration Release --no-build -f net47 src\NetMQ.Tests\NetMQ.Tests.csproj
2933
- name: coverage

src/NetMQ.Tests/ClientServer.cs

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,19 @@
44
using NetMQ;
55
using NetMQ.Sockets;
66
using Xunit;
7+
using Xunit.Abstractions;
78

89
namespace NetMQ.Tests
910
{
1011
public class ClientServer
1112
{
13+
private readonly ITestOutputHelper m_testOutputHelper;
14+
15+
public ClientServer(ITestOutputHelper testOutputHelper)
16+
{
17+
m_testOutputHelper = testOutputHelper;
18+
}
19+
1220
[Fact]
1321
public void Inproc()
1422
{
@@ -71,7 +79,85 @@ public async void AsyncWithCancellationToken()
7179

7280
source.CancelAfter(100);
7381

74-
await Assert.ThrowsAsync<OperationCanceledException>(async () => await server.ReceiveStringAsync(source.Token));
82+
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await server.ReceiveStringAsync(source.Token));
83+
}
84+
85+
#if NETCOREAPP3_1
86+
87+
[Fact(Timeout = 120)]
88+
public async void AsyncEnumerableCanceled()
89+
{
90+
using CancellationTokenSource source = new CancellationTokenSource();
91+
using var server = new ServerSocket();
92+
93+
source.CancelAfter(100);
94+
95+
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
96+
{
97+
await foreach (var msg in server.ReceiveStringAsyncEnumerable(source.Token))
98+
return;
99+
});
100+
}
101+
102+
[Fact(Timeout = 1000)]
103+
public void AsyncEnumerable()
104+
{
105+
using var server = new ServerSocket();
106+
int port = server.BindRandomPort("tcp://*");
107+
108+
using var client = new ClientSocket();
109+
client.Connect($"tcp://127.0.0.1:{port}");
110+
111+
int totalCount = 0;
112+
113+
var t1 = Task.Run(async () =>
114+
{
115+
int count = 0;
116+
117+
await foreach (var (_, msg) in server.ReceiveStringAsyncEnumerable())
118+
{
119+
count++;
120+
Interlocked.Increment(ref totalCount);
121+
122+
if (msg == "1")
123+
{
124+
m_testOutputHelper.WriteLine($"T1 read {count} messages");
125+
return;
126+
}
127+
}
128+
});
129+
130+
var t2 = Task.Run(async () => {
131+
int count = 0;
132+
133+
await foreach (var (_, msg) in server.ReceiveStringAsyncEnumerable())
134+
{
135+
count++;
136+
Interlocked.Increment(ref totalCount);
137+
138+
if (msg == "1")
139+
{
140+
m_testOutputHelper.WriteLine($"T2 read {count} messages");
141+
return;
142+
}
143+
}
144+
});
145+
146+
for (int i = 0; i < 15000; i++)
147+
{
148+
client.Send("0");
149+
}
150+
151+
// Send the end message to both of the threads
152+
client.Send("1");
153+
client.Send("1");
154+
155+
t1.Wait();
156+
t2.Wait();
157+
158+
Assert.Equal(15002, totalCount);
75159
}
160+
161+
#endif
76162
}
77163
}

src/NetMQ.Tests/NetMQ.Tests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
99
<PackageTargetFallback Condition=" '$(TargetFramework)' == 'netcoreapp1.0' ">$(PackageTargetFallback);netcoreapp1.0;portable-net45+win8</PackageTargetFallback>
1010
<IsTestProject>true</IsTestProject>
11-
<TargetFrameworks>netcoreapp2.1;net47</TargetFrameworks>
11+
<TargetFrameworks>netcoreapp3.1;netcoreapp2.1;net47</TargetFrameworks>
1212
<LangVersion>8</LangVersion>
1313
</PropertyGroup>
1414

src/NetMQ.Tests/ZMTPTests.cs

Lines changed: 70 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -23,52 +23,49 @@ public class ZMTPTests : IClassFixture<CleanupAfterFixture>
2323
{
2424
public ZMTPTests() => NetMQConfig.Cleanup();
2525

26+
private byte[] ReadRawXBytes(Socket raw, int toRead)
27+
{
28+
var bytes = new byte[toRead];
29+
int read = raw.Receive(bytes);
30+
31+
while (read < toRead)
32+
read += raw.Receive(bytes, read, toRead - read, SocketFlags.None);
33+
34+
return bytes;
35+
}
36+
2637
[Fact]
2738
public void V2Test()
2839
{
29-
using (var raw = new StreamSocket())
30-
using (var socket = new DealerSocket())
31-
{
32-
int port = raw.BindRandomPort("tcp://*");
33-
socket.Connect($"tcp://localhost:{port}");
40+
using var raw = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
41+
using var socket = new DealerSocket();
3442

35-
var routingId = raw.ReceiveFrameBytes();
36-
var preamble = raw.ReceiveFrameBytes();
37-
Assert.Equal(10, preamble.Length);
43+
int port = socket.BindRandomPort("tcp://*");
44+
raw.Connect("127.0.0.1", port);
3845

39-
raw.SendMoreFrame(routingId).SendFrame(new byte[] {0xff,0,0,0,0,0,0,0,0,0x7f,1,5}); // Signature
40-
raw.SendMoreFrame(routingId).SendFrame(new byte[] {0, 0}); // Empty Identity
41-
raw.SendMoreFrame(routingId).SendFrame(new byte[] {0, 1, 5}); // One byte message
42-
43-
// Receive rest of the greeting
44-
raw.SkipFrame(); // RoutingId
45-
var signature = raw.ReceiveFrameBytes();
46-
47-
if (signature.Length == 2)
48-
Assert.Equal(new byte[] {3, 5}, signature);
49-
else if (signature.Length == 1)
50-
{
51-
// Receive rest of the greeting
52-
raw.SkipFrame(); // RoutingId
53-
signature = raw.ReceiveFrameBytes();
54-
Assert.Equal(new byte[] {5}, signature);
55-
}
56-
57-
// Receive the identity
58-
raw.SkipFrame(); // RoutingId
59-
var identity = raw.ReceiveFrameBytes();
60-
Assert.Equal(new byte[] {0, 0}, identity);
61-
62-
// Receiving msg send by the raw
63-
var msg = socket.ReceiveFrameBytes();
64-
Assert.Equal(new byte[] {5}, msg);
65-
66-
// Sending msg from socket to raw
67-
socket.SendFrame(new byte[]{6});
68-
raw.SkipFrame(); // RoutingId
69-
msg = raw.ReceiveFrameBytes();
70-
Assert.Equal(new byte[]{0,1,6}, msg);
71-
}
46+
// preamble
47+
ReadRawXBytes(raw, 10);
48+
49+
raw.Send(new byte[] { 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 1, 5 }); // Signature
50+
raw.Send(new byte[] { 0, 0 }); // Empty Identity
51+
raw.Send(new byte[] { 0, 1, 5 }); // One byte message
52+
53+
// Receive rest of the greeting
54+
var signature = ReadRawXBytes(raw, 2);
55+
Assert.Equal(new byte[] { 3, 5 }, signature);
56+
57+
// Receive the identity
58+
var identity = ReadRawXBytes(raw, 2);
59+
Assert.Equal(new byte[] { 0, 0 }, identity);
60+
61+
// Receiving msg send by the raw
62+
var msg = socket.ReceiveFrameBytes();
63+
Assert.Equal(new byte[] { 5 }, msg);
64+
65+
// Sending msg from socket to raw
66+
socket.SendFrame(new byte[] { 6 });
67+
msg = ReadRawXBytes(raw, 3);
68+
Assert.Equal(new byte[] { 0, 1, 6 }, msg);
7269
}
7370

7471
[Fact]
@@ -79,14 +76,14 @@ public void HeartbeatEnabled()
7976
{
8077
sub.Options.HeartbeatInterval = TimeSpan.FromMilliseconds(10);
8178
sub.Options.HeartbeatTimeout = TimeSpan.FromMilliseconds(1);
82-
79+
8380
int port = pub.BindRandomPort("tcp://*");
8481
sub.Connect($"tcp://localhost:{port}");
85-
82+
8683
Thread.Sleep(3000);
8784
}
8885
}
89-
86+
9087
[Fact]
9188
public void V3Test()
9289
{
@@ -104,56 +101,62 @@ public void V3Test()
104101
raw.SendMoreFrame(routingId).SendFrame(
105102
new byte[64]
106103
{
107-
0xff,0,0,0,0,0,0,0,0,0x7f,3,0, (byte)'N', (byte)'U', (byte)'L', (byte)'L',
108-
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
104+
0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, (byte) 'N', (byte) 'U', (byte) 'L', (byte) 'L',
105+
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
106+
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
109107
}); // V3 Greeting
110108
raw.SendMoreFrame(routingId).SendFrame(
111109
new byte[43]
112110
{
113111
4, 41,
114-
5, (byte)'R', (byte)'E',(byte)'A',(byte)'D',(byte)'Y',
115-
11, (byte)'S',(byte)'o',(byte)'c',(byte)'k',(byte)'e',(byte)'t',(byte)'-',(byte)'T',(byte)'y',(byte)'p',(byte)'e',
116-
0,0,0,6,(byte)'D',(byte)'E',(byte)'A',(byte)'L',(byte)'E',(byte)'R',
117-
8,(byte)'I',(byte)'d',(byte)'e',(byte)'n',(byte)'t',(byte)'i',(byte)'t',(byte)'y',
118-
0,0,0,0
112+
5, (byte) 'R', (byte) 'E', (byte) 'A', (byte) 'D', (byte) 'Y',
113+
11, (byte) 'S', (byte) 'o', (byte) 'c', (byte) 'k', (byte) 'e', (byte) 't', (byte) '-',
114+
(byte) 'T', (byte) 'y', (byte) 'p', (byte) 'e',
115+
0, 0, 0, 6, (byte) 'D', (byte) 'E', (byte) 'A', (byte) 'L', (byte) 'E', (byte) 'R',
116+
8, (byte) 'I', (byte) 'd', (byte) 'e', (byte) 'n', (byte) 't', (byte) 'i', (byte) 't',
117+
(byte) 'y',
118+
0, 0, 0, 0
119119
}); // Ready Command
120-
120+
121121
int read = 0;
122122
while (read < 64 + 43 - 10)
123123
{
124124
raw.SkipFrame(); // RoutingId
125125
var bytes = raw.ReceiveFrameBytes();
126126
read += bytes.Length;
127127
}
128-
129-
raw.SendMoreFrame(routingId).SendFrame(new byte[] {0, 1, 5}); // One byte message
130-
128+
129+
raw.SendMoreFrame(routingId).SendFrame(new byte[] { 0, 1, 5 }); // One byte message
130+
131131
// Receiving msg send by the raw
132132
var msg = socket.ReceiveFrameBytes();
133-
Assert.Equal(new byte[] {5}, msg);
134-
133+
Assert.Equal(new byte[] { 5 }, msg);
134+
135135
// Sending msg from socket to raw
136-
socket.SendFrame(new byte[]{6});
136+
socket.SendFrame(new byte[] { 6 });
137137
raw.SkipFrame(); // RoutingId
138138
msg = raw.ReceiveFrameBytes();
139-
Assert.Equal(new byte[]{0,1,6}, msg);
140-
139+
Assert.Equal(new byte[] { 0, 1, 6 }, msg);
140+
141141
// Sending ping message
142-
raw.SendMoreFrame(routingId).SendFrame(new byte[11] {4, 9, 4, (byte)'P', (byte)'I', (byte)'N', (byte)'G', 0, 0,(byte)'H', (byte)'I'});
143-
142+
raw.SendMoreFrame(routingId).SendFrame(new byte[11]
143+
{ 4, 9, 4, (byte) 'P', (byte) 'I', (byte) 'N', (byte) 'G', 0, 0, (byte) 'H', (byte) 'I' });
144+
144145
// Receive pong
145146
raw.SkipFrame(); // RoutingId
146147
var ping = raw.ReceiveFrameBytes();
147-
Assert.Equal(new byte[9] {4,7,4,(byte)'P', (byte)'O', (byte)'N', (byte)'G', (byte)'H', (byte)'I'}, ping);
148-
148+
Assert.Equal(
149+
new byte[9] { 4, 7, 4, (byte) 'P', (byte) 'O', (byte) 'N', (byte) 'G', (byte) 'H', (byte) 'I' },
150+
ping);
151+
149152
// We should receive ping now
150153
raw.SkipFrame();
151154
ping = raw.ReceiveFrameBytes();
152-
Assert.Equal(new byte[9] {4,7,4,(byte)'P', (byte)'I', (byte)'N', (byte)'G', 0, 0}, ping);
155+
Assert.Equal(new byte[9] { 4, 7, 4, (byte) 'P', (byte) 'I', (byte) 'N', (byte) 'G', 0, 0 }, ping);
153156
}
154157
}
155158

156-
#if NET47
159+
#if NET47
157160
[Fact]
158161
public void WithLibzmq()
159162
{
@@ -174,6 +177,6 @@ public void WithLibzmq()
174177
}
175178
}
176179

177-
#endif
180+
#endif
178181
}
179182
}

0 commit comments

Comments
 (0)