Skip to content

Commit e32655a

Browse files
authored
Merge pull request #869 from dhyams/relaxed_correlate_866
Added support for ZMQ_REQ_CORRELATE and ZMQ_REQ_RELAXED socket options
2 parents 072f7a6 + b7da9bc commit e32655a

File tree

11 files changed

+445
-65
lines changed

11 files changed

+445
-65
lines changed

src/NetMQ.Tests/ReqRepTests.cs

Lines changed: 176 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,50 +7,131 @@ public class ReqRepTests : IClassFixture<CleanupAfterFixture>
77
{
88
public ReqRepTests() => NetMQConfig.Cleanup();
99

10+
protected void SimpleReqRepSequence(string address, RequestSocket req, ResponseSocket rep)
11+
{
12+
var port = rep.BindRandomPort(address);
13+
req.Connect(address + ":" + port);
14+
15+
req.SendFrame("Hi");
16+
Assert.Equal(new[] { "Hi" }, rep.ReceiveMultipartStrings());
17+
rep.SendFrame("Hi2");
18+
Assert.Equal(new[] { "Hi2" }, req.ReceiveMultipartStrings());
19+
}
20+
21+
protected void SendTwoReqsInSuccession(string address, RequestSocket req, ResponseSocket rep)
22+
{
23+
var port = rep.BindRandomPort(address);
24+
req.Connect(address + ":" + port);
25+
req.SendFrame("Hi");
26+
rep.SkipFrame();
27+
}
28+
29+
[Theory]
30+
[InlineData("tcp://localhost")]
31+
[InlineData("tcp://127.0.0.1")]
32+
public void SimpleReqRepSucceeds(string address)
33+
{
34+
using (var rep = new ResponseSocket())
35+
using (var req = new RequestSocket())
36+
{
37+
SimpleReqRepSequence(address, req, rep);
38+
}
39+
}
40+
41+
[Theory]
42+
[InlineData("tcp://localhost")]
43+
[InlineData("tcp://127.0.0.1")]
44+
public void SimpleReqRepWithCorrelationSucceeds(string address)
45+
{
46+
using (var rep = new ResponseSocket())
47+
using (var req = new RequestSocket())
48+
{
49+
req.Options.Correlate = true;
50+
SimpleReqRepSequence(address, req, rep);
51+
}
52+
}
53+
1054
[Theory]
1155
[InlineData("tcp://localhost")]
1256
[InlineData("tcp://127.0.0.1")]
13-
public void SimpleReqRep(string address)
57+
public void SimpleReqRepWithRelaxedSucceeds(string address)
58+
{
59+
using (var rep = new ResponseSocket())
60+
using (var req = new RequestSocket())
61+
{
62+
req.Options.Relaxed = true;
63+
SimpleReqRepSequence(address, req, rep);
64+
req.SendFrame("Hi2"); // ick that this not what I wanted.
65+
}
66+
}
67+
68+
[Theory]
69+
[InlineData("tcp://localhost")]
70+
[InlineData("tcp://127.0.0.1")]
71+
public void SendingTwoRequestsInSuccessionFails(string address)
1472
{
1573
using (var rep = new ResponseSocket())
1674
using (var req = new RequestSocket())
1775
{
1876
var port = rep.BindRandomPort(address);
1977
req.Connect(address + ":" + port);
20-
2178
req.SendFrame("Hi");
79+
rep.SkipFrame();
80+
Assert.Throws<FiniteStateMachineException>(() => req.SendFrame("Hi2"));
81+
}
82+
}
2283

23-
Assert.Equal(new[] { "Hi" }, rep.ReceiveMultipartStrings());
24-
25-
rep.SendFrame("Hi2");
84+
[Theory]
85+
[InlineData("tcp://localhost")]
86+
[InlineData("tcp://127.0.0.1")]
87+
public void SendingTwoRequestsInSuccessionWithRelaxedSucceeds(string address)
88+
{
89+
using (var rep = new ResponseSocket())
90+
using (var req = new RequestSocket())
91+
{
92+
req.Options.Relaxed = true;
2693

27-
Assert.Equal(new[] { "Hi2" }, req.ReceiveMultipartStrings());
94+
var port = rep.BindRandomPort(address);
95+
req.Connect(address + ":" + port);
96+
req.SendFrame("Hi");
97+
rep.SkipFrame();
2898
}
2999
}
30100

31-
[Fact]
32-
public void SendingTwoRequestsInARow()
101+
[Fact]
102+
public void ReceiveBeforeSendingFails()
33103
{
34104
using (var rep = new ResponseSocket())
35105
using (var req = new RequestSocket())
36106
{
37107
var port = rep.BindRandomPort("tcp://localhost");
38108
req.Connect("tcp://localhost:" + port);
39109

40-
req.SendFrame("Hi");
110+
Assert.Throws<FiniteStateMachineException>(() => req.ReceiveFrameBytes());
111+
}
112+
}
41113

42-
rep.SkipFrame();
114+
[Fact]
115+
public void ReceiveBeforeSendingWithRelaxedStillFails()
116+
{
117+
using (var rep = new ResponseSocket())
118+
using (var req = new RequestSocket())
119+
{
120+
req.Options.Relaxed = true;
121+
var port = rep.BindRandomPort("tcp://localhost");
122+
req.Connect("tcp://localhost:" + port);
43123

44-
Assert.Throws<FiniteStateMachineException>(() => req.SendFrame("Hi2"));
124+
Assert.Throws<FiniteStateMachineException>(() => req.ReceiveFrameBytes());
45125
}
46126
}
47127

48128
[Fact]
49-
public void ReceiveBeforeSending()
129+
public void ReceiveBeforeSendingWithCorrelateStillFails()
50130
{
51131
using (var rep = new ResponseSocket())
52132
using (var req = new RequestSocket())
53133
{
134+
req.Options.Correlate = true;
54135
var port = rep.BindRandomPort("tcp://localhost");
55136
req.Connect("tcp://localhost:" + port);
56137

@@ -59,7 +140,7 @@ public void ReceiveBeforeSending()
59140
}
60141

61142
[Fact]
62-
public void SendMessageInResponseBeforeReceiving()
143+
public void SendMessageInResponseBeforeReceivingFails()
63144
{
64145
using (var rep = new ResponseSocket())
65146
using (var req = new RequestSocket())
@@ -71,8 +152,89 @@ public void SendMessageInResponseBeforeReceiving()
71152
}
72153
}
73154

155+
// make sure that a single responder sends messages back to the correct requestors.
156+
[Fact]
157+
public void SingleResponderSendsCorrectMessagesToMultipleRequestors()
158+
{
159+
using (var rep = new ResponseSocket())
160+
using (var req1 = new RequestSocket())
161+
using (var req2 = new RequestSocket())
162+
{
163+
var port = rep.BindRandomPort("tcp://127.0.0.1");
164+
165+
req1.Connect($"tcp://127.0.0.1:{port}");
166+
req2.Connect($"tcp://127.0.0.1:{port}");
167+
168+
req1.SendFrame("From1");
169+
req2.SendFrame("From2");
170+
171+
rep.SendFrame(rep.ReceiveFrameString());
172+
rep.SendFrame(rep.ReceiveFrameString());
173+
174+
Assert.Equal("From2", req2.ReceiveFrameString());
175+
Assert.Equal("From1", req1.ReceiveFrameString());
176+
}
177+
}
178+
179+
internal void RouterBounce(ref RouterSocket router)
180+
{
181+
bool more;
182+
do
183+
{
184+
var bytes = router.ReceiveFrameBytes(out more);
185+
router.SendFrame(bytes, more);
186+
} while (more);
187+
}
188+
189+
[Theory]
190+
[InlineData(false)]
191+
[InlineData(true)]
192+
public void CorrelationSelectsLatestRequest(bool correlate)
193+
{
194+
var rep = new RouterSocket();
195+
var req = new RequestSocket();
196+
197+
var port = rep.BindRandomPort("tcp://localhost");
198+
req.Connect($"tcp://localhost:{port}");
199+
200+
req.Options.Relaxed = true;
201+
req.Options.Correlate = correlate;
202+
203+
// Send two requests.
204+
req.SendFrame("FirstReq");
205+
req.SendFrame("SecondReq");
206+
207+
// Bind server allowing it to receive messages.
208+
//rep.Bind($"tcp://localhost:{port}");
209+
210+
// Read the two messages and send them back as is.
211+
RouterBounce(ref rep);
212+
RouterBounce(ref rep);
213+
214+
// Read the reply. When Options.Correlate is active,
215+
// "FirstReq" should be ditched and "SecondReq" should be read. Vice
216+
// versa when Options.Corellate is not active.
217+
var result = req.ReceiveFrameString();
218+
219+
if (correlate)
220+
{
221+
// if correlate is on, we get SecondReq which is the typical desired behavior; this
222+
// is the last request that was sent.
223+
Assert.Equal("SecondReq", result);
224+
}
225+
else
226+
{
227+
// if correlate is off, we get FirstReq, which is not desired behavior; this is not
228+
// the last request that was sent.
229+
Assert.Equal("FirstReq", result);
230+
}
231+
232+
rep.Dispose();
233+
req.Dispose();
234+
}
235+
74236
[Fact]
75-
public void SendMultipartMessage()
237+
public void SendMultipartMessageSucceeds()
76238
{
77239
using (var rep = new ResponseSocket())
78240
using (var req = new RequestSocket())

src/NetMQ/Core/Options.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public Options()
6666
HeartbeatTimeout = -1;
6767
HelloMsg = null;
6868
CanSendHelloMsg = false;
69+
Correlate = false;
70+
Relaxed = false;
6971
}
7072

7173
/// <summary>
@@ -324,7 +326,10 @@ public byte IdentitySize {
324326
/// Indicate of socket can send an hello msg
325327
/// </summary>
326328
public bool CanSendHelloMsg { get; set; }
327-
329+
330+
public bool Correlate { get; set; }
331+
public bool Relaxed { get; set; }
332+
328333
/// <summary>
329334
/// Assign the given optionValue to the specified option.
330335
/// </summary>
@@ -520,6 +525,18 @@ public void SetSocketOption(ZmqSocketOption option, object optionValue)
520525
}
521526
break;
522527
}
528+
529+
case ZmqSocketOption.Relaxed:
530+
{
531+
Relaxed = (bool)optionValue;
532+
break;
533+
}
534+
535+
case ZmqSocketOption.Correlate:
536+
{
537+
Correlate = (bool)optionValue;
538+
break;
539+
}
523540

524541
default:
525542
throw new InvalidException("Options.SetSocketOption called with invalid ZmqSocketOption of " + option);

src/NetMQ/Core/Patterns/Dealer.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ protected override bool XSend(ref Msg msg)
7474
return m_loadBalancer.Send(ref msg);
7575
}
7676

77+
/// <summary>
78+
/// Transmit the given message. The <c>Send</c> method calls this to do the actual sending.
79+
/// </summary>
80+
/// <param name="msg">the message to transmit</param>
81+
/// <param name="pipe">the pipe that the message was transmitted on (output)</param>
82+
/// <returns><c>true</c> if the message was sent successfully</returns>
83+
protected bool XSendPipe(ref Msg msg, out Pipe pipe)
84+
{
85+
return m_loadBalancer.SendPipe(ref msg, out pipe);
86+
}
87+
7788
/// <summary>
7889
/// Get a message from FairQueuing data structure
7990
/// </summary>
@@ -84,6 +95,17 @@ protected override bool XRecv(ref Msg msg)
8495
return m_fairQueueing.Recv(ref msg);
8596
}
8697

98+
/// <summary>
99+
/// Get a message from FairQueuing data structure
100+
/// </summary>
101+
/// <param name="msg">a Msg to receive the message into</param>
102+
/// <param name="pipe">a specific Pipe to receive on</param>
103+
/// <returns><c>true</c> if the message was received successfully, <c>false</c> if there were no messages to receive</returns>
104+
protected bool XRecvPipe(ref Msg msg, out Pipe pipe)
105+
{
106+
return m_fairQueueing.RecvPipe(ref msg, out pipe);
107+
}
108+
87109
/// <summary>
88110
/// If there is a message available and one has not been pre-fetched yet,
89111
/// preserve that message as our pre-fetched one.

src/NetMQ/Core/Patterns/Peer.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -281,34 +281,32 @@ protected override bool XRecv(ref Msg msg)
281281
return true;
282282
}
283283

284-
var pipe = new Pipe[1];
285-
286-
bool isMessageAvailable = m_fairQueueing.RecvPipe(pipe, ref msg);
284+
bool isMessageAvailable = m_fairQueueing.RecvPipe(ref msg, out Pipe pipe);
287285

288286
// Drop any messages with more flag
289287
while (isMessageAvailable && msg.HasMore)
290288
{
291289
// drop all frames of the current multi-frame message
292-
isMessageAvailable = m_fairQueueing.RecvPipe(pipe, ref msg);
290+
isMessageAvailable = m_fairQueueing.RecvPipe(ref msg, out pipe);
293291

294292
while (isMessageAvailable && msg.HasMore)
295-
isMessageAvailable = m_fairQueueing.RecvPipe(pipe, ref msg);
293+
isMessageAvailable = m_fairQueueing.RecvPipe(ref msg, out pipe);
296294

297295
// get the new message
298-
isMessageAvailable = m_fairQueueing.RecvPipe(pipe, ref msg);
296+
isMessageAvailable = m_fairQueueing.RecvPipe(ref msg, out pipe);
299297
}
300298

301299
if (!isMessageAvailable)
302300
return false;
303301

304-
Debug.Assert(pipe[0] != null);
302+
Debug.Assert(pipe != null);
305303

306304
// We are at the beginning of a message.
307305
// Keep the message part we have in the prefetch buffer
308306
// and return the ID of the peer instead.
309307
m_prefetchedMsg.Move(ref msg);
310308

311-
byte[] routingId = pipe[0].RoutingId;
309+
byte[] routingId = pipe.RoutingId;
312310
msg.InitPool(routingId.Length);
313311
msg.Put(routingId, 0, routingId.Length);
314312
msg.SetFlags(MsgFlags.More);

0 commit comments

Comments
 (0)