Skip to content

Commit d297a27

Browse files
committed
Only propagate cancellation if token can be canceled
`ReceiveFrameStringAsync` and `SkipFrameAsync` have also been updated.
1 parent 7db326c commit d297a27

File tree

1 file changed

+54
-18
lines changed

1 file changed

+54
-18
lines changed

src/NetMQ/AsyncReceiveExtensions.cs

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public static class AsyncReceiveExtensions
2020
static Task<bool> s_trueTask = Task.FromResult(true);
2121
static Task<bool> s_falseTask = Task.FromResult(false);
2222

23-
#region Receiving frames as a multipart message
23+
#region Receiving frames as a multipart message
2424

2525
/// <summary>
2626
/// Receive a single frame from <paramref name="socket"/>, asynchronously.
@@ -52,9 +52,9 @@ public static async Task<NetMQMessage> ReceiveMultipartMessageAsync(
5252
return message;
5353
}
5454

55-
#endregion
55+
#endregion
5656

57-
#region Receiving a frame as a byte array
57+
#region Receiving a frame as a byte array
5858

5959
/// <summary>
6060
/// Receive a single frame from <paramref name="socket"/>, asynchronously.
@@ -85,8 +85,12 @@ public static async Task<NetMQMessage> ReceiveMultipartMessageAsync(
8585
}
8686

8787
TaskCompletionSource<(byte[], bool)> source = new TaskCompletionSource<(byte[], bool)>();
88+
8889
CancellationTokenRegistration? registration = null;
89-
registration = cancellationToken.Register(PropagateCancel);
90+
if (cancellationToken.CanBeCanceled)
91+
{
92+
registration = cancellationToken.Register(PropagateCancel);
93+
}
9094

9195
void Listener(object sender, NetMQSocketEventArgs args)
9296
{
@@ -114,9 +118,9 @@ void PropagateCancel()
114118
return source.Task;
115119
}
116120

117-
#endregion
121+
#endregion
118122

119-
#region Receiving a frame as a string
123+
#region Receiving a frame as a string
120124

121125
/// <summary>
122126
/// Receive a single frame from <paramref name="socket"/>, asynchronously, and decode as a string using <see cref="SendReceiveConstants.DefaultEncoding"/>.
@@ -164,7 +168,12 @@ void PropagateCancel()
164168
}
165169

166170
TaskCompletionSource<(string, bool)> source = new TaskCompletionSource<(string,bool)>();
167-
cancellationToken.Register(() => source.SetCanceled());
171+
172+
CancellationTokenRegistration? registration = null;
173+
if (cancellationToken.CanBeCanceled)
174+
{
175+
registration = cancellationToken.Register(PropagateCancel);
176+
}
168177

169178
void Listener(object sender, NetMQSocketEventArgs args)
170179
{
@@ -174,28 +183,40 @@ void Listener(object sender, NetMQSocketEventArgs args)
174183
? msg.GetString(encoding)
175184
: string.Empty;
176185
bool more = msg.HasMore;
177-
178186
msg.Close();
187+
179188
socket.ReceiveReady -= Listener;
180-
source.SetResult((str, more));
189+
registration?.Dispose();
190+
source.TrySetResult((str, more));
181191
}
182192
}
183193

194+
void PropagateCancel()
195+
{
196+
socket.ReceiveReady -= Listener;
197+
registration?.Dispose();
198+
source.TrySetCanceled();
199+
}
200+
184201
socket.ReceiveReady += Listener;
185202

186203
return source.Task;
187204
}
188205

189-
#endregion
206+
#endregion
190207

191-
#region Skipping a message
208+
#region Skipping a message
192209

193210
/// <summary>
194211
/// Receive a single frame from <paramref name="socket"/>, asynchronously, then ignore its content.
195212
/// </summary>
196213
/// <param name="socket">The socket to receive from.</param>
214+
/// <param name="cancellationToken">The token used to propagate notification that this operation should be canceled.</param>
197215
/// <returns>Boolean indicate if another frame of the same message follows</returns>
198-
public static Task<bool> SkipFrameAsync(this NetMQSocket socket)
216+
public static Task<bool> SkipFrameAsync(
217+
this NetMQSocket socket,
218+
CancellationToken cancellationToken = default(CancellationToken)
219+
)
199220
{
200221
if (NetMQRuntime.Current == null)
201222
throw new InvalidOperationException("NetMQRuntime must be created before calling async functions");
@@ -215,26 +236,41 @@ public static Task<bool> SkipFrameAsync(this NetMQSocket socket)
215236

216237
TaskCompletionSource<bool> source = new TaskCompletionSource<bool>();
217238

239+
CancellationTokenRegistration? registration = null;
240+
if (cancellationToken.CanBeCanceled)
241+
{
242+
registration = cancellationToken.Register(PropagateCancel);
243+
}
244+
218245
void Listener(object sender, NetMQSocketEventArgs args)
219246
{
220247
if (socket.TryReceive(ref msg, TimeSpan.Zero))
221248
{
222249
bool more = msg.HasMore;
223250
msg.Close();
251+
224252
socket.ReceiveReady -= Listener;
225-
source.SetResult(more);
253+
registration?.Dispose();
254+
source.TrySetResult(more);
226255
}
227256
}
228257

258+
void PropagateCancel()
259+
{
260+
socket.ReceiveReady -= Listener;
261+
registration?.Dispose();
262+
source.TrySetCanceled();
263+
}
264+
229265
socket.ReceiveReady += Listener;
230266

231267
return source.Task;
232268
}
233269

234270

235-
#endregion
271+
#endregion
236272

237-
#region Skipping all frames of a multipart message
273+
#region Skipping all frames of a multipart message
238274

239275
/// <summary>
240276
/// Receive all frames of the next message from <paramref name="socket"/>, asynchronously, then ignore their contents.
@@ -251,9 +287,9 @@ public static async Task SkipMultipartMessageAsync(this NetMQSocket socket)
251287
}
252288

253289

254-
#endregion
290+
#endregion
255291

256-
#region Receiving a routing key
292+
#region Receiving a routing key
257293

258294
/// <summary>
259295
/// Receive a routing-key from <paramref name="socket"/>, blocking until one arrives.
@@ -268,7 +304,7 @@ public static async Task SkipMultipartMessageAsync(this NetMQSocket socket)
268304
return (new RoutingKey(bytes), more);
269305
}
270306

271-
#endregion
307+
#endregion
272308
}
273309
}
274310

0 commit comments

Comments
 (0)