Skip to content

Commit b90e7f6

Browse files
authored
Merge pull request #766 from AbirAtur/queue_capacity_bugfix
fix for #759
2 parents 1f5cbd7 + c02b03b commit b90e7f6

File tree

3 files changed

+50
-4
lines changed

3 files changed

+50
-4
lines changed

src/NetMQ.Tests/NetMQQueueTests.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#if !NET35
22
using System;
3+
using System.Net.Sockets;
34
using System.Threading;
5+
using System.Threading.Tasks;
6+
using NetMQ.Sockets;
47
using Xunit;
58

69
namespace NetMQ.Tests
@@ -20,6 +23,27 @@ public void EnqueueDequeue()
2023
}
2124
}
2225

26+
[Fact]
27+
public void EnqueueShouldNotBlockWhenCapacityIsZero()
28+
{
29+
using (var mockSocket = new PairSocket())
30+
using (var queue = new NetMQQueue<int>())
31+
{
32+
int socketWatermarkCapacity = mockSocket.Options.SendHighWatermark + mockSocket.Options.ReceiveHighWatermark;
33+
34+
Task task = Task.Run(() =>
35+
{
36+
for (int i = 0; i < socketWatermarkCapacity + 100; i++)
37+
{
38+
queue.Enqueue(i);
39+
}
40+
});
41+
42+
bool completed = task.Wait(TimeSpan.FromSeconds(1));
43+
Assert.True(completed, "Enqueue task should have completed " + socketWatermarkCapacity + " enqueue within 1 second");
44+
}
45+
}
46+
2347
[Fact]
2448
public void TryDequeue()
2549
{

src/NetMQ/NetMQQueue.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ public NetMQQueue(int capacity = 0)
3535
throw new ArgumentOutOfRangeException(nameof(capacity));
3636

3737
m_queue = new ConcurrentQueue<T>();
38-
PairSocket.CreateSocketPair(out m_writer, out m_reader);
39-
40-
m_writer.Options.SendHighWatermark = m_reader.Options.ReceiveHighWatermark = capacity / 2;
38+
PairSocket.CreateSocketPair(out m_writer,
39+
out m_reader,
40+
writer => writer.Options.SendHighWatermark = capacity / 2,
41+
reader => reader.Options.ReceiveHighWatermark = capacity / 2);
4142

4243
m_eventDelegator = new EventDelegator<NetMQQueueEventArgs<T>>(
4344
() => m_reader.ReceiveReady += OnReceiveReady,

src/NetMQ/Sockets/PairSocket.cs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Threading;
1+
using System;
2+
using System.Threading;
23
using NetMQ.Core;
34

45
namespace NetMQ.Sockets
@@ -44,5 +45,25 @@ public static void CreateSocketPair(out PairSocket socket1, out PairSocket socke
4445
socket2 = new PairSocket();
4546
socket2.Connect(address);
4647
}
48+
49+
/// <summary>
50+
/// Create and return an inproc pipe where socket1 is bound and socket2 is connected.
51+
/// </summary>
52+
/// <param name="socket1">the Bind socket</param>
53+
/// <param name="socket2">the Connect socket</param>
54+
/// <param name="initSocket1">Method to initialize socket1 before connection</param>
55+
/// <param name="initSocket2">Method to initialize socket2 before connection</param>
56+
public static void CreateSocketPair(out PairSocket socket1, out PairSocket socket2, Action<PairSocket> initSocket1, Action<PairSocket> initSocket2)
57+
{
58+
string address = $"inproc://NetMQSocketPair#{Interlocked.Increment(ref s_sequence)}";
59+
60+
socket1 = new PairSocket();
61+
initSocket1(socket1);
62+
socket1.Bind(address);
63+
64+
socket2 = new PairSocket();
65+
initSocket2(socket2);
66+
socket2.Connect(address);
67+
}
4768
}
4869
}

0 commit comments

Comments
 (0)