Skip to content

Commit 84764e3

Browse files
authored
feat(ITcpSocketClient): add AutoConnect parameter (#6372)
* feat: 增加自动重连配置 * refactor: 使用取消令牌 * refactor: 更改参数为 IsAutoReconnect * test: 更新单元测试 * feat: 断线时也需要触发 ReceivedCallBack 回调 * refactor: 提高性能避免线程切换 * feat: 连接方法增加保护 * refactor: 增加 Reconnect 方法 * test: 增加重连单元测试 * feat: 增加发送接收时断线重连机制 * test: 增加发送接收时断开重连单元测试
1 parent 7e2f1f1 commit 84764e3

File tree

4 files changed

+320
-47
lines changed

4 files changed

+320
-47
lines changed

src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClientProvider.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class DefaultSocketClientProvider : ISocketClientProvider
3333
public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken token = default)
3434
{
3535
_client = new TcpClient(LocalEndPoint);
36-
await _client.ConnectAsync(endPoint, token);
36+
await _client.ConnectAsync(endPoint, token).ConfigureAwait(false);
3737
if (_client.Connected)
3838
{
3939
if (_client.Client.LocalEndPoint is IPEndPoint localEndPoint)
@@ -53,7 +53,7 @@ public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationTo
5353
if (_client != null)
5454
{
5555
var stream = _client.GetStream();
56-
await stream.WriteAsync(data, token);
56+
await stream.WriteAsync(data, token).ConfigureAwait(false);
5757
ret = true;
5858
}
5959
return ret;
@@ -68,7 +68,7 @@ public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken
6868
if (_client is { Connected: true })
6969
{
7070
var stream = _client.GetStream();
71-
len = await stream.ReadAsync(buffer, token);
71+
len = await stream.ReadAsync(buffer, token).ConfigureAwait(false);
7272
}
7373
return len;
7474
}

src/BootstrapBlazor/Services/TcpSocket/SocketClientOptions.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,14 @@ public class SocketClientOptions
5555
/// Gets or sets a value indicating whether logging is enabled. Default value is false.
5656
/// </summary>
5757
public bool EnableLog { get; set; }
58+
59+
/// <summary>
60+
/// Gets or sets a value indicating whether the system should automatically attempt to reconnect after a connection is lost. Default value is false.
61+
/// </summary>
62+
public bool IsAutoReconnect { get; set; }
63+
64+
/// <summary>
65+
/// Gets or sets the interval, in milliseconds, between reconnection attempts. Default value is 5000.
66+
/// </summary>
67+
public int ReconnectInterval { get; set; } = 5000;
5868
}

src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs

Lines changed: 110 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public abstract class TcpSocketClientBase(SocketClientOptions options) : ITcpSoc
6363
private IPEndPoint? _remoteEndPoint;
6464
private IPEndPoint? _localEndPoint;
6565
private CancellationTokenSource? _receiveCancellationTokenSource;
66+
private CancellationTokenSource? _autoConnectTokenSource;
6667

6768
/// <summary>
6869
/// <inheritdoc/>
@@ -72,39 +73,21 @@ public abstract class TcpSocketClientBase(SocketClientOptions options) : ITcpSoc
7273
/// <returns></returns>
7374
public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken token = default)
7475
{
76+
if (IsConnected)
77+
{
78+
return true;
79+
}
80+
7581
var ret = false;
7682
SocketClientProvider = ServiceProvider?.GetRequiredService<ISocketClientProvider>()
7783
?? throw new InvalidOperationException("SocketClientProvider is not registered in the service provider.");
7884

7985
try
8086
{
81-
// 释放资源
82-
await CloseAsync();
83-
84-
// 创建新的 TcpClient 实例
85-
SocketClientProvider.LocalEndPoint = Options.LocalEndPoint;
86-
87-
_localEndPoint = Options.LocalEndPoint;
88-
_remoteEndPoint = null;
89-
90-
var connectionToken = token;
91-
if (Options.ConnectTimeout > 0)
87+
ret = await ConnectCoreAsync(SocketClientProvider, endPoint, token);
88+
if (ret == false)
9289
{
93-
// 设置连接超时时间
94-
var connectTokenSource = new CancellationTokenSource(options.ConnectTimeout);
95-
connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token;
96-
}
97-
ret = await SocketClientProvider.ConnectAsync(endPoint, connectionToken);
98-
99-
if (ret)
100-
{
101-
_localEndPoint = SocketClientProvider.LocalEndPoint;
102-
_remoteEndPoint = endPoint;
103-
104-
if (options.IsAutoReceive)
105-
{
106-
_ = Task.Run(AutoReceiveAsync, token);
107-
}
90+
Reconnect();
10891
}
10992
}
11093
catch (OperationCanceledException ex)
@@ -116,15 +99,75 @@ public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken
11699
else
117100
{
118101
Log(LogLevel.Warning, ex, $"TCP Socket connect operation timed out from {LocalEndPoint} to {endPoint}");
102+
Reconnect();
119103
}
120104
}
121105
catch (Exception ex)
122106
{
123107
Log(LogLevel.Error, ex, $"TCP Socket connection failed from {LocalEndPoint} to {endPoint}");
108+
Reconnect();
109+
}
110+
return ret;
111+
}
112+
113+
private void Reconnect()
114+
{
115+
if (options.IsAutoReconnect)
116+
{
117+
Task.Run(async () =>
118+
{
119+
try
120+
{
121+
_autoConnectTokenSource ??= new();
122+
await Task.Delay(options.ReconnectInterval, _autoConnectTokenSource.Token).ConfigureAwait(false);
123+
HandleDisconnection();
124+
}
125+
catch { }
126+
}, CancellationToken.None).ConfigureAwait(false);
127+
}
128+
}
129+
130+
private async ValueTask<bool> ConnectCoreAsync(ISocketClientProvider provider, IPEndPoint endPoint, CancellationToken token)
131+
{
132+
// 释放资源
133+
await CloseAsync();
134+
135+
// 创建新的 TcpClient 实例
136+
provider.LocalEndPoint = Options.LocalEndPoint;
137+
138+
_localEndPoint = Options.LocalEndPoint;
139+
_remoteEndPoint = endPoint;
140+
141+
var connectionToken = token;
142+
if (Options.ConnectTimeout > 0)
143+
{
144+
// 设置连接超时时间
145+
var connectTokenSource = new CancellationTokenSource(options.ConnectTimeout);
146+
connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token;
147+
}
148+
var ret = await provider.ConnectAsync(endPoint, connectionToken);
149+
150+
if (ret)
151+
{
152+
_localEndPoint = provider.LocalEndPoint;
153+
154+
if (options.IsAutoReceive)
155+
{
156+
_ = Task.Run(AutoReceiveAsync, CancellationToken.None).ConfigureAwait(false);
157+
}
124158
}
125159
return ret;
126160
}
127161

162+
private void HandleDisconnection()
163+
{
164+
if (options.IsAutoReconnect && _remoteEndPoint != null)
165+
{
166+
_autoConnectTokenSource ??= new();
167+
_ = Task.Run(() => ConnectAsync(_remoteEndPoint, _autoConnectTokenSource.Token)).ConfigureAwait(false);
168+
}
169+
}
170+
128171
/// <summary>
129172
/// <inheritdoc/>
130173
/// </summary>
@@ -149,6 +192,10 @@ public virtual async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, Cancel
149192
sendToken = CancellationTokenSource.CreateLinkedTokenSource(token, sendTokenSource.Token).Token;
150193
}
151194
ret = await SocketClientProvider.SendAsync(data, sendToken);
195+
if (ret == false)
196+
{
197+
HandleDisconnection();
198+
}
152199
}
153200
catch (OperationCanceledException ex)
154201
{
@@ -164,6 +211,8 @@ public virtual async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, Cancel
164211
catch (Exception ex)
165212
{
166213
Log(LogLevel.Error, ex, $"TCP Socket send failed from {_localEndPoint} to {_remoteEndPoint}");
214+
215+
HandleDisconnection();
167216
}
168217

169218
if (options.EnableLog)
@@ -193,14 +242,18 @@ public virtual async ValueTask<Memory<byte>> ReceiveAsync(CancellationToken toke
193242
using var block = MemoryPool<byte>.Shared.Rent(options.ReceiveBufferSize);
194243
var buffer = block.Memory;
195244
var len = await ReceiveCoreAsync(SocketClientProvider, buffer, token);
245+
if (len == 0)
246+
{
247+
HandleDisconnection();
248+
}
196249
return buffer[..len];
197250
}
198251

199252
private async ValueTask AutoReceiveAsync()
200253
{
201254
// 自动接收方法
202255
_receiveCancellationTokenSource ??= new();
203-
while (true)
256+
while (_receiveCancellationTokenSource is { IsCancellationRequested: false })
204257
{
205258
if (SocketClientProvider is not { IsConnected: true })
206259
{
@@ -216,6 +269,8 @@ private async ValueTask AutoReceiveAsync()
216269
break;
217270
}
218271
}
272+
273+
HandleDisconnection();
219274
}
220275

221276
private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memory<byte> buffer, CancellationToken token)
@@ -236,16 +291,19 @@ private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memo
236291
{
237292
// 远端主机关闭链路
238293
Log(LogLevel.Information, null, $"TCP Socket {_localEndPoint} received 0 data closed by {_remoteEndPoint}");
294+
buffer = Memory<byte>.Empty;
295+
296+
HandleDisconnection();
239297
}
240298
else
241299
{
242300
buffer = buffer[..len];
301+
}
243302

244-
if (ReceivedCallBack != null)
245-
{
246-
// 如果订阅回调则触发回调
247-
await ReceivedCallBack(buffer);
248-
}
303+
if (ReceivedCallBack != null)
304+
{
305+
// 如果订阅回调则触发回调
306+
await ReceivedCallBack(buffer);
249307
}
250308
}
251309
catch (OperationCanceledException ex)
@@ -262,6 +320,8 @@ private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memo
262320
catch (Exception ex)
263321
{
264322
Log(LogLevel.Error, ex, $"TCP Socket receive failed from {_localEndPoint} to {_remoteEndPoint}");
323+
324+
HandleDisconnection();
265325
}
266326

267327
if (options.EnableLog)
@@ -283,9 +343,19 @@ protected virtual void Log(LogLevel logLevel, Exception? ex, string? message)
283343
/// <summary>
284344
/// <inheritdoc/>
285345
/// </summary>
286-
public virtual ValueTask CloseAsync()
346+
public virtual async ValueTask CloseAsync()
287347
{
288-
return DisposeAsync(true);
348+
// 取消接收数据的任务
349+
if (_receiveCancellationTokenSource != null)
350+
{
351+
_receiveCancellationTokenSource.Cancel();
352+
_receiveCancellationTokenSource.Dispose();
353+
_receiveCancellationTokenSource = null;
354+
}
355+
if (SocketClientProvider != null)
356+
{
357+
await SocketClientProvider.CloseAsync();
358+
}
289359
}
290360

291361
/// <summary>
@@ -300,18 +370,15 @@ protected virtual async ValueTask DisposeAsync(bool disposing)
300370
{
301371
if (disposing)
302372
{
303-
// 取消接收数据的任务
304-
if (_receiveCancellationTokenSource != null)
373+
// 取消重连任务
374+
if (_autoConnectTokenSource != null)
305375
{
306-
_receiveCancellationTokenSource.Cancel();
307-
_receiveCancellationTokenSource.Dispose();
308-
_receiveCancellationTokenSource = null;
376+
_autoConnectTokenSource.Cancel();
377+
_autoConnectTokenSource.Dispose();
378+
_autoConnectTokenSource = null;
309379
}
310380

311-
if (SocketClientProvider != null)
312-
{
313-
await SocketClientProvider.CloseAsync();
314-
}
381+
await CloseAsync();
315382
}
316383
}
317384

0 commit comments

Comments
 (0)