1
1
using System ;
2
2
using System . Text ;
3
+ using System . Threading ;
3
4
using System . Threading . Tasks ;
4
5
using JetBrains . Annotations ;
5
6
@@ -18,19 +19,26 @@ public static class ReceiveThreadSafeSocketExtensions
18
19
/// Receive a bytes from <paramref name="socket"/>, blocking until one arrives.
19
20
/// </summary>
20
21
/// <param name="socket">The socket to receive from.</param>
22
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
21
23
/// <returns>The content of the received message.</returns>
24
+ /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
22
25
[ NotNull ]
23
- public static byte [ ] ReceiveBytes ( [ NotNull ] this IThreadSafeInSocket socket )
26
+ public static byte [ ] ReceiveBytes ( [ NotNull ] this IThreadSafeInSocket socket ,
27
+ CancellationToken cancellationToken = default )
24
28
{
25
29
var msg = new Msg ( ) ;
26
30
msg . InitEmpty ( ) ;
27
31
28
- socket . Receive ( ref msg ) ;
29
-
30
- var data = msg . CloneData ( ) ;
31
-
32
- msg . Close ( ) ;
33
- return data ;
32
+ try
33
+ {
34
+ socket . Receive ( ref msg , cancellationToken ) ;
35
+ var data = msg . CloneData ( ) ;
36
+ return data ;
37
+ }
38
+ finally
39
+ {
40
+ msg . Close ( ) ;
41
+ }
34
42
}
35
43
36
44
#endregion
@@ -60,14 +68,16 @@ public static bool TryReceiveBytes([NotNull] this IThreadSafeInSocket socket, ou
60
68
/// <param name="socket">The socket to receive from.</param>
61
69
/// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
62
70
/// <param name="bytes">The content of the received message, or <c>null</c> if no message was available.</param>
71
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
63
72
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
73
+ /// <remarks>The method would return false if cancellation has had requested.</remarks>
64
74
public static bool TryReceiveBytes ( [ NotNull ] this IThreadSafeInSocket socket , TimeSpan timeout ,
65
- out byte [ ] bytes )
75
+ out byte [ ] bytes , CancellationToken cancellationToken = default )
66
76
{
67
77
var msg = new Msg ( ) ;
68
78
msg . InitEmpty ( ) ;
69
79
70
- if ( ! socket . TryReceive ( ref msg , timeout ) )
80
+ if ( ! socket . TryReceive ( ref msg , timeout , cancellationToken ) )
71
81
{
72
82
msg . Close ( ) ;
73
83
bytes = null ;
@@ -88,15 +98,20 @@ public static bool TryReceiveBytes([NotNull] this IThreadSafeInSocket socket, Ti
88
98
/// Receive a bytes from <paramref name="socket"/> asynchronously.
89
99
/// </summary>
90
100
/// <param name="socket">The socket to receive from.</param>
101
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
91
102
/// <returns>The content of the received message.</returns>
92
- public static ValueTask < byte [ ] > ReceiveBytesAsync ( [ NotNull ] this IThreadSafeInSocket socket )
103
+ /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
104
+ public static ValueTask < byte [ ] > ReceiveBytesAsync ( [ NotNull ] this IThreadSafeInSocket socket ,
105
+ CancellationToken cancellationToken = default )
93
106
{
94
107
if ( TryReceiveBytes ( socket , out var bytes ) )
95
108
return new ValueTask < byte [ ] > ( bytes ) ;
96
109
97
110
// TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously
98
111
// and probably implement IValueTaskSource
99
- return new ValueTask < byte [ ] > ( Task . Factory . StartNew ( socket . ReceiveBytes , TaskCreationOptions . LongRunning ) ) ;
112
+ // TODO: should we avoid lambda here as it cause heap allocation for the environment?
113
+ return new ValueTask < byte [ ] > ( Task . Factory . StartNew ( ( ) => socket . ReceiveBytes ( cancellationToken ) ,
114
+ cancellationToken , TaskCreationOptions . LongRunning , TaskScheduler . Default ) ) ;
100
115
}
101
116
102
117
#endregion
@@ -111,27 +126,32 @@ public static ValueTask<byte[]> ReceiveBytesAsync([NotNull] this IThreadSafeInSo
111
126
/// Receive a string from <paramref name="socket"/>, blocking until one arrives, and decode using <see cref="SendReceiveConstants.DefaultEncoding"/>.
112
127
/// </summary>
113
128
/// <param name="socket">The socket to receive from.</param>
129
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
114
130
/// <returns>The content of the received message.</returns>
115
- public static string ReceiveString ( [ NotNull ] this IThreadSafeInSocket socket )
131
+ /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
132
+ public static string ReceiveString ( [ NotNull ] this IThreadSafeInSocket socket ,
133
+ CancellationToken cancellationToken = default )
116
134
{
117
- return socket . ReceiveString ( SendReceiveConstants . DefaultEncoding ) ;
135
+ return socket . ReceiveString ( SendReceiveConstants . DefaultEncoding , cancellationToken ) ;
118
136
}
119
137
120
138
/// <summary>
121
139
/// Receive a string from <paramref name="socket"/>, blocking until one arrives, and decode using <paramref name="encoding"/>.
122
140
/// </summary>
123
141
/// <param name="socket">The socket to receive from.</param>
124
142
/// <param name="encoding">The encoding used to convert the data to a string.</param>
143
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
125
144
/// <returns>The content of the received message.</returns>
126
- public static string ReceiveString ( [ NotNull ] this IThreadSafeInSocket socket , [ NotNull ] Encoding encoding )
145
+ /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
146
+ public static string ReceiveString ( [ NotNull ] this IThreadSafeInSocket socket , [ NotNull ] Encoding encoding ,
147
+ CancellationToken cancellationToken = default )
127
148
{
128
149
var msg = new Msg ( ) ;
129
150
msg . InitEmpty ( ) ;
130
151
131
- socket . Receive ( ref msg ) ;
132
-
133
152
try
134
153
{
154
+ socket . Receive ( ref msg , cancellationToken ) ;
135
155
return msg . Size > 0
136
156
? msg . GetString ( encoding )
137
157
: string . Empty ;
@@ -182,11 +202,14 @@ public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, [
182
202
/// </summary>
183
203
/// <param name="socket">The socket to receive from.</param>
184
204
/// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
185
- /// <param name="str">The content of the received message, or <c>null</c> if no message was available.</param>
205
+ /// <param name="str">The conent of the received message, or <c>null</c> if no message was available.</param>
206
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
186
207
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
187
- public static bool TryReceiveString ( [ NotNull ] this IThreadSafeInSocket socket , TimeSpan timeout , out string str )
208
+ /// <remarks>The method would return false if cancellation has had requested.</remarks>
209
+ public static bool TryReceiveString ( [ NotNull ] this IThreadSafeInSocket socket , TimeSpan timeout , out string str ,
210
+ CancellationToken cancellationToken = default )
188
211
{
189
- return socket . TryReceiveString ( timeout , SendReceiveConstants . DefaultEncoding , out str ) ;
212
+ return socket . TryReceiveString ( timeout , SendReceiveConstants . DefaultEncoding , out str , cancellationToken ) ;
190
213
}
191
214
192
215
/// <summary>
@@ -197,14 +220,16 @@ public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, T
197
220
/// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
198
221
/// <param name="encoding">The encoding used to convert the data to a string.</param>
199
222
/// <param name="str">The content of the received message, or <c>null</c> if no message was available.</param>
223
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
200
224
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
225
+ /// <remarks>The method would return false if cancellation has had requested.</remarks>
201
226
public static bool TryReceiveString ( [ NotNull ] this IThreadSafeInSocket socket , TimeSpan timeout ,
202
- [ NotNull ] Encoding encoding , out string str )
227
+ [ NotNull ] Encoding encoding , out string str , CancellationToken cancellationToken = default )
203
228
{
204
229
var msg = new Msg ( ) ;
205
230
msg . InitEmpty ( ) ;
206
231
207
- if ( socket . TryReceive ( ref msg , timeout ) )
232
+ if ( socket . TryReceive ( ref msg , timeout , cancellationToken ) )
208
233
{
209
234
try
210
235
{
@@ -215,7 +240,7 @@ public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, T
215
240
}
216
241
finally
217
242
{
218
- msg . Close ( ) ;
243
+ msg . Close ( ) ;
219
244
}
220
245
}
221
246
@@ -232,15 +257,19 @@ public static bool TryReceiveString([NotNull] this IThreadSafeInSocket socket, T
232
257
/// Receive a string from <paramref name="socket"/> asynchronously.
233
258
/// </summary>
234
259
/// <param name="socket">The socket to receive from.</param>
260
+ /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
235
261
/// <returns>The content of the received message.</returns>
236
- public static ValueTask < string > ReceiveStringAsync ( [ NotNull ] this IThreadSafeInSocket socket )
262
+ /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
263
+ public static ValueTask < string > ReceiveStringAsync ( [ NotNull ] this IThreadSafeInSocket socket ,
264
+ CancellationToken cancellationToken = default )
237
265
{
238
266
if ( TryReceiveString ( socket , out var msg ) )
239
267
return new ValueTask < string > ( msg ) ;
240
268
241
269
// TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously
242
270
// and probably implement IValueTaskSource
243
- return new ValueTask < string > ( Task . Factory . StartNew ( socket . ReceiveString , TaskCreationOptions . LongRunning ) ) ;
271
+ return new ValueTask < string > ( Task . Factory . StartNew ( ( ) => socket . ReceiveString ( cancellationToken ) ,
272
+ cancellationToken , TaskCreationOptions . LongRunning , TaskScheduler . Default ) ) ;
244
273
}
245
274
246
275
#endregion
0 commit comments