Skip to content

Commit bad9dc7

Browse files
authored
Merge pull request #876 from somdoron/thread_safe
problem: no way to cancel receive operation
2 parents 4cd27a1 + 2658130 commit bad9dc7

File tree

8 files changed

+221
-95
lines changed

8 files changed

+221
-95
lines changed

src/NetMQ.Tests/ClientServer.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
14
using NetMQ;
25
using NetMQ.Sockets;
36
using Xunit;
@@ -59,5 +62,16 @@ public async void Async()
5962
var serverMsg = await client.ReceiveStringAsync();
6063
Assert.Equal("World", serverMsg);
6164
}
65+
66+
[Fact]
67+
public async void AsyncWithCancellationToken()
68+
{
69+
using CancellationTokenSource source = new CancellationTokenSource();
70+
using var server = new ServerSocket();
71+
72+
source.CancelAfter(100);
73+
74+
await Assert.ThrowsAsync<OperationCanceledException>(async () => await server.ReceiveStringAsync(source.Token));
75+
}
6276
}
6377
}

src/NetMQ/Core/CommandType.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ internal enum CommandType
106106
/// <summary>
107107
/// Send to reaper to stop the reaper immediatly
108108
/// </summary>
109-
ForceStop
109+
ForceStop,
110+
111+
/// <summary>
112+
/// Send a cancellation request to the socket from a cancellation token
113+
/// </summary>
114+
CancellationRequested
110115
}
111116
}

src/NetMQ/Core/SocketBase.cs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ You should have received a copy of the GNU Lesser General Public License
2525
using System.Diagnostics;
2626
using System.Linq;
2727
using System.Net.Sockets;
28+
using System.Threading;
2829
using AsyncIO;
2930
using JetBrains.Annotations;
3031
using NetMQ.Core.Patterns;
@@ -982,6 +983,7 @@ public bool TrySend(ref Msg msg, TimeSpan timeout, bool more)
982983
/// </summary>
983984
/// <param name="msg">the <c>Msg</c> to read the received message into</param>
984985
/// <param name="timeout">this controls whether the call blocks, and for how long.</param>
986+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
985987
/// <returns><c>true</c> if successful, <c>false</c> if it timed out</returns>
986988
/// <remarks>
987989
/// For <paramref name="timeout"/>, there are three categories of value:
@@ -993,7 +995,7 @@ public bool TrySend(ref Msg msg, TimeSpan timeout, bool more)
993995
/// </remarks>
994996
/// <exception cref="FaultException">the Msg must already have been uninitialised</exception>
995997
/// <exception cref="TerminatingException">The socket must not already be stopped.</exception>
996-
public bool TryRecv(ref Msg msg, TimeSpan timeout)
998+
public bool TryRecv(ref Msg msg, TimeSpan timeout, CancellationToken cancellationToken = default)
997999
{
9981000
Lock();
9991001
try
@@ -1056,7 +1058,10 @@ public bool TryRecv(ref Msg msg, TimeSpan timeout)
10561058
bool block = m_ticks != 0;
10571059
while (true)
10581060
{
1059-
ProcessCommands(block ? timeoutMillis : 0, false);
1061+
if (cancellationToken.IsCancellationRequested)
1062+
return false;
1063+
1064+
ProcessCommands(block ? timeoutMillis : 0, false, cancellationToken);
10601065

10611066
isMessageAvailable = XRecv(ref msg);
10621067
if (isMessageAvailable)
@@ -1168,15 +1173,21 @@ internal void StartReaping([NotNull] Poller poller)
11681173
/// </summary>
11691174
/// <param name="timeout">how much time to allow to wait for a command, before returning (in milliseconds)</param>
11701175
/// <param name="throttle">if true - throttle the rate of command-execution by doing only one per call</param>
1176+
/// <param name="cancellationToken">allows the caller to cancel the process commands operation</param>
11711177
/// <exception cref="TerminatingException">The Ctx context must not already be terminating.</exception>
1172-
private void ProcessCommands(int timeout, bool throttle)
1178+
private void ProcessCommands(int timeout, bool throttle, CancellationToken cancellationToken = default)
11731179
{
11741180
bool found;
11751181
Command command;
11761182
if (timeout != 0)
11771183
{
1178-
// If we are asked to wait, simply ask mailbox to wait.
1179-
found = m_mailbox.TryRecv(timeout, out command);
1184+
if (cancellationToken.CanBeCanceled)
1185+
{
1186+
using var registration = cancellationToken.Register(SendCancellationRequested);
1187+
found = m_mailbox.TryRecv(timeout, out command);
1188+
}
1189+
else
1190+
found = m_mailbox.TryRecv(timeout, out command);
11801191
}
11811192
else
11821193
{

src/NetMQ/Core/ZObject.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,14 @@ protected void SendDone()
240240
m_ctx.SendCommand(Ctx.TermTid, new Command(null, CommandType.Done));
241241
}
242242

243+
/// <summary>
244+
/// Socket sends a CancellationRequested command to itself when a CancellationToken has been cancelled
245+
/// </summary>
246+
protected void SendCancellationRequested()
247+
{
248+
SendCommand(new Command(this, CommandType.CancellationRequested, null));
249+
}
250+
243251
/// <summary>
244252
/// Send the given Command, on that commands Destination thread.
245253
/// </summary>
@@ -324,6 +332,10 @@ public void ProcessCommand([NotNull] Command cmd)
324332
case CommandType.ForceStop:
325333
ProcessForceStop();
326334
break;
335+
336+
case CommandType.CancellationRequested:
337+
ProcessCancellationRequested();
338+
break;
327339

328340
default:
329341
throw new ArgumentException();
@@ -468,6 +480,13 @@ protected virtual void ProcessSeqnum()
468480
throw new NotSupportedException();
469481
}
470482

483+
/// <summary>
484+
/// Handler for cancellation requested
485+
/// </summary>
486+
protected virtual void ProcessCancellationRequested()
487+
{
488+
}
489+
471490
#endregion
472491
}
473492
}

src/NetMQ/ReceiveThreadSafeSocketExtensions.cs

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Text;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using JetBrains.Annotations;
56

@@ -18,19 +19,26 @@ public static class ReceiveThreadSafeSocketExtensions
1819
/// Receive a bytes from <paramref name="socket"/>, blocking until one arrives.
1920
/// </summary>
2021
/// <param name="socket">The socket to receive from.</param>
22+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
2123
/// <returns>The content of the received message.</returns>
24+
/// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
2225
[NotNull]
23-
public static byte[] ReceiveBytes([NotNull] this IThreadSafeInSocket socket)
26+
public static byte[] ReceiveBytes([NotNull] this IThreadSafeInSocket socket,
27+
CancellationToken cancellationToken = default)
2428
{
2529
var msg = new Msg();
2630
msg.InitEmpty();
2731

28-
socket.Receive(ref msg);
29-
30-
var data = msg.CloneData();
31-
32-
msg.Close();
33-
return data;
32+
try
33+
{
34+
socket.Receive(ref msg, cancellationToken);
35+
var data = msg.CloneData();
36+
return data;
37+
}
38+
finally
39+
{
40+
msg.Close();
41+
}
3442
}
3543

3644
#endregion
@@ -60,14 +68,16 @@ public static bool TryReceiveBytes([NotNull] this IThreadSafeInSocket socket, ou
6068
/// <param name="socket">The socket to receive from.</param>
6169
/// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
6270
/// <param name="bytes">The content of the received message, or <c>null</c> if no message was available.</param>
71+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
6372
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
73+
/// <remarks>The method would return false if cancellation has had requested.</remarks>
6474
public static bool TryReceiveBytes([NotNull] this IThreadSafeInSocket socket, TimeSpan timeout,
65-
out byte[] bytes)
75+
out byte[] bytes, CancellationToken cancellationToken = default)
6676
{
6777
var msg = new Msg();
6878
msg.InitEmpty();
6979

70-
if (!socket.TryReceive(ref msg, timeout))
80+
if (!socket.TryReceive(ref msg, timeout, cancellationToken))
7181
{
7282
msg.Close();
7383
bytes = null;
@@ -88,15 +98,20 @@ public static bool TryReceiveBytes([NotNull] this IThreadSafeInSocket socket, Ti
8898
/// Receive a bytes from <paramref name="socket"/> asynchronously.
8999
/// </summary>
90100
/// <param name="socket">The socket to receive from.</param>
101+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
91102
/// <returns>The content of the received message.</returns>
92-
public static ValueTask<byte[]> ReceiveBytesAsync([NotNull] this IThreadSafeInSocket socket)
103+
/// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
104+
public static ValueTask<byte[]> ReceiveBytesAsync([NotNull] this IThreadSafeInSocket socket,
105+
CancellationToken cancellationToken = default)
93106
{
94107
if (TryReceiveBytes(socket, out var bytes))
95108
return new ValueTask<byte[]>(bytes);
96109

97110
// TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously
98111
// and probably implement IValueTaskSource
99-
return new ValueTask<byte[]>(Task.Factory.StartNew(socket.ReceiveBytes, TaskCreationOptions.LongRunning));
112+
// TODO: should we avoid lambda here as it cause heap allocation for the environment?
113+
return new ValueTask<byte[]>(Task.Factory.StartNew(() => socket.ReceiveBytes(cancellationToken),
114+
cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
100115
}
101116

102117
#endregion
@@ -111,27 +126,32 @@ public static ValueTask<byte[]> ReceiveBytesAsync([NotNull] this IThreadSafeInSo
111126
/// Receive a string from <paramref name="socket"/>, blocking until one arrives, and decode using <see cref="SendReceiveConstants.DefaultEncoding"/>.
112127
/// </summary>
113128
/// <param name="socket">The socket to receive from.</param>
129+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
114130
/// <returns>The content of the received message.</returns>
115-
public static string ReceiveString([NotNull] this IThreadSafeInSocket socket)
131+
/// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
132+
public static string ReceiveString([NotNull] this IThreadSafeInSocket socket,
133+
CancellationToken cancellationToken = default)
116134
{
117-
return socket.ReceiveString(SendReceiveConstants.DefaultEncoding);
135+
return socket.ReceiveString(SendReceiveConstants.DefaultEncoding, cancellationToken);
118136
}
119137

120138
/// <summary>
121139
/// Receive a string from <paramref name="socket"/>, blocking until one arrives, and decode using <paramref name="encoding"/>.
122140
/// </summary>
123141
/// <param name="socket">The socket to receive from.</param>
124142
/// <param name="encoding">The encoding used to convert the data to a string.</param>
143+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
125144
/// <returns>The content of the received message.</returns>
126-
public static string ReceiveString([NotNull] this IThreadSafeInSocket socket, [NotNull] Encoding encoding)
145+
/// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
146+
public static string ReceiveString([NotNull] this IThreadSafeInSocket socket, [NotNull] Encoding encoding,
147+
CancellationToken cancellationToken = default)
127148
{
128149
var msg = new Msg();
129150
msg.InitEmpty();
130151

131-
socket.Receive(ref msg);
132-
133152
try
134153
{
154+
socket.Receive(ref msg, cancellationToken);
135155
return msg.Size > 0
136156
? msg.GetString(encoding)
137157
: string.Empty;
@@ -182,11 +202,14 @@ public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, [
182202
/// </summary>
183203
/// <param name="socket">The socket to receive from.</param>
184204
/// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
185-
/// <param name="str">The content of the received message, or <c>null</c> if no message was available.</param>
205+
/// <param name="str">The conent of the received message, or <c>null</c> if no message was available.</param>
206+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
186207
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
187-
public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, TimeSpan timeout, out string str)
208+
/// <remarks>The method would return false if cancellation has had requested.</remarks>
209+
public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, TimeSpan timeout, out string str,
210+
CancellationToken cancellationToken = default)
188211
{
189-
return socket.TryReceiveString(timeout, SendReceiveConstants.DefaultEncoding, out str);
212+
return socket.TryReceiveString(timeout, SendReceiveConstants.DefaultEncoding, out str, cancellationToken);
190213
}
191214

192215
/// <summary>
@@ -197,14 +220,16 @@ public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, T
197220
/// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
198221
/// <param name="encoding">The encoding used to convert the data to a string.</param>
199222
/// <param name="str">The content of the received message, or <c>null</c> if no message was available.</param>
223+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
200224
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
225+
/// <remarks>The method would return false if cancellation has had requested.</remarks>
201226
public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, TimeSpan timeout,
202-
[NotNull] Encoding encoding, out string str)
227+
[NotNull] Encoding encoding, out string str, CancellationToken cancellationToken = default)
203228
{
204229
var msg = new Msg();
205230
msg.InitEmpty();
206231

207-
if (socket.TryReceive(ref msg, timeout))
232+
if (socket.TryReceive(ref msg, timeout, cancellationToken))
208233
{
209234
try
210235
{
@@ -215,7 +240,7 @@ public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, T
215240
}
216241
finally
217242
{
218-
msg.Close();
243+
msg.Close();
219244
}
220245
}
221246

@@ -232,15 +257,19 @@ public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, T
232257
/// Receive a string from <paramref name="socket"/> asynchronously.
233258
/// </summary>
234259
/// <param name="socket">The socket to receive from.</param>
260+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
235261
/// <returns>The content of the received message.</returns>
236-
public static ValueTask<string> ReceiveStringAsync([NotNull] this IThreadSafeInSocket socket)
262+
/// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
263+
public static ValueTask<string> ReceiveStringAsync([NotNull] this IThreadSafeInSocket socket,
264+
CancellationToken cancellationToken = default)
237265
{
238266
if (TryReceiveString(socket, out var msg))
239267
return new ValueTask<string>(msg);
240268

241269
// TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously
242270
// and probably implement IValueTaskSource
243-
return new ValueTask<string>(Task.Factory.StartNew(socket.ReceiveString, TaskCreationOptions.LongRunning));
271+
return new ValueTask<string>(Task.Factory.StartNew(() => socket.ReceiveString(cancellationToken),
272+
cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
244273
}
245274

246275
#endregion

0 commit comments

Comments
 (0)