Skip to content

Commit e3d081e

Browse files
authored
feat(ITcpSocketClient): add concurrency connect logic (#6377)
* feat: 增加锁控制并发 * refactor: 合并重连方法提高可读性 * feat: 使用 SemaphoreSlim 信号量控制重连并发逻辑 * test: 增加重连单元测试
1 parent 84764e3 commit e3d081e

File tree

2 files changed

+130
-32
lines changed

2 files changed

+130
-32
lines changed

src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs

Lines changed: 59 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ public abstract class TcpSocketClientBase(SocketClientOptions options) : ITcpSoc
6565
private CancellationTokenSource? _receiveCancellationTokenSource;
6666
private CancellationTokenSource? _autoConnectTokenSource;
6767

68+
private readonly SemaphoreSlim _semaphoreSlim = new(1, 1);
69+
6870
/// <summary>
6971
/// <inheritdoc/>
7072
/// </summary>
@@ -78,49 +80,72 @@ public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken
7880
return true;
7981
}
8082

83+
var connectionToken = GenerateConnectionToken(token);
84+
try
85+
{
86+
await _semaphoreSlim.WaitAsync(connectionToken).ConfigureAwait(false);
87+
}
88+
catch (OperationCanceledException)
89+
{
90+
// 如果信号量等待被取消,则直接返回 IsConnected
91+
// 不管是超时还是被取消,都不需要重连,肯定有其他线程在连接中
92+
return IsConnected;
93+
}
94+
95+
if (IsConnected)
96+
{
97+
_semaphoreSlim.Release();
98+
return true;
99+
}
100+
101+
var reconnect = true;
81102
var ret = false;
82103
SocketClientProvider = ServiceProvider?.GetRequiredService<ISocketClientProvider>()
83104
?? throw new InvalidOperationException("SocketClientProvider is not registered in the service provider.");
84105

85106
try
86107
{
87-
ret = await ConnectCoreAsync(SocketClientProvider, endPoint, token);
88-
if (ret == false)
89-
{
90-
Reconnect();
91-
}
108+
ret = await ConnectCoreAsync(SocketClientProvider, endPoint, connectionToken);
92109
}
93110
catch (OperationCanceledException ex)
94111
{
95112
if (token.IsCancellationRequested)
96113
{
97114
Log(LogLevel.Warning, ex, $"TCP Socket connect operation was canceled from {LocalEndPoint} to {endPoint}");
115+
reconnect = false;
98116
}
99117
else
100118
{
101119
Log(LogLevel.Warning, ex, $"TCP Socket connect operation timed out from {LocalEndPoint} to {endPoint}");
102-
Reconnect();
103120
}
104121
}
105122
catch (Exception ex)
106123
{
107124
Log(LogLevel.Error, ex, $"TCP Socket connection failed from {LocalEndPoint} to {endPoint}");
125+
}
126+
127+
// 释放信号量
128+
_semaphoreSlim.Release();
129+
130+
if (!ret && reconnect)
131+
{
108132
Reconnect();
109133
}
134+
110135
return ret;
111136
}
112137

113138
private void Reconnect()
114139
{
115-
if (options.IsAutoReconnect)
140+
if (options.IsAutoReconnect && _remoteEndPoint != null)
116141
{
117142
Task.Run(async () =>
118143
{
119144
try
120145
{
121146
_autoConnectTokenSource ??= new();
122147
await Task.Delay(options.ReconnectInterval, _autoConnectTokenSource.Token).ConfigureAwait(false);
123-
HandleDisconnection();
148+
await ConnectAsync(_remoteEndPoint, _autoConnectTokenSource.Token).ConfigureAwait(false);
124149
}
125150
catch { }
126151
}, CancellationToken.None).ConfigureAwait(false);
@@ -138,14 +163,7 @@ private async ValueTask<bool> ConnectCoreAsync(ISocketClientProvider provider, I
138163
_localEndPoint = Options.LocalEndPoint;
139164
_remoteEndPoint = endPoint;
140165

141-
var connectionToken = token;
142-
if (Options.ConnectTimeout > 0)
143-
{
144-
// 设置连接超时时间
145-
var connectTokenSource = new CancellationTokenSource(options.ConnectTimeout);
146-
connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token;
147-
}
148-
var ret = await provider.ConnectAsync(endPoint, connectionToken);
166+
var ret = await provider.ConnectAsync(endPoint, token);
149167

150168
if (ret)
151169
{
@@ -159,13 +177,16 @@ private async ValueTask<bool> ConnectCoreAsync(ISocketClientProvider provider, I
159177
return ret;
160178
}
161179

162-
private void HandleDisconnection()
180+
private CancellationToken GenerateConnectionToken(CancellationToken token)
163181
{
164-
if (options.IsAutoReconnect && _remoteEndPoint != null)
182+
var connectionToken = token;
183+
if (Options.ConnectTimeout > 0)
165184
{
166-
_autoConnectTokenSource ??= new();
167-
_ = Task.Run(() => ConnectAsync(_remoteEndPoint, _autoConnectTokenSource.Token)).ConfigureAwait(false);
185+
// 设置连接超时时间
186+
var connectTokenSource = new CancellationTokenSource(options.ConnectTimeout);
187+
connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token;
168188
}
189+
return connectionToken;
169190
}
170191

171192
/// <summary>
@@ -182,6 +203,7 @@ public virtual async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, Cancel
182203
}
183204

184205
var ret = false;
206+
var reconnect = true;
185207
try
186208
{
187209
var sendToken = token;
@@ -192,15 +214,12 @@ public virtual async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, Cancel
192214
sendToken = CancellationTokenSource.CreateLinkedTokenSource(token, sendTokenSource.Token).Token;
193215
}
194216
ret = await SocketClientProvider.SendAsync(data, sendToken);
195-
if (ret == false)
196-
{
197-
HandleDisconnection();
198-
}
199217
}
200218
catch (OperationCanceledException ex)
201219
{
202220
if (token.IsCancellationRequested)
203221
{
222+
reconnect = false;
204223
Log(LogLevel.Warning, ex, $"TCP Socket send operation was canceled from {_localEndPoint} to {_remoteEndPoint}");
205224
}
206225
else
@@ -211,14 +230,18 @@ public virtual async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, Cancel
211230
catch (Exception ex)
212231
{
213232
Log(LogLevel.Error, ex, $"TCP Socket send failed from {_localEndPoint} to {_remoteEndPoint}");
214-
215-
HandleDisconnection();
216233
}
217234

218235
if (options.EnableLog)
219236
{
220237
Log(LogLevel.Information, null, $"Sending data from {_localEndPoint} to {_remoteEndPoint}, Data Length: {data.Length} Data Content: {BitConverter.ToString(data.ToArray())} Result: {ret}");
221238
}
239+
240+
if (!ret && reconnect)
241+
{
242+
// 如果发送失败并且需要重连则尝试重连
243+
Reconnect();
244+
}
222245
return ret;
223246
}
224247

@@ -244,7 +267,7 @@ public virtual async ValueTask<Memory<byte>> ReceiveAsync(CancellationToken toke
244267
var len = await ReceiveCoreAsync(SocketClientProvider, buffer, token);
245268
if (len == 0)
246269
{
247-
HandleDisconnection();
270+
Reconnect();
248271
}
249272
return buffer[..len];
250273
}
@@ -270,11 +293,12 @@ private async ValueTask AutoReceiveAsync()
270293
}
271294
}
272295

273-
HandleDisconnection();
296+
Reconnect();
274297
}
275298

276299
private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memory<byte> buffer, CancellationToken token)
277300
{
301+
var reconnect = true;
278302
var len = 0;
279303
try
280304
{
@@ -292,8 +316,6 @@ private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memo
292316
// 远端主机关闭链路
293317
Log(LogLevel.Information, null, $"TCP Socket {_localEndPoint} received 0 data closed by {_remoteEndPoint}");
294318
buffer = Memory<byte>.Empty;
295-
296-
HandleDisconnection();
297319
}
298320
else
299321
{
@@ -311,6 +333,7 @@ private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memo
311333
if (token.IsCancellationRequested)
312334
{
313335
Log(LogLevel.Warning, ex, $"TCP Socket receive operation canceled from {_localEndPoint} to {_remoteEndPoint}");
336+
reconnect = false;
314337
}
315338
else
316339
{
@@ -320,14 +343,18 @@ private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memo
320343
catch (Exception ex)
321344
{
322345
Log(LogLevel.Error, ex, $"TCP Socket receive failed from {_localEndPoint} to {_remoteEndPoint}");
323-
324-
HandleDisconnection();
325346
}
326347

327348
if (options.EnableLog)
328349
{
329350
Log(LogLevel.Information, null, $"Receiving data from {_localEndPoint} to {_remoteEndPoint}, Data Length: {len} Data Content: {BitConverter.ToString(buffer.ToArray())}");
330351
}
352+
353+
if (len == 0 && reconnect)
354+
{
355+
// 如果接收数据长度为 0 并且需要重连则尝试重连
356+
Reconnect();
357+
}
331358
return len;
332359
}
333360

test/UnitTest/Services/TcpSocketFactoryTest.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,18 @@ public async Task ConnectAsync_Cancel()
6969
var cst = new CancellationTokenSource();
7070
cst.Cancel();
7171
var connect = await client.ConnectAsync("localhost", 9999, cst.Token);
72+
73+
// 由于信号量被取消,所以连接会失败
74+
Assert.False(connect);
75+
76+
// 测试真正的连接被取消逻辑
77+
cst = new CancellationTokenSource();
78+
_ = Task.Run(async () =>
79+
{
80+
await Task.Delay(50);
81+
cst.Cancel();
82+
});
83+
connect = await client.ConnectAsync("localhost", 9999, cst.Token);
7284
Assert.False(connect);
7385
}
7486

@@ -102,6 +114,29 @@ public async Task ConnectAsync_Error()
102114
methodInfo.Invoke(client, [LogLevel.Error, null!, "Test error log"]);
103115
}
104116

117+
[Fact]
118+
public async Task ConnectAsync_Lock()
119+
{
120+
// 测试并发锁问题
121+
var provider = new MockAutoReconnectLockSocketProvider();
122+
var client = CreateClient(builder =>
123+
{
124+
builder.AddTransient<ISocketClientProvider>(p => provider);
125+
});
126+
127+
// 开 5 个线程同时连接
128+
_ = Task.Run(async () =>
129+
{
130+
// 延时 150 保证有一个连接失败
131+
await Task.Delay(150);
132+
provider.SetConnected(true);
133+
});
134+
var results = await Task.WhenAll(Enumerable.Range(1, 5).Select(i => client.ConnectAsync("localhost", 0).AsTask()));
135+
// 期望结果是 1个 false 4个 true
136+
Assert.Equal(1, results.Count(r => !r));
137+
Assert.Equal(4, results.Count(r => r));
138+
}
139+
105140
[Fact]
106141
public async Task Send_Timeout()
107142
{
@@ -432,6 +467,7 @@ public async Task AutoReconnect_Cancel()
432467
});
433468

434469
await client.ConnectAsync("localhost", 0);
470+
await Task.Delay(100);
435471
await client.DisposeAsync();
436472
}
437473

@@ -912,6 +948,41 @@ public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationTo
912948
}
913949
}
914950

951+
class MockAutoReconnectLockSocketProvider : ISocketClientProvider
952+
{
953+
public bool IsConnected { get; private set; }
954+
955+
public IPEndPoint LocalEndPoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 0);
956+
957+
public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken token = default)
958+
{
959+
await Task.Delay(100, token);
960+
return IsConnected;
961+
}
962+
963+
public ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
964+
{
965+
return ValueTask.FromResult(true);
966+
}
967+
968+
public ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken token = default)
969+
{
970+
byte[] data = [1, 2, 3, 4, 5];
971+
data.CopyTo(buffer);
972+
return ValueTask.FromResult(5);
973+
}
974+
975+
public ValueTask CloseAsync()
976+
{
977+
return ValueTask.CompletedTask;
978+
}
979+
980+
public void SetConnected(bool state)
981+
{
982+
IsConnected = state;
983+
}
984+
}
985+
915986
class MockAutoReconnectSocketProvider : ISocketClientProvider
916987
{
917988
public bool IsConnected { get; private set; }

0 commit comments

Comments
 (0)