Skip to content

Commit ca1a3a8

Browse files
authored
Merge pull request #885 from somdoron/sockets
Radio-Dish and Scatter-gather
2 parents 1e47aa0 + 2306fdb commit ca1a3a8

17 files changed

+1650
-3
lines changed

src/NetMQ.Tests/RadioDish.cs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
using System;
2+
using System.Threading;
3+
using NetMQ.Sockets;
4+
using Xunit;
5+
using Xunit.Abstractions;
6+
7+
namespace NetMQ.Tests
8+
{
9+
public class RadioDish
10+
{
11+
[Fact]
12+
public void TestTcp()
13+
{
14+
using var radio = new RadioSocket();
15+
using var dish = new DishSocket();
16+
dish.Join("2");
17+
18+
int port = radio.BindRandomPort("tcp://*");
19+
dish.Connect($"tcp://127.0.0.1:{port}");
20+
21+
Thread.Sleep(100);
22+
23+
radio.Send("1", "HELLO");
24+
radio.Send("2", "HELLO");
25+
26+
var (group,msg) = dish.ReceiveString();
27+
Assert.Equal("2", group);
28+
Assert.Equal("HELLO", msg);
29+
}
30+
31+
[Fact]
32+
public void TestBlocking()
33+
{
34+
using var radio = new RadioSocket();
35+
using var dish = new DishSocket();
36+
dish.Join("2");
37+
38+
radio.Bind("inproc://test-radio-dish");
39+
dish.Connect("inproc://test-radio-dish");
40+
41+
radio.Send("1", "HELLO");
42+
radio.Send("2", "HELLO");
43+
44+
var (group,msg) = dish.ReceiveString();
45+
Assert.Equal("2", group);
46+
Assert.Equal("HELLO", msg);
47+
}
48+
49+
[Fact]
50+
public async void TestAsync()
51+
{
52+
using var radio = new RadioSocket();
53+
using var dish = new DishSocket();
54+
dish.Join("2");
55+
56+
int port = radio.BindRandomPort("tcp://*");
57+
dish.Connect($"tcp://127.0.0.1:{port}");
58+
59+
Thread.Sleep(100);
60+
61+
await radio.SendAsync("1", "HELLO");
62+
await radio.SendAsync("2", "HELLO");
63+
64+
var (group,msg) = await dish.ReceiveStringAsync();
65+
Assert.Equal("2", group);
66+
Assert.Equal("HELLO", msg);
67+
}
68+
}
69+
}

src/NetMQ.Tests/ScatterGather.cs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
using System.Threading;
2+
using NetMQ.Sockets;
3+
using Xunit;
4+
5+
namespace NetMQ.Tests
6+
{
7+
public class ScatterGather
8+
{
9+
[Fact]
10+
public void TestTcp()
11+
{
12+
using var scatter = new ScatterSocket();
13+
using var gather = new GatherSocket();
14+
15+
int port = scatter.BindRandomPort("tcp://*");
16+
gather.Connect($"tcp://127.0.0.1:{port}");
17+
18+
scatter.Send("1");
19+
scatter.Send("2");
20+
21+
var m1 = gather.ReceiveString();
22+
Assert.Equal("1", m1);
23+
24+
var m2 = gather.ReceiveString();
25+
Assert.Equal("2", m2);
26+
}
27+
28+
[Fact]
29+
public void TestBlocking()
30+
{
31+
using var scatter = new ScatterSocket();
32+
using var gather = new GatherSocket();
33+
using var gather2 = new GatherSocket();
34+
35+
scatter.Bind("inproc://test-scatter-gather");
36+
gather.Connect("inproc://test-scatter-gather");
37+
gather2.Connect("inproc://test-scatter-gather");
38+
39+
scatter.Send("1");
40+
scatter.Send("2");
41+
42+
var m1 = gather.ReceiveString();
43+
Assert.Equal("1", m1);
44+
45+
var m2 = gather2.ReceiveString();
46+
Assert.Equal("2", m2);
47+
}
48+
49+
[Fact]
50+
public async void TestAsync()
51+
{
52+
using var scatter = new ScatterSocket();
53+
using var gather = new GatherSocket();
54+
using var gather2 = new GatherSocket();
55+
56+
scatter.Bind("inproc://test-scatter-gather");
57+
gather.Connect("inproc://test-scatter-gather");
58+
gather2.Connect("inproc://test-scatter-gather");
59+
60+
await scatter.SendAsync("1");
61+
await scatter.SendAsync("2");
62+
63+
var m1 = await gather.ReceiveStringAsync();
64+
Assert.Equal("1", m1);
65+
66+
var m2 = await gather2.ReceiveStringAsync();
67+
Assert.Equal("2", m2);
68+
}
69+
}
70+
}

src/NetMQ/Core/Mechanisms/Mechanism.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ static class SocketNames
3333
public const string Peer = "PEER";
3434
public const string Server = "SERVER";
3535
public const string Client = "CLIENT";
36+
public const string Radio = "RADIO";
37+
public const string Dish = "DISH";
38+
public const string Gather = "GATHER";
39+
public const string Scatter = "SCATTER";
3640
}
3741

3842
const int NameLengthSize = sizeof(byte);
@@ -125,6 +129,14 @@ protected string GetSocketName(ZmqSocketType socketType)
125129
return SocketNames.Server;
126130
case ZmqSocketType.Client:
127131
return SocketNames.Client;
132+
case ZmqSocketType.Radio:
133+
return SocketNames.Radio;
134+
case ZmqSocketType.Dish:
135+
return SocketNames.Dish;
136+
case ZmqSocketType.Gather:
137+
return SocketNames.Gather;
138+
case ZmqSocketType.Scatter:
139+
return SocketNames.Scatter;
128140
default:
129141
throw new ArgumentOutOfRangeException(nameof(socketType), socketType, null);
130142
}
@@ -312,6 +324,14 @@ private bool CheckSocketType(string type)
312324
return type == SocketNames.Server;
313325
case ZmqSocketType.Peer:
314326
return type == SocketNames.Peer;
327+
case ZmqSocketType.Radio:
328+
return type == SocketNames.Dish;
329+
case ZmqSocketType.Dish:
330+
return type == SocketNames.Radio;
331+
case ZmqSocketType.Gather:
332+
return type == SocketNames.Scatter;
333+
case ZmqSocketType.Scatter:
334+
return type == SocketNames.Gather;
315335
default:
316336
return false;
317337
}

0 commit comments

Comments
 (0)