Skip to content

Commit 7d44a87

Browse files
committed
problem: NetMQ doesn't support Scatter-Gather pattern
1 parent 1e47aa0 commit 7d44a87

File tree

9 files changed

+293
-1
lines changed

9 files changed

+293
-1
lines changed

src/NetMQ.Tests/ScatterGather.cs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
using System.Threading;
2+
using NetMQ.Sockets;
3+
using Xunit;
4+
5+
namespace NetMQ.Tests
6+
{
7+
public class ScatterGather
8+
{
9+
[Fact]
10+
public void TestTcp()
11+
{
12+
using var scatter = new ScatterSocket();
13+
using var gather = new GatherSocket();
14+
15+
int port = scatter.BindRandomPort("tcp://*");
16+
gather.Connect($"tcp://127.0.0.1:{port}");
17+
18+
scatter.Send("1");
19+
scatter.Send("2");
20+
21+
var m1 = gather.ReceiveString();
22+
Assert.Equal("1", m1);
23+
24+
var m2 = gather.ReceiveString();
25+
Assert.Equal("2", m2);
26+
}
27+
28+
[Fact]
29+
public void TestBlocking()
30+
{
31+
using var scatter = new ScatterSocket();
32+
using var gather = new GatherSocket();
33+
using var gather2 = new GatherSocket();
34+
35+
scatter.Bind("inproc://test-scatter-gather");
36+
gather.Connect("inproc://test-scatter-gather");
37+
gather2.Connect("inproc://test-scatter-gather");
38+
39+
scatter.Send("1");
40+
scatter.Send("2");
41+
42+
var m1 = gather.ReceiveString();
43+
Assert.Equal("1", m1);
44+
45+
var m2 = gather2.ReceiveString();
46+
Assert.Equal("2", m2);
47+
}
48+
49+
[Fact]
50+
public async void TestAsync()
51+
{
52+
using var scatter = new ScatterSocket();
53+
using var gather = new GatherSocket();
54+
using var gather2 = new GatherSocket();
55+
56+
scatter.Bind("inproc://test-scatter-gather");
57+
gather.Connect("inproc://test-scatter-gather");
58+
gather2.Connect("inproc://test-scatter-gather");
59+
60+
await scatter.SendAsync("1");
61+
await scatter.SendAsync("2");
62+
63+
var m1 = await gather.ReceiveStringAsync();
64+
Assert.Equal("1", m1);
65+
66+
var m2 = await gather2.ReceiveStringAsync();
67+
Assert.Equal("2", m2);
68+
}
69+
}
70+
}

src/NetMQ/Core/Mechanisms/Mechanism.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ static class SocketNames
3333
public const string Peer = "PEER";
3434
public const string Server = "SERVER";
3535
public const string Client = "CLIENT";
36+
public const string Gather = "GATHER";
37+
public const string Scatter = "SCATTER";
3638
}
3739

3840
const int NameLengthSize = sizeof(byte);
@@ -125,6 +127,10 @@ protected string GetSocketName(ZmqSocketType socketType)
125127
return SocketNames.Server;
126128
case ZmqSocketType.Client:
127129
return SocketNames.Client;
130+
case ZmqSocketType.Gather:
131+
return SocketNames.Gather;
132+
case ZmqSocketType.Scatter:
133+
return SocketNames.Scatter;
128134
default:
129135
throw new ArgumentOutOfRangeException(nameof(socketType), socketType, null);
130136
}
@@ -312,6 +318,10 @@ private bool CheckSocketType(string type)
312318
return type == SocketNames.Server;
313319
case ZmqSocketType.Peer:
314320
return type == SocketNames.Peer;
321+
case ZmqSocketType.Gather:
322+
return type == SocketNames.Scatter;
323+
case ZmqSocketType.Scatter:
324+
return type == SocketNames.Gather;
315325
default:
316326
return false;
317327
}

src/NetMQ/Core/Patterns/Gather.cs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#nullable enable
2+
3+
using System.Diagnostics;
4+
using NetMQ.Core.Patterns.Utils;
5+
6+
namespace NetMQ.Core.Patterns
7+
{
8+
internal sealed class Gather : SocketBase
9+
{
10+
/// <summary>
11+
/// Fair queueing object for inbound pipes.
12+
/// </summary>
13+
private readonly FairQueueing m_fairQueueing;
14+
15+
public Gather(Ctx parent, int threadId, int socketId)
16+
: base(parent, threadId, socketId)
17+
{
18+
m_options.SocketType = ZmqSocketType.Gather;
19+
20+
m_fairQueueing = new FairQueueing();
21+
}
22+
23+
/// <summary>
24+
/// Register the pipe with this socket.
25+
/// </summary>
26+
/// <param name="pipe">the Pipe to attach</param>
27+
/// <param name="icanhasall">not used</param>
28+
protected override void XAttachPipe(Pipe pipe, bool icanhasall)
29+
{
30+
Debug.Assert(pipe != null);
31+
m_fairQueueing.Attach(pipe);
32+
}
33+
34+
/// <summary>
35+
/// Indicate the given pipe as being ready for reading by this socket.
36+
/// </summary>
37+
/// <param name="pipe">the <c>Pipe</c> that is now becoming available for reading</param>
38+
protected override void XReadActivated(Pipe pipe)
39+
{
40+
m_fairQueueing.Activated(pipe);
41+
}
42+
43+
/// <summary>
44+
/// This is an override of the abstract method that gets called to signal that the given pipe is to be removed from this socket.
45+
/// </summary>
46+
/// <param name="pipe">the Pipe that is being removed</param>
47+
protected override void XTerminated(Pipe pipe)
48+
{
49+
m_fairQueueing.Terminated(pipe);
50+
}
51+
52+
/// <summary>
53+
/// Receive a message. The <c>Recv</c> method calls this lower-level method to do the actual receiving.
54+
/// </summary>
55+
/// <param name="msg">the <c>Msg</c> to receive the message into</param>
56+
/// <returns><c>true</c> if the message was received successfully, <c>false</c> if there were no messages to receive</returns>
57+
protected override bool XRecv(ref Msg msg)
58+
{
59+
bool received = m_fairQueueing.Recv(ref msg);
60+
61+
// Drop any messages with more flag
62+
while (received && msg.HasMore)
63+
{
64+
// drop all frames of the current multi-frame message
65+
received = m_fairQueueing.Recv(ref msg);
66+
67+
while (received && msg.HasMore)
68+
received = m_fairQueueing.Recv(ref msg);
69+
70+
// get the new message
71+
if (received)
72+
received = m_fairQueueing.Recv(ref msg);
73+
}
74+
75+
if (!received)
76+
return false;
77+
78+
return true;
79+
}
80+
81+
protected override bool XHasIn()
82+
{
83+
return m_fairQueueing.HasIn();
84+
}
85+
}
86+
}

src/NetMQ/Core/Patterns/Scatter.cs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#nullable enable
2+
3+
using System.Diagnostics;
4+
using NetMQ.Core.Patterns.Utils;
5+
6+
namespace NetMQ.Core.Patterns
7+
{
8+
internal sealed class Scatter : SocketBase
9+
{
10+
/// <summary>
11+
/// Load balancer managing the outbound pipes.
12+
/// </summary>
13+
private readonly LoadBalancer m_loadBalancer;
14+
15+
public Scatter(Ctx parent, int threadId, int socketId)
16+
: base(parent, threadId, socketId, true)
17+
{
18+
m_options.SocketType = ZmqSocketType.Scatter;
19+
20+
m_loadBalancer = new LoadBalancer();
21+
}
22+
23+
/// <summary>
24+
/// Register the pipe with this socket.
25+
/// </summary>
26+
/// <param name="pipe">the Pipe to attach</param>
27+
/// <param name="icanhasall">not used</param>
28+
protected override void XAttachPipe(Pipe pipe, bool icanhasall)
29+
{
30+
Debug.Assert(pipe != null);
31+
32+
// Don't delay pipe termination as there is no one
33+
// to receive the delimiter.
34+
pipe.SetNoDelay();
35+
36+
m_loadBalancer.Attach(pipe);
37+
}
38+
39+
/// <summary>
40+
/// Indicate the given pipe as being ready for writing to by this socket.
41+
/// This gets called by the WriteActivated method.
42+
/// </summary>
43+
/// <param name="pipe">the <c>Pipe</c> that is now becoming available for writing</param>
44+
protected override void XWriteActivated(Pipe pipe)
45+
{
46+
m_loadBalancer.Activated(pipe);
47+
}
48+
49+
/// <summary>
50+
/// This is an override of the abstract method that gets called to signal that the given pipe is to be removed from this socket.
51+
/// </summary>
52+
/// <param name="pipe">the Pipe that is being removed</param>
53+
protected override void XTerminated(Pipe pipe)
54+
{
55+
m_loadBalancer.Terminated(pipe);
56+
}
57+
58+
/// <summary>
59+
/// Transmit the given message. The <c>Send</c> method calls this to do the actual sending.
60+
/// </summary>
61+
/// <param name="msg">the message to transmit</param>
62+
/// <returns><c>true</c> if the message was sent successfully</returns>
63+
protected override bool XSend(ref Msg msg)
64+
{
65+
if (msg.HasMore)
66+
throw new InvalidException("SCATTER sockets do not allow multipart data (ZMQ_SNDMORE)");
67+
68+
return m_loadBalancer.Send(ref msg);
69+
}
70+
71+
protected override bool XHasOut()
72+
{
73+
return m_loadBalancer.HasOut();
74+
}
75+
}
76+
}

src/NetMQ/Core/SessionBase.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ public static SessionBase Create([NotNull] IOThread ioThread, bool connect, [Not
142142
case ZmqSocketType.Stream:
143143
case ZmqSocketType.Peer:
144144
case ZmqSocketType.Server:
145-
case ZmqSocketType.Client:
145+
case ZmqSocketType.Client:
146+
case ZmqSocketType.Gather:
147+
case ZmqSocketType.Scatter:
146148
if (options.CanSendHelloMsg && options.HelloMsg != null)
147149
return new HelloMsgSession(ioThread, connect, socket, options, addr);
148150
else

src/NetMQ/Core/SocketBase.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,10 @@ public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int th
211211
return new Server(parent, threadId, socketId);
212212
case ZmqSocketType.Client:
213213
return new Client(parent, threadId, socketId);
214+
case ZmqSocketType.Gather:
215+
return new Gather(parent, threadId, socketId);
216+
case ZmqSocketType.Scatter:
217+
return new Scatter(parent, threadId, socketId);
214218
default:
215219
throw new InvalidException("SocketBase.Create called with invalid type of " + type);
216220
}

src/NetMQ/Sockets/GatherSocket.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#nullable enable
2+
3+
namespace NetMQ.Sockets
4+
{
5+
/// <summary>
6+
/// Gather socket, thread-safe alternative for Pull socket
7+
/// </summary>
8+
public class GatherSocket : ThreadSafeSocket, IThreadSafeInSocket
9+
{
10+
/// <summary>
11+
/// Create a new Gather Socket.
12+
/// </summary>
13+
public GatherSocket() : base(ZmqSocketType.Gather)
14+
{
15+
}
16+
}
17+
}

src/NetMQ/Sockets/ScatterSocket.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#nullable enable
2+
3+
namespace NetMQ.Sockets
4+
{
5+
/// <summary>
6+
/// Scatter socket, thread-safe alternative for Push socket
7+
/// </summary>
8+
public class ScatterSocket : ThreadSafeSocket, IThreadSafeOutSocket
9+
{
10+
/// <summary>
11+
/// Create a new Scatter Socket.
12+
/// </summary>
13+
public ScatterSocket() : base(ZmqSocketType.Scatter)
14+
{
15+
}
16+
}
17+
}

src/NetMQ/ZmqSocketType.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ public enum ZmqSocketType
8383
/// </summary>
8484
Client = 13,
8585

86+
/// <summary>
87+
/// This denotes an Gather socket.
88+
/// </summary>
89+
Gather = 16,
90+
91+
/// <summary>
92+
/// This denotes an Scatter socket.
93+
/// </summary>
94+
Scatter = 17,
95+
8696
/// <summary>
8797
/// This denotes a Peer socket.
8898
/// </summary>

0 commit comments

Comments
 (0)