Skip to content

Commit 4cd27a1

Browse files
authored
Merge pull request #871 from somdoron/thread_safe
problem: netmq is not thread safe
2 parents a1c452f + 2a8f519 commit 4cd27a1

25 files changed

+2846
-438
lines changed

src/NetMQ-unix.sln

Lines changed: 0 additions & 50 deletions
This file was deleted.

src/NetMQ.Tests/ClientServer.cs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using NetMQ;
2+
using NetMQ.Sockets;
3+
using Xunit;
4+
5+
namespace NetMQ.Tests
6+
{
7+
public class ClientServer
8+
{
9+
[Fact]
10+
public void Inproc()
11+
{
12+
using var server = new ServerSocket();
13+
using var client = new ClientSocket();
14+
server.Bind("inproc://client-server");
15+
client.Connect("inproc://client-server");
16+
17+
client.Send("Hello");
18+
var (routingId, clientMsg) = server.ReceiveString();
19+
Assert.NotEqual<uint>(0, routingId);
20+
Assert.Equal("Hello", clientMsg);
21+
22+
server.Send(routingId, "World");
23+
var serverMsg = client.ReceiveString();
24+
Assert.Equal("World", serverMsg);
25+
}
26+
27+
[Fact]
28+
public void Tcp()
29+
{
30+
using var server = new ServerSocket();
31+
using var client = new ClientSocket();
32+
int port = server.BindRandomPort("tcp://*");
33+
client.Connect($"tcp://localhost:{port}");
34+
35+
client.Send("Hello");
36+
var (routingId, clientMsg) = server.ReceiveString();
37+
Assert.NotEqual<uint>(0, routingId);
38+
Assert.Equal("Hello", clientMsg);
39+
40+
server.Send(routingId, "World");
41+
var serverMsg = client.ReceiveString();
42+
Assert.Equal("World", serverMsg);
43+
}
44+
45+
[Fact]
46+
public async void Async()
47+
{
48+
using var server = new ServerSocket();
49+
using var client = new ClientSocket();
50+
int port = server.BindRandomPort("tcp://*");
51+
client.Connect($"tcp://localhost:{port}");
52+
53+
await client.SendAsync("Hello");
54+
var (routingId, clientMsg) = await server.ReceiveStringAsync();
55+
Assert.NotEqual<uint>(0, routingId);
56+
Assert.Equal("Hello", clientMsg);
57+
58+
await server.SendAsync(routingId, "World");
59+
var serverMsg = await client.ReceiveStringAsync();
60+
Assert.Equal("World", serverMsg);
61+
}
62+
}
63+
}

src/NetMQ/Core/IMailbox.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using JetBrains.Annotations;
2+
3+
namespace NetMQ.Core
4+
{
5+
internal interface IMailbox
6+
{
7+
void Send([NotNull] Command command);
8+
9+
bool TryRecv(int timeout, out Command command);
10+
11+
void Close();
12+
}
13+
}

src/NetMQ/Core/Mailbox.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ You should have received a copy of the GNU Lesser General Public License
2626

2727
namespace NetMQ.Core
2828
{
29-
internal interface IMailbox
30-
{
31-
void Send([NotNull] Command command);
32-
33-
void Close();
34-
}
35-
3629
internal interface IMailboxEvent
3730
{
3831
void Ready();
@@ -92,11 +85,16 @@ public void Send(Command command)
9285
}
9386
}
9487

88+
public bool TryRecv(int timeout, out Command command)
89+
{
90+
throw new System.NotImplementedException();
91+
}
92+
9593
public bool TryRecv(out Command command)
9694
{
9795
return m_commandPipe.TryRead(out command);
9896
}
99-
97+
10098
public void RaiseEvent()
10199
{
102100
if (!m_disposed)

src/NetMQ/Core/MailboxSafe.cs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
using System.Collections.Generic;
2+
using System.Diagnostics;
3+
using System.Threading;
4+
using JetBrains.Annotations;
5+
using NetMQ.Core.Utils;
6+
7+
namespace NetMQ.Core
8+
{
9+
internal class MailboxSafe : IMailbox
10+
{
11+
/// <summary>
12+
/// The pipe to store actual commands.
13+
/// </summary>
14+
private readonly YPipe<Command> m_commandPipe = new YPipe<Command>(Config.CommandPipeGranularity, "mailbox");
15+
16+
// Synchronize access to the mailbox from receivers and senders
17+
private object m_sync;
18+
19+
private List<Signaler> m_signalers = new List<Signaler>();
20+
21+
#if DEBUG
22+
/// <summary>Mailbox name. Only used for debugging.</summary>
23+
[NotNull] private readonly string m_name;
24+
#endif
25+
26+
/// <summary>
27+
/// Create a new MailboxSafe with the given name.
28+
/// </summary>
29+
/// <param name="name">the name to give this new Mailbox</param>
30+
/// <param name="sync">Synchronize access to the mailbox from receivers and senders</param>
31+
public MailboxSafe([NotNull] string name, object sync)
32+
{
33+
m_sync = sync;
34+
35+
// Get the pipe into passive state. That way, if the users starts by
36+
// polling on the associated file descriptor it will get woken up when
37+
// new command is posted.
38+
bool ok = m_commandPipe.TryRead(out Command cmd);
39+
Debug.Assert(!ok);
40+
41+
#if DEBUG
42+
m_name = name;
43+
#endif
44+
}
45+
46+
public void AddSignaler(Signaler signaler)
47+
{
48+
m_signalers.Add(signaler);
49+
}
50+
51+
public void RemoveSignaler(Signaler signaler)
52+
{
53+
m_signalers.Remove(signaler);
54+
}
55+
56+
public void ClearSignalers()
57+
{
58+
m_signalers.Clear();
59+
}
60+
61+
public void Send(Command cmd)
62+
{
63+
lock (m_sync)
64+
{
65+
m_commandPipe.Write(ref cmd, false);
66+
bool ok = m_commandPipe.Flush();
67+
68+
if (!ok)
69+
{
70+
Monitor.PulseAll(m_sync);
71+
72+
foreach (var signaler in m_signalers)
73+
{
74+
signaler.Send();
75+
}
76+
}
77+
}
78+
}
79+
80+
public bool TryRecv(int timeout, out Command command)
81+
{
82+
// Try to get the command straight away.
83+
if (m_commandPipe.TryRead(out command))
84+
return true;
85+
86+
// If the timeout is zero, it will be quicker to release the lock, giving other a chance to send a command
87+
// and immediately relock it.
88+
if (timeout == 0)
89+
{
90+
Monitor.Exit(m_sync);
91+
Monitor.Enter(m_sync);
92+
}
93+
else
94+
{
95+
// Wait for signal from the command sender.
96+
Monitor.Wait(m_sync, timeout);
97+
}
98+
99+
// Another thread may already fetch the command
100+
return m_commandPipe.TryRead(out command);
101+
}
102+
103+
public void Close()
104+
{
105+
Monitor.Enter(m_sync);
106+
Monitor.Exit(m_sync);
107+
}
108+
109+
#if DEBUG
110+
/// <summary>
111+
/// Override ToString to provide the type-name, plus the Mailbox name within brackets.
112+
/// </summary>
113+
/// <returns>a string of the form Mailbox[name]</returns>
114+
public override string ToString()
115+
{
116+
return base.ToString() + "[" + m_name + "]";
117+
}
118+
#endif
119+
}
120+
}

src/NetMQ/Core/Mechanisms/Mechanism.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ static class SocketNames
3131
public const string Xsub = "XSUB";
3232
public const string Stream = "STREAM";
3333
public const string Peer = "PEER";
34+
public const string Server = "SERVER";
35+
public const string Client = "CLIENT";
3436
}
3537

3638
const int NameLengthSize = sizeof(byte);
@@ -119,6 +121,10 @@ protected string GetSocketName(ZmqSocketType socketType)
119121
return SocketNames.Stream;
120122
case ZmqSocketType.Peer:
121123
return SocketNames.Peer;
124+
case ZmqSocketType.Server:
125+
return SocketNames.Server;
126+
case ZmqSocketType.Client:
127+
return SocketNames.Client;
122128
default:
123129
throw new ArgumentOutOfRangeException(nameof(socketType), socketType, null);
124130
}
@@ -300,6 +306,10 @@ private bool CheckSocketType(string type)
300306
return type == SocketNames.Push;
301307
case ZmqSocketType.Push:
302308
return type == SocketNames.Pull;
309+
case ZmqSocketType.Server:
310+
return type == SocketNames.Client;
311+
case ZmqSocketType.Client:
312+
return type == SocketNames.Server;
303313
case ZmqSocketType.Peer:
304314
return type == SocketNames.Peer;
305315
default:

0 commit comments

Comments
 (0)