Skip to content

Commit 45c9979

Browse files
authored
feat(ITcpSocketClient): add Timeout parameter (#6318)
* feat: 增加 ReceiveAsync 方法 * doc: 增加默认值说明 * feat: 增加是否自动接收参数 * feat: 增加超时设置 * feat: 根据接口增加新设置 * feat: 增加接收数据实现 * feat: 缓存更改为 64K * feat: 增加连接超时功能 * test: 增加连接超时单元测试 * feat: 实现发送超时逻辑 * feat: 增加接收数据逻辑 * test: 增加单元测试 * test: 增加重连功能 * test: 更新单元测试 * feat: LocalEndPoint 更改未可为空
1 parent 342ff51 commit 45c9979

File tree

7 files changed

+327
-68
lines changed

7 files changed

+327
-68
lines changed

src/BootstrapBlazor/Services/TcpSocket/DataPackage/DataPackageHandlerBase.cs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,33 +16,28 @@ public abstract class DataPackageHandlerBase : IDataPackageHandler
1616
private Memory<byte> _lastReceiveBuffer = Memory<byte>.Empty;
1717

1818
/// <summary>
19-
/// Gets or sets the callback function to handle received data.
19+
/// <inheritdoc/>
2020
/// </summary>
21-
/// <remarks>The callback function should be designed to handle the received data efficiently and
22-
/// asynchronously. Ensure that the implementation does not block or perform long-running operations, as this may
23-
/// impact performance.</remarks>
2421
public Func<ReadOnlyMemory<byte>, ValueTask>? ReceivedCallBack { get; set; }
2522

2623
/// <summary>
27-
/// Sends the specified data asynchronously to the target destination.
24+
/// <inheritdoc/>
2825
/// </summary>
29-
/// <remarks>The method performs an asynchronous operation to send the provided data. The caller must
30-
/// ensure that the data is valid and non-empty. The returned memory block may contain a response or acknowledgment
31-
/// depending on the implementation of the target destination.</remarks>
32-
/// <param name="data">The data to be sent, represented as a block of memory.</param>
33-
/// <returns>A task that represents the asynchronous operation. The task result contains a <see cref="Memory{T}"/> of <see
34-
/// cref="byte"/> representing the response or acknowledgment received from the target destination.</returns>
35-
public virtual ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data)
26+
/// <param name="data"></param>
27+
/// <param name="token"></param>
28+
/// <returns></returns>
29+
public virtual ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
3630
{
3731
return ValueTask.FromResult(data);
3832
}
3933

4034
/// <summary>
41-
/// Processes the received data asynchronously.
35+
/// <inheritdoc/>
4236
/// </summary>
43-
/// <param name="data">A memory buffer containing the data to be processed. The buffer must not be empty.</param>
44-
/// <returns>A task that represents the asynchronous operation.</returns>
45-
public virtual ValueTask ReceiveAsync(ReadOnlyMemory<byte> data)
37+
/// <param name="data"></param>
38+
/// <param name="token"></param>
39+
/// <returns></returns>
40+
public virtual ValueTask ReceiveAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
4641
{
4742
return ValueTask.CompletedTask;
4843
}

src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ public DelimiterDataPackageHandler(byte[] delimiter)
5050
/// <inheritdoc/>
5151
/// </summary>
5252
/// <param name="data"></param>
53+
/// <param name="token"></param>
5354
/// <returns></returns>
54-
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data)
55+
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
5556
{
5657
data = ConcatBuffer(data);
5758

src/BootstrapBlazor/Services/TcpSocket/DataPackage/FixLengthDataPackageHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ public class FixLengthDataPackageHandler(int length) : DataPackageHandlerBase
2222
/// <inheritdoc/>
2323
/// </summary>
2424
/// <param name="data"></param>
25+
/// <param name="token"></param>
2526
/// <returns></returns>
26-
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data)
27+
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
2728
{
2829
while (data.Length > 0)
2930
{

src/BootstrapBlazor/Services/TcpSocket/DataPackage/IDataPackageHandler.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,19 @@ public interface IDataPackageHandler
2525
/// ensure that the data is valid and non-empty. The returned memory block may contain a response or acknowledgment
2626
/// depending on the implementation of the target destination.</remarks>
2727
/// <param name="data">The data to be sent, represented as a block of memory.</param>
28+
/// <param name="token">An optional <see cref="CancellationToken"/> to observe while waiting for the operation to complete.</param>
2829
/// <returns>A task that represents the asynchronous operation. The task result contains a <see cref="Memory{T}"/> of <see
2930
/// cref="byte"/> representing the response or acknowledgment received from the target destination.</returns>
30-
ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data);
31+
ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data, CancellationToken token = default);
3132

3233
/// <summary>
3334
/// Asynchronously receives data from a source and writes it into the provided memory buffer.
3435
/// </summary>
3536
/// <remarks>This method does not guarantee that the entire buffer will be filled. The number of bytes
3637
/// written depends on the availability of data.</remarks>
3738
/// <param name="data">The memory buffer to store the received data. The buffer must be writable and have sufficient capacity.</param>
39+
/// <param name="token">A cancellation token that can be used to cancel the operation. The default value is <see langword="default"/>.</param>
3840
/// <returns>A task that represents the asynchronous operation. The task result contains the number of bytes written to the
3941
/// buffer. Returns 0 if the end of the data stream is reached.</returns>
40-
ValueTask ReceiveAsync(ReadOnlyMemory<byte> data);
42+
ValueTask ReceiveAsync(ReadOnlyMemory<byte> data, CancellationToken token = default);
4143
}

src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs

Lines changed: 126 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
namespace BootstrapBlazor.Components;
1313

1414
[UnsupportedOSPlatform("browser")]
15-
sealed class DefaultTcpSocketClient(IPEndPoint endPoint) : ITcpSocketClient
15+
sealed class DefaultTcpSocketClient(IPEndPoint localEndPoint) : ITcpSocketClient
1616
{
1717
private TcpClient? _client;
1818
private IDataPackageHandler? _dataPackageHandler;
@@ -21,15 +21,23 @@ sealed class DefaultTcpSocketClient(IPEndPoint endPoint) : ITcpSocketClient
2121

2222
public bool IsConnected => _client?.Connected ?? false;
2323

24-
public IPEndPoint LocalEndPoint { get; set; } = endPoint;
24+
public IPEndPoint? LocalEndPoint { get; set; }
2525

2626
[NotNull]
2727
public ILogger<DefaultTcpSocketClient>? Logger { get; set; }
2828

29-
public int ReceiveBufferSize { get; set; } = 1024 * 10;
29+
public int ReceiveBufferSize { get; set; } = 1024 * 64;
30+
31+
public bool IsAutoReceive { get; set; } = true;
3032

3133
public Func<ReadOnlyMemory<byte>, ValueTask>? ReceivedCallBack { get; set; }
3234

35+
public int ConnectTimeout { get; set; }
36+
37+
public int SendTimeout { get; set; }
38+
39+
public int ReceiveTimeout { get; set; }
40+
3341
public void SetDataHandler(IDataPackageHandler handler)
3442
{
3543
_dataPackageHandler = handler;
@@ -44,19 +52,37 @@ public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken
4452
Close();
4553

4654
// 创建新的 TcpClient 实例
47-
_client ??= new TcpClient(LocalEndPoint);
48-
await _client.ConnectAsync(endPoint, token);
55+
_client ??= new TcpClient(localEndPoint);
4956

50-
// 开始接收数据
51-
_ = Task.Run(ReceiveAsync, token);
57+
var connectionToken = token;
58+
if (ConnectTimeout > 0)
59+
{
60+
// 设置连接超时时间
61+
var connectTokenSource = new CancellationTokenSource(ConnectTimeout);
62+
connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token;
63+
}
64+
await _client.ConnectAsync(endPoint, connectionToken);
5265

66+
// 设置本地以及远端端点信息
5367
LocalEndPoint = (IPEndPoint)_client.Client.LocalEndPoint!;
5468
_remoteEndPoint = endPoint;
69+
70+
if (IsAutoReceive)
71+
{
72+
_ = Task.Run(AutoReceiveAsync);
73+
}
5574
ret = true;
5675
}
5776
catch (OperationCanceledException ex)
5877
{
59-
Logger.LogWarning(ex, "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint);
78+
if (token.IsCancellationRequested)
79+
{
80+
Logger.LogWarning(ex, "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint);
81+
}
82+
else
83+
{
84+
Logger.LogWarning(ex, "TCP Socket connect operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint);
85+
}
6086
}
6187
catch (Exception ex)
6288
{
@@ -75,17 +101,34 @@ public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationTo
75101
var ret = false;
76102
try
77103
{
104+
var stream = _client.GetStream();
105+
106+
var sendToken = token;
107+
if (SendTimeout > 0)
108+
{
109+
// 设置发送超时时间
110+
var sendTokenSource = new CancellationTokenSource(SendTimeout);
111+
sendToken = CancellationTokenSource.CreateLinkedTokenSource(token, sendTokenSource.Token).Token;
112+
}
113+
78114
if (_dataPackageHandler != null)
79115
{
80-
data = await _dataPackageHandler.SendAsync(data);
116+
data = await _dataPackageHandler.SendAsync(data, sendToken);
81117
}
82-
var stream = _client.GetStream();
83-
await stream.WriteAsync(data, token);
118+
119+
await stream.WriteAsync(data, sendToken);
84120
ret = true;
85121
}
86122
catch (OperationCanceledException ex)
87123
{
88-
Logger.LogWarning(ex, "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
124+
if (token.IsCancellationRequested)
125+
{
126+
Logger.LogWarning(ex, "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
127+
}
128+
else
129+
{
130+
Logger.LogWarning(ex, "TCP Socket send operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
131+
}
89132
}
90133
catch (Exception ex)
91134
{
@@ -94,7 +137,25 @@ public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationTo
94137
return ret;
95138
}
96139

97-
private async ValueTask ReceiveAsync()
140+
public async ValueTask<Memory<byte>> ReceiveAsync(CancellationToken token = default)
141+
{
142+
if (_client == null || !_client.Connected)
143+
{
144+
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
145+
}
146+
147+
if (IsAutoReceive)
148+
{
149+
throw new InvalidOperationException("Cannot call ReceiveAsync when IsAutoReceive is true. Use the auto-receive mechanism instead.");
150+
}
151+
152+
using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
153+
var buffer = block.Memory;
154+
var len = await ReceiveCoreAsync(_client, buffer, token);
155+
return buffer[0..len];
156+
}
157+
158+
private async ValueTask AutoReceiveAsync()
98159
{
99160
_receiveCancellationTokenSource ??= new();
100161
while (_receiveCancellationTokenSource is { IsCancellationRequested: false })
@@ -104,42 +165,67 @@ private async ValueTask ReceiveAsync()
104165
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
105166
}
106167

107-
try
168+
using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
169+
var buffer = block.Memory;
170+
var len = await ReceiveCoreAsync(_client, buffer, _receiveCancellationTokenSource.Token);
171+
if (len == 0)
172+
{
173+
break;
174+
}
175+
}
176+
}
177+
178+
private async ValueTask<int> ReceiveCoreAsync(TcpClient client, Memory<byte> buffer, CancellationToken token)
179+
{
180+
var len = 0;
181+
try
182+
{
183+
var stream = client.GetStream();
184+
185+
var receiveToken = token;
186+
if (ReceiveTimeout > 0)
108187
{
109-
using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
110-
var buffer = block.Memory;
111-
var stream = _client.GetStream();
112-
var len = await stream.ReadAsync(buffer, _receiveCancellationTokenSource.Token);
113-
if (len == 0)
188+
// 设置接收超时时间
189+
var receiveTokenSource = new CancellationTokenSource(ReceiveTimeout);
190+
receiveToken = CancellationTokenSource.CreateLinkedTokenSource(receiveToken, receiveTokenSource.Token).Token;
191+
}
192+
len = await stream.ReadAsync(buffer, receiveToken);
193+
if (len == 0)
194+
{
195+
// 远端主机关闭链路
196+
Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
197+
}
198+
else
199+
{
200+
buffer = buffer[..len];
201+
202+
if (ReceivedCallBack != null)
114203
{
115-
// 远端主机关闭链路
116-
Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
117-
break;
204+
await ReceivedCallBack(buffer);
118205
}
119-
else
120-
{
121-
buffer = buffer[..len];
122-
123-
if (ReceivedCallBack != null)
124-
{
125-
await ReceivedCallBack(buffer);
126-
}
127206

128-
if (_dataPackageHandler != null)
129-
{
130-
await _dataPackageHandler.ReceiveAsync(buffer);
131-
}
207+
if (_dataPackageHandler != null)
208+
{
209+
await _dataPackageHandler.ReceiveAsync(buffer, receiveToken);
132210
}
133211
}
134-
catch (OperationCanceledException ex)
212+
}
213+
catch (OperationCanceledException ex)
214+
{
215+
if (token.IsCancellationRequested)
135216
{
136-
Logger.LogWarning(ex, "TCP Socket receive operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
217+
Logger.LogWarning(ex, "TCP Socket receive operation canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
137218
}
138-
catch (Exception ex)
219+
else
139220
{
140-
Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
221+
Logger.LogWarning(ex, "TCP Socket receive operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
141222
}
142223
}
224+
catch (Exception ex)
225+
{
226+
Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
227+
}
228+
return len;
143229
}
144230

145231
public void Close()
@@ -151,10 +237,11 @@ private void Dispose(bool disposing)
151237
{
152238
if (disposing)
153239
{
240+
LocalEndPoint = null;
154241
_remoteEndPoint = null;
155242

156243
// 取消接收数据的任务
157-
if (_receiveCancellationTokenSource is not null)
244+
if (_receiveCancellationTokenSource != null)
158245
{
159246
_receiveCancellationTokenSource.Cancel();
160247
_receiveCancellationTokenSource.Dispose();

0 commit comments

Comments
 (0)