Skip to content

Commit 3f4a1ec

Browse files
authored
Merge branch 'zeromq:master' into master
2 parents 984d563 + 4a80a4a commit 3f4a1ec

File tree

10 files changed

+295
-55
lines changed

10 files changed

+295
-55
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ This repository is for version 4, for version 3 go to: https://github.com/NetMQ/
2525

2626
## Using / Documentation
2727

28-
Before using NetMQ, make sure to read the [ZeroMQ Guide](http://zguide.zeromq.org/page:all).
28+
Before using NetMQ, make sure to read the [ZeroMQ Guide](http://zguide.zeromq.org/).
2929

3030
The NetMQ documentation can be found at [netmq.readthedocs.org](http://netmq.readthedocs.org/en/latest/). Thanks to [Sacha Barber](http://www.codeproject.com/Members/Sacha-Barber) who agreed to do the documentation.
3131

src/NetMQ.Tests/RouterTests.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,26 @@ public void Handover()
130130
}
131131
}
132132
}
133+
134+
[Fact]
135+
public void RoutingKeys()
136+
{
137+
using var router = new RouterSocket("inproc://routing-keys");
138+
using var dealer = new DealerSocket("inproc://routing-keys");
139+
140+
dealer.SendRoutingKeys(new RoutingKey(1)).SendFrame("Hello");
141+
142+
var keys = router.ReceiveRoutingKeys();
143+
var message = router.ReceiveFrameString();
144+
145+
Assert.Equal("Hello", message);
146+
147+
router.SendRoutingKeys(keys).SendFrame("World");
148+
149+
dealer.ReceiveRoutingKeys();
150+
var reply = dealer.ReceiveFrameString();
151+
152+
Assert.Equal("World", reply);
153+
}
133154
}
134155
}

src/NetMQ/AsyncReceiveExtensions.cs

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public static class AsyncReceiveExtensions
2020
static Task<bool> s_trueTask = Task.FromResult(true);
2121
static Task<bool> s_falseTask = Task.FromResult(false);
2222

23-
#region Receiving frames as a multipart message
23+
#region Receiving frames as a multipart message
2424

2525
/// <summary>
2626
/// Receive a single frame from <paramref name="socket"/>, asynchronously.
@@ -52,9 +52,9 @@ public static async Task<NetMQMessage> ReceiveMultipartMessageAsync(
5252
return message;
5353
}
5454

55-
#endregion
55+
#endregion
5656

57-
#region Receiving a frame as a byte array
57+
#region Receiving a frame as a byte array
5858

5959
/// <summary>
6060
/// Receive a single frame from <paramref name="socket"/>, asynchronously.
@@ -85,7 +85,12 @@ public static async Task<NetMQMessage> ReceiveMultipartMessageAsync(
8585
}
8686

8787
TaskCompletionSource<(byte[], bool)> source = new TaskCompletionSource<(byte[], bool)>();
88-
cancellationToken.Register(() => source.SetCanceled());
88+
89+
CancellationTokenRegistration? registration = null;
90+
if (cancellationToken.CanBeCanceled)
91+
{
92+
registration = cancellationToken.Register(PropagateCancel);
93+
}
8994

9095
void Listener(object sender, NetMQSocketEventArgs args)
9196
{
@@ -96,18 +101,26 @@ void Listener(object sender, NetMQSocketEventArgs args)
96101
msg.Close();
97102

98103
socket.ReceiveReady -= Listener;
99-
source.SetResult((data, more));
104+
registration?.Dispose();
105+
source.TrySetResult((data, more));
100106
}
101107
}
108+
109+
void PropagateCancel()
110+
{
111+
socket.ReceiveReady -= Listener;
112+
registration?.Dispose();
113+
source.TrySetCanceled();
114+
}
102115

103116
socket.ReceiveReady += Listener;
104117

105118
return source.Task;
106119
}
107120

108-
#endregion
121+
#endregion
109122

110-
#region Receiving a frame as a string
123+
#region Receiving a frame as a string
111124

112125
/// <summary>
113126
/// Receive a single frame from <paramref name="socket"/>, asynchronously, and decode as a string using <see cref="SendReceiveConstants.DefaultEncoding"/>.
@@ -155,7 +168,12 @@ void Listener(object sender, NetMQSocketEventArgs args)
155168
}
156169

157170
TaskCompletionSource<(string, bool)> source = new TaskCompletionSource<(string,bool)>();
158-
cancellationToken.Register(() => source.SetCanceled());
171+
172+
CancellationTokenRegistration? registration = null;
173+
if (cancellationToken.CanBeCanceled)
174+
{
175+
registration = cancellationToken.Register(PropagateCancel);
176+
}
159177

160178
void Listener(object sender, NetMQSocketEventArgs args)
161179
{
@@ -165,28 +183,40 @@ void Listener(object sender, NetMQSocketEventArgs args)
165183
? msg.GetString(encoding)
166184
: string.Empty;
167185
bool more = msg.HasMore;
168-
169186
msg.Close();
187+
170188
socket.ReceiveReady -= Listener;
171-
source.SetResult((str, more));
189+
registration?.Dispose();
190+
source.TrySetResult((str, more));
172191
}
173192
}
174193

194+
void PropagateCancel()
195+
{
196+
socket.ReceiveReady -= Listener;
197+
registration?.Dispose();
198+
source.TrySetCanceled();
199+
}
200+
175201
socket.ReceiveReady += Listener;
176202

177203
return source.Task;
178204
}
179205

180-
#endregion
206+
#endregion
181207

182-
#region Skipping a message
208+
#region Skipping a message
183209

184210
/// <summary>
185211
/// Receive a single frame from <paramref name="socket"/>, asynchronously, then ignore its content.
186212
/// </summary>
187213
/// <param name="socket">The socket to receive from.</param>
214+
/// <param name="cancellationToken">The token used to propagate notification that this operation should be canceled.</param>
188215
/// <returns>Boolean indicate if another frame of the same message follows</returns>
189-
public static Task<bool> SkipFrameAsync(this NetMQSocket socket)
216+
public static Task<bool> SkipFrameAsync(
217+
this NetMQSocket socket,
218+
CancellationToken cancellationToken = default(CancellationToken)
219+
)
190220
{
191221
if (NetMQRuntime.Current == null)
192222
throw new InvalidOperationException("NetMQRuntime must be created before calling async functions");
@@ -206,26 +236,41 @@ public static Task<bool> SkipFrameAsync(this NetMQSocket socket)
206236

207237
TaskCompletionSource<bool> source = new TaskCompletionSource<bool>();
208238

239+
CancellationTokenRegistration? registration = null;
240+
if (cancellationToken.CanBeCanceled)
241+
{
242+
registration = cancellationToken.Register(PropagateCancel);
243+
}
244+
209245
void Listener(object sender, NetMQSocketEventArgs args)
210246
{
211247
if (socket.TryReceive(ref msg, TimeSpan.Zero))
212248
{
213249
bool more = msg.HasMore;
214250
msg.Close();
251+
215252
socket.ReceiveReady -= Listener;
216-
source.SetResult(more);
253+
registration?.Dispose();
254+
source.TrySetResult(more);
217255
}
218256
}
219257

258+
void PropagateCancel()
259+
{
260+
socket.ReceiveReady -= Listener;
261+
registration?.Dispose();
262+
source.TrySetCanceled();
263+
}
264+
220265
socket.ReceiveReady += Listener;
221266

222267
return source.Task;
223268
}
224269

225270

226-
#endregion
271+
#endregion
227272

228-
#region Skipping all frames of a multipart message
273+
#region Skipping all frames of a multipart message
229274

230275
/// <summary>
231276
/// Receive all frames of the next message from <paramref name="socket"/>, asynchronously, then ignore their contents.
@@ -242,9 +287,9 @@ public static async Task SkipMultipartMessageAsync(this NetMQSocket socket)
242287
}
243288

244289

245-
#endregion
290+
#endregion
246291

247-
#region Receiving a routing key
292+
#region Receiving a routing key
248293

249294
/// <summary>
250295
/// Receive a routing-key from <paramref name="socket"/>, blocking until one arrives.
@@ -259,7 +304,7 @@ public static async Task SkipMultipartMessageAsync(this NetMQSocket socket)
259304
return (new RoutingKey(bytes), more);
260305
}
261306

262-
#endregion
307+
#endregion
263308
}
264309
}
265310

src/NetMQ/Msg.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,18 @@ public Span<byte> Slice()
265265

266266
return new Span<byte>(m_data, m_offset, Size);
267267
}
268+
269+
/// <summary>
270+
/// Return the internal buffer as Memory
271+
/// </summary>
272+
/// <returns>The memory</returns>
273+
public Memory<byte> SliceAsMemory()
274+
{
275+
if (m_data == null)
276+
return Memory<byte>.Empty;
277+
278+
return new Memory<byte>(m_data, m_offset, Size);
279+
}
268280

269281
/// <summary>
270282
/// Returns a slice of the internal buffer.

src/NetMQ/NetMQ-unix.csproj

Lines changed: 0 additions & 15 deletions
This file was deleted.

src/NetMQ/NetMQ.csproj

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,9 @@
4949
<PackageReference Include="System.ServiceModel.Primitives" Version="4.4.0" />
5050
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
5151
</ItemGroup>
52-
53-
<ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
54-
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies.net45" Version="1.0.0" PrivateAssets="All" />
55-
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
56-
<Reference Include="System.ServiceModel" />
57-
<Reference Include="System" />
58-
<Reference Include="Microsoft.CSharp" />
59-
</ItemGroup>
6052

61-
<ItemGroup Condition=" '$(TargetFramework)' == 'net47' ">
62-
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies.net47" Version="1.0.0" PrivateAssets="All" />
53+
<ItemGroup Condition=" '$(TargetFrameworkIdentifier)' == '.NETFramework' ">
54+
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
6355
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
6456
<Reference Include="System.ServiceModel" />
6557
<Reference Include="System" />

src/NetMQ/NetMQBeacon.cs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ private void OnUdpReady(Socket socket)
199199
{
200200
Assumes.NotNull(m_pipe);
201201

202-
var frame = ReceiveUdpFrame(out string peerName);
202+
if (!TryReceiveUdpFrame(out NetMQFrame frame, out string peerName))
203+
return;
203204

204205
// If filter is set, check that beacon matches it
205206
var isValid = frame.MessageSize >= m_filter?.MessageSize && Compare(frame, m_filter, m_filter.MessageSize);
@@ -269,28 +270,37 @@ private void SendUdpFrame(NetMQFrame frame)
269270
{
270271
m_udpSocket.SendTo(frame.Buffer, 0, frame.MessageSize, SocketFlags.None, m_broadcastAddress);
271272
}
272-
catch (SocketException ex)
273+
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.AddressNotAvailable)
273274
{
274-
if (ex.SocketErrorCode != SocketError.AddressNotAvailable) { throw; }
275-
276275
// Initiate Creation of new Udp here to solve issue related to 'sudden' network change.
277276
// On windows (7 OR 10) incorrect/previous ip address might still exist instead of new Ip
278277
// due to network change which causes crash (if no try/catch and keep trying to send to incorrect/not available address.
279278
// This approach would solve the issue...
280279
}
281280
}
282281

283-
private NetMQFrame ReceiveUdpFrame(out string peerName)
282+
private bool TryReceiveUdpFrame(out NetMQFrame frame, out string peerName)
284283
{
285284
Assumes.NotNull(m_udpSocket);
286285

287286
var buffer = new byte[UdpFrameMax];
288287
EndPoint peer = new IPEndPoint(IPAddress.Any, 0);
289288

290-
var bytesRead = m_udpSocket.ReceiveFrom(buffer, ref peer);
291-
peerName = peer.ToString();
289+
int bytesRead = 0;
290+
try
291+
{
292+
bytesRead = m_udpSocket.ReceiveFrom(buffer, ref peer);
293+
}
294+
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.MessageSize)
295+
{
296+
frame = default;
297+
peerName = null;
298+
return false;
299+
}
292300

293-
return new NetMQFrame(buffer, bytesRead);
301+
peerName = peer.ToString();
302+
frame = new NetMQFrame(buffer, bytesRead);
303+
return true;
294304
}
295305
}
296306

0 commit comments

Comments
 (0)