Skip to content

Commit 2306fdb

Browse files
committed
problem: no support for radio-dish pattern
1 parent 7d44a87 commit 2306fdb

File tree

12 files changed

+1357
-2
lines changed

12 files changed

+1357
-2
lines changed

src/NetMQ.Tests/RadioDish.cs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
using System;
2+
using System.Threading;
3+
using NetMQ.Sockets;
4+
using Xunit;
5+
using Xunit.Abstractions;
6+
7+
namespace NetMQ.Tests
8+
{
9+
public class RadioDish
10+
{
11+
[Fact]
12+
public void TestTcp()
13+
{
14+
using var radio = new RadioSocket();
15+
using var dish = new DishSocket();
16+
dish.Join("2");
17+
18+
int port = radio.BindRandomPort("tcp://*");
19+
dish.Connect($"tcp://127.0.0.1:{port}");
20+
21+
Thread.Sleep(100);
22+
23+
radio.Send("1", "HELLO");
24+
radio.Send("2", "HELLO");
25+
26+
var (group,msg) = dish.ReceiveString();
27+
Assert.Equal("2", group);
28+
Assert.Equal("HELLO", msg);
29+
}
30+
31+
[Fact]
32+
public void TestBlocking()
33+
{
34+
using var radio = new RadioSocket();
35+
using var dish = new DishSocket();
36+
dish.Join("2");
37+
38+
radio.Bind("inproc://test-radio-dish");
39+
dish.Connect("inproc://test-radio-dish");
40+
41+
radio.Send("1", "HELLO");
42+
radio.Send("2", "HELLO");
43+
44+
var (group,msg) = dish.ReceiveString();
45+
Assert.Equal("2", group);
46+
Assert.Equal("HELLO", msg);
47+
}
48+
49+
[Fact]
50+
public async void TestAsync()
51+
{
52+
using var radio = new RadioSocket();
53+
using var dish = new DishSocket();
54+
dish.Join("2");
55+
56+
int port = radio.BindRandomPort("tcp://*");
57+
dish.Connect($"tcp://127.0.0.1:{port}");
58+
59+
Thread.Sleep(100);
60+
61+
await radio.SendAsync("1", "HELLO");
62+
await radio.SendAsync("2", "HELLO");
63+
64+
var (group,msg) = await dish.ReceiveStringAsync();
65+
Assert.Equal("2", group);
66+
Assert.Equal("HELLO", msg);
67+
}
68+
}
69+
}

src/NetMQ/Core/Mechanisms/Mechanism.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ static class SocketNames
3333
public const string Peer = "PEER";
3434
public const string Server = "SERVER";
3535
public const string Client = "CLIENT";
36+
public const string Radio = "RADIO";
37+
public const string Dish = "DISH";
3638
public const string Gather = "GATHER";
3739
public const string Scatter = "SCATTER";
3840
}
@@ -127,6 +129,10 @@ protected string GetSocketName(ZmqSocketType socketType)
127129
return SocketNames.Server;
128130
case ZmqSocketType.Client:
129131
return SocketNames.Client;
132+
case ZmqSocketType.Radio:
133+
return SocketNames.Radio;
134+
case ZmqSocketType.Dish:
135+
return SocketNames.Dish;
130136
case ZmqSocketType.Gather:
131137
return SocketNames.Gather;
132138
case ZmqSocketType.Scatter:
@@ -318,6 +324,10 @@ private bool CheckSocketType(string type)
318324
return type == SocketNames.Server;
319325
case ZmqSocketType.Peer:
320326
return type == SocketNames.Peer;
327+
case ZmqSocketType.Radio:
328+
return type == SocketNames.Dish;
329+
case ZmqSocketType.Dish:
330+
return type == SocketNames.Radio;
321331
case ZmqSocketType.Gather:
322332
return type == SocketNames.Scatter;
323333
case ZmqSocketType.Scatter:

src/NetMQ/Core/Patterns/Dish.cs

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
#nullable enable
2+
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Text;
6+
using NetMQ.Core.Patterns.Utils;
7+
8+
namespace NetMQ.Core.Patterns
9+
{
10+
internal class Dish : SocketBase
11+
{
12+
// Fair queueing object for inbound pipes.
13+
private readonly FairQueueing m_fairQueueing = new FairQueueing();
14+
15+
// Object for distributing the subscriptions upstream.
16+
private readonly Distribution m_distribution = new Distribution();
17+
18+
// The repository of subscriptions.
19+
private readonly HashSet<string> m_subscriptions = new HashSet<string>();
20+
21+
// If true, 'message' contains a matching message to return on the
22+
// next recv call.
23+
private bool m_hasMessage = false;
24+
private Msg m_message = new Msg();
25+
26+
public Dish(Ctx parent, int threadId, int socketId) : base(parent, threadId, socketId,true)
27+
{
28+
m_options.SocketType = ZmqSocketType.Dish;
29+
30+
// When socket is being closed down we don't want to wait till pending
31+
// subscription commands are sent to the wire.
32+
m_options.Linger = 0;
33+
34+
m_message.InitEmpty();
35+
}
36+
37+
public override void Destroy()
38+
{
39+
base.Destroy();
40+
m_message.Close();
41+
}
42+
43+
protected override void XAttachPipe(Pipe pipe, bool icanhasall)
44+
{
45+
m_fairQueueing.Attach(pipe);
46+
m_distribution.Attach(pipe);
47+
48+
// Send all the cached subscriptions to the new upstream peer.
49+
SendSubscriptions(pipe);
50+
}
51+
52+
protected override void XReadActivated(Pipe pipe)
53+
{
54+
m_fairQueueing.Activated(pipe);
55+
}
56+
57+
protected override void XWriteActivated(Pipe pipe)
58+
{
59+
m_distribution.Activated(pipe);
60+
}
61+
62+
protected override void XTerminated(Pipe pipe)
63+
{
64+
m_fairQueueing.Terminated(pipe);
65+
m_distribution.Terminated(pipe);
66+
}
67+
68+
protected override void XHiccuped(Pipe pipe)
69+
{
70+
// Send all the cached subscriptions to the hiccuped pipe.
71+
SendSubscriptions(pipe);
72+
}
73+
74+
protected override void XJoin(string @group)
75+
{
76+
if (group.Length > Msg.MaxGroupLength)
77+
throw new InvalidException("Group maximum length is 255");
78+
79+
// User cannot join same group twice
80+
if (!m_subscriptions.Add(@group))
81+
throw new InvalidException("Group was already joined");
82+
83+
Msg msg = new Msg();
84+
msg.InitJoin();
85+
msg.Group = group;
86+
87+
m_distribution.SendToAll(ref msg);
88+
msg.Close();
89+
}
90+
91+
protected override void XLeave(string @group)
92+
{
93+
if (group.Length > Msg.MaxGroupLength)
94+
throw new InvalidException("Group maximum length is 255");
95+
96+
if (!m_subscriptions.Remove(@group))
97+
throw new InvalidException("Socket didn't join group");
98+
99+
Msg msg = new Msg();
100+
msg.InitLeave();
101+
msg.Group = group;
102+
103+
m_distribution.SendToAll(ref msg);
104+
msg.Close();
105+
}
106+
107+
protected override bool XSend(ref Msg msg)
108+
{
109+
throw new NotSupportedException("XSend not supported on Dish socket");
110+
}
111+
112+
protected override bool XHasOut() => true; // Subscription can be added/removed anytime.
113+
114+
protected override bool XRecv(ref Msg msg)
115+
{
116+
// If there's already a message prepared by a previous call to poll,
117+
// return it straight ahead.
118+
if (m_hasMessage)
119+
{
120+
msg.Move(ref m_message);
121+
m_hasMessage = false;
122+
return true;
123+
}
124+
125+
return XXRecv(ref msg);
126+
}
127+
128+
bool XXRecv(ref Msg msg)
129+
{
130+
// Get a message using fair queueing algorithm.
131+
bool received = m_fairQueueing.Recv(ref msg);
132+
133+
// If there's no message available, return immediately.
134+
// The same when error occurs.
135+
if (!received)
136+
return false;
137+
138+
// Skip non matching messages
139+
while (!m_subscriptions.Contains(msg.Group))
140+
{
141+
received = m_fairQueueing.Recv(ref msg);
142+
if (!received)
143+
return false;
144+
}
145+
146+
// Found a matching message
147+
return true;
148+
}
149+
150+
protected override bool XHasIn()
151+
{
152+
// If there's already a message prepared by a previous call to zmq_poll,
153+
// return straight ahead.
154+
if (m_hasMessage)
155+
return true;
156+
157+
var received = XXRecv(ref m_message);
158+
if (!received)
159+
return false;
160+
161+
// Matching message found
162+
m_hasMessage = true;
163+
return true;
164+
}
165+
166+
private void SendSubscriptions(Pipe pipe)
167+
{
168+
foreach (var subscription in m_subscriptions)
169+
{
170+
Msg msg = new Msg();
171+
msg.InitJoin();
172+
msg.Group = subscription;
173+
174+
// Send it to the pipe.
175+
pipe.Write(ref msg);
176+
}
177+
178+
pipe.Flush();
179+
}
180+
181+
internal class DishSession : SessionBase
182+
{
183+
enum State
184+
{
185+
Group,
186+
Body
187+
}
188+
189+
private State m_state = State.Group;
190+
private string m_group = String.Empty;
191+
192+
public DishSession(IOThread ioThread, bool connect, SocketBase socket, Options options, Address addr) : base(ioThread, connect, socket, options, addr)
193+
{
194+
}
195+
196+
public override PushMsgResult PushMsg(ref Msg msg)
197+
{
198+
switch (m_state)
199+
{
200+
case State.Group:
201+
if (!msg.HasMore)
202+
return PushMsgResult.Error;
203+
204+
if (msg.Size > Msg.MaxGroupLength)
205+
return PushMsgResult.Error;
206+
207+
m_group = msg.GetString(Encoding.ASCII);
208+
m_state = State.Body;
209+
210+
msg.Close();
211+
msg.InitEmpty();
212+
213+
return PushMsgResult.Ok;
214+
case State.Body:
215+
// Set the message group
216+
msg.Group = m_group;
217+
218+
// Thread safe socket doesn't support multipart messages
219+
if (msg.HasMore)
220+
return PushMsgResult.Error;
221+
222+
// Push message to dish socket
223+
var result = base.PushMsg(ref msg);
224+
if (result == PushMsgResult.Ok)
225+
m_state = State.Group;
226+
227+
return result;
228+
default:
229+
throw new ArgumentOutOfRangeException();
230+
}
231+
}
232+
233+
public override PullMsgResult PullMsg(ref Msg msg)
234+
{
235+
var result = base.PullMsg(ref msg);
236+
if (result != PullMsgResult.Ok)
237+
return result;
238+
239+
if (!msg.IsJoin && !msg.IsLeave)
240+
return PullMsgResult.Ok;
241+
242+
Msg command = new Msg();
243+
int offset;
244+
245+
if (msg.IsJoin)
246+
{
247+
command.InitPool(msg.Group.Length + 5);
248+
offset = 5;
249+
command[0] = 4;
250+
command.Put(Encoding.ASCII, "JOIN", 1);
251+
}
252+
else
253+
{
254+
command.InitPool(msg.Group.Length + 6);
255+
offset = 6;
256+
command[0] = 5;
257+
command.Put(Encoding.ASCII, "LEAVE", 1);
258+
}
259+
260+
command.SetFlags(MsgFlags.Command);
261+
262+
// Copy the group
263+
command.Put(Encoding.ASCII, msg.Group, offset);
264+
265+
// Close the join message
266+
msg.Close();
267+
268+
msg = command;
269+
270+
return PullMsgResult.Ok;
271+
}
272+
273+
protected override void Reset()
274+
{
275+
base.Reset();
276+
m_state = State.Group;
277+
}
278+
}
279+
}
280+
}

0 commit comments

Comments
 (0)