Skip to content

Commit 59b781d

Browse files
committed
2 parents e55569a + 1ae8579 commit 59b781d

File tree

4 files changed

+191
-120
lines changed

4 files changed

+191
-120
lines changed

src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs

Lines changed: 59 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ public abstract class TcpSocketClientBase(SocketClientOptions options) : ITcpSoc
6565
private CancellationTokenSource? _receiveCancellationTokenSource;
6666
private CancellationTokenSource? _autoConnectTokenSource;
6767

68+
private readonly SemaphoreSlim _semaphoreSlim = new(1, 1);
69+
6870
/// <summary>
6971
/// <inheritdoc/>
7072
/// </summary>
@@ -78,49 +80,72 @@ public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken
7880
return true;
7981
}
8082

83+
var connectionToken = GenerateConnectionToken(token);
84+
try
85+
{
86+
await _semaphoreSlim.WaitAsync(connectionToken).ConfigureAwait(false);
87+
}
88+
catch (OperationCanceledException)
89+
{
90+
// 如果信号量等待被取消,则直接返回 IsConnected
91+
// 不管是超时还是被取消,都不需要重连,肯定有其他线程在连接中
92+
return IsConnected;
93+
}
94+
95+
if (IsConnected)
96+
{
97+
_semaphoreSlim.Release();
98+
return true;
99+
}
100+
101+
var reconnect = true;
81102
var ret = false;
82103
SocketClientProvider = ServiceProvider?.GetRequiredService<ISocketClientProvider>()
83104
?? throw new InvalidOperationException("SocketClientProvider is not registered in the service provider.");
84105

85106
try
86107
{
87-
ret = await ConnectCoreAsync(SocketClientProvider, endPoint, token);
88-
if (ret == false)
89-
{
90-
Reconnect();
91-
}
108+
ret = await ConnectCoreAsync(SocketClientProvider, endPoint, connectionToken);
92109
}
93110
catch (OperationCanceledException ex)
94111
{
95112
if (token.IsCancellationRequested)
96113
{
97114
Log(LogLevel.Warning, ex, $"TCP Socket connect operation was canceled from {LocalEndPoint} to {endPoint}");
115+
reconnect = false;
98116
}
99117
else
100118
{
101119
Log(LogLevel.Warning, ex, $"TCP Socket connect operation timed out from {LocalEndPoint} to {endPoint}");
102-
Reconnect();
103120
}
104121
}
105122
catch (Exception ex)
106123
{
107124
Log(LogLevel.Error, ex, $"TCP Socket connection failed from {LocalEndPoint} to {endPoint}");
125+
}
126+
127+
// 释放信号量
128+
_semaphoreSlim.Release();
129+
130+
if (!ret && reconnect)
131+
{
108132
Reconnect();
109133
}
134+
110135
return ret;
111136
}
112137

113138
private void Reconnect()
114139
{
115-
if (options.IsAutoReconnect)
140+
if (options.IsAutoReconnect && _remoteEndPoint != null)
116141
{
117142
Task.Run(async () =>
118143
{
119144
try
120145
{
121146
_autoConnectTokenSource ??= new();
122147
await Task.Delay(options.ReconnectInterval, _autoConnectTokenSource.Token).ConfigureAwait(false);
123-
HandleDisconnection();
148+
await ConnectAsync(_remoteEndPoint, _autoConnectTokenSource.Token).ConfigureAwait(false);
124149
}
125150
catch { }
126151
}, CancellationToken.None).ConfigureAwait(false);
@@ -138,14 +163,7 @@ private async ValueTask<bool> ConnectCoreAsync(ISocketClientProvider provider, I
138163
_localEndPoint = Options.LocalEndPoint;
139164
_remoteEndPoint = endPoint;
140165

141-
var connectionToken = token;
142-
if (Options.ConnectTimeout > 0)
143-
{
144-
// 设置连接超时时间
145-
var connectTokenSource = new CancellationTokenSource(options.ConnectTimeout);
146-
connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token;
147-
}
148-
var ret = await provider.ConnectAsync(endPoint, connectionToken);
166+
var ret = await provider.ConnectAsync(endPoint, token);
149167

150168
if (ret)
151169
{
@@ -159,13 +177,16 @@ private async ValueTask<bool> ConnectCoreAsync(ISocketClientProvider provider, I
159177
return ret;
160178
}
161179

162-
private void HandleDisconnection()
180+
private CancellationToken GenerateConnectionToken(CancellationToken token)
163181
{
164-
if (options.IsAutoReconnect && _remoteEndPoint != null)
182+
var connectionToken = token;
183+
if (Options.ConnectTimeout > 0)
165184
{
166-
_autoConnectTokenSource ??= new();
167-
_ = Task.Run(() => ConnectAsync(_remoteEndPoint, _autoConnectTokenSource.Token)).ConfigureAwait(false);
185+
// 设置连接超时时间
186+
var connectTokenSource = new CancellationTokenSource(options.ConnectTimeout);
187+
connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token;
168188
}
189+
return connectionToken;
169190
}
170191

171192
/// <summary>
@@ -182,6 +203,7 @@ public virtual async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, Cancel
182203
}
183204

184205
var ret = false;
206+
var reconnect = true;
185207
try
186208
{
187209
var sendToken = token;
@@ -192,15 +214,12 @@ public virtual async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, Cancel
192214
sendToken = CancellationTokenSource.CreateLinkedTokenSource(token, sendTokenSource.Token).Token;
193215
}
194216
ret = await SocketClientProvider.SendAsync(data, sendToken);
195-
if (ret == false)
196-
{
197-
HandleDisconnection();
198-
}
199217
}
200218
catch (OperationCanceledException ex)
201219
{
202220
if (token.IsCancellationRequested)
203221
{
222+
reconnect = false;
204223
Log(LogLevel.Warning, ex, $"TCP Socket send operation was canceled from {_localEndPoint} to {_remoteEndPoint}");
205224
}
206225
else
@@ -211,14 +230,18 @@ public virtual async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, Cancel
211230
catch (Exception ex)
212231
{
213232
Log(LogLevel.Error, ex, $"TCP Socket send failed from {_localEndPoint} to {_remoteEndPoint}");
214-
215-
HandleDisconnection();
216233
}
217234

218235
if (options.EnableLog)
219236
{
220237
Log(LogLevel.Information, null, $"Sending data from {_localEndPoint} to {_remoteEndPoint}, Data Length: {data.Length} Data Content: {BitConverter.ToString(data.ToArray())} Result: {ret}");
221238
}
239+
240+
if (!ret && reconnect)
241+
{
242+
// 如果发送失败并且需要重连则尝试重连
243+
Reconnect();
244+
}
222245
return ret;
223246
}
224247

@@ -244,7 +267,7 @@ public virtual async ValueTask<Memory<byte>> ReceiveAsync(CancellationToken toke
244267
var len = await ReceiveCoreAsync(SocketClientProvider, buffer, token);
245268
if (len == 0)
246269
{
247-
HandleDisconnection();
270+
Reconnect();
248271
}
249272
return buffer[..len];
250273
}
@@ -270,11 +293,12 @@ private async ValueTask AutoReceiveAsync()
270293
}
271294
}
272295

273-
HandleDisconnection();
296+
Reconnect();
274297
}
275298

276299
private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memory<byte> buffer, CancellationToken token)
277300
{
301+
var reconnect = true;
278302
var len = 0;
279303
try
280304
{
@@ -292,8 +316,6 @@ private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memo
292316
// 远端主机关闭链路
293317
Log(LogLevel.Information, null, $"TCP Socket {_localEndPoint} received 0 data closed by {_remoteEndPoint}");
294318
buffer = Memory<byte>.Empty;
295-
296-
HandleDisconnection();
297319
}
298320
else
299321
{
@@ -311,6 +333,7 @@ private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memo
311333
if (token.IsCancellationRequested)
312334
{
313335
Log(LogLevel.Warning, ex, $"TCP Socket receive operation canceled from {_localEndPoint} to {_remoteEndPoint}");
336+
reconnect = false;
314337
}
315338
else
316339
{
@@ -320,14 +343,18 @@ private async ValueTask<int> ReceiveCoreAsync(ISocketClientProvider client, Memo
320343
catch (Exception ex)
321344
{
322345
Log(LogLevel.Error, ex, $"TCP Socket receive failed from {_localEndPoint} to {_remoteEndPoint}");
323-
324-
HandleDisconnection();
325346
}
326347

327348
if (options.EnableLog)
328349
{
329350
Log(LogLevel.Information, null, $"Receiving data from {_localEndPoint} to {_remoteEndPoint}, Data Length: {len} Data Content: {BitConverter.ToString(buffer.ToArray())}");
330351
}
352+
353+
if (len == 0 && reconnect)
354+
{
355+
// 如果接收数据长度为 0 并且需要重连则尝试重连
356+
Reconnect();
357+
}
331358
return len;
332359
}
333360

src/BootstrapBlazor/Services/ThrottleDispatcher.cs

Lines changed: 23 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,13 @@ namespace BootstrapBlazor.Components;
1010
/// </summary>
1111
public class ThrottleDispatcher(ThrottleOptions options)
1212
{
13-
private readonly object _locker = new();
14-
private Task? _lastTask;
1513
private DateTime? _invokeTime;
16-
private bool _busy;
1714

1815
/// <summary>
1916
/// 判断是否等待方法
2017
/// </summary>
2118
/// <returns></returns>
22-
protected virtual bool ShouldWait() => _busy || _invokeTime.HasValue && (DateTime.UtcNow - _invokeTime.Value) < options.Interval;
19+
protected virtual bool ShouldWait() => _invokeTime.HasValue && (DateTime.UtcNow - _invokeTime.Value) < options.Interval;
2320

2421
/// <summary>
2522
/// 异步限流方法
@@ -32,83 +29,53 @@ public class ThrottleDispatcher(ThrottleOptions options)
3229
/// 同步限流方法
3330
/// </summary>
3431
/// <param name="action">同步回调方法</param>
35-
/// <param name="cancellationToken">取消令牌</param>
36-
public void Throttle(Action action, CancellationToken cancellationToken = default)
32+
/// <param name="token">取消令牌</param>
33+
public void Throttle(Action action, CancellationToken token = default)
3734
{
3835
var task = InternalThrottleAsync(() => Task.Run(() =>
3936
{
4037
action();
4138
return Task.CompletedTask;
42-
}, cancellationToken), cancellationToken);
43-
Wait();
44-
return;
45-
46-
[ExcludeFromCodeCoverage]
47-
void Wait()
39+
}, CancellationToken.None), token);
40+
try
4841
{
49-
try
50-
{
51-
task.Wait(cancellationToken);
52-
}
53-
catch (AggregateException ex)
54-
{
55-
if (ex.InnerException is not null)
56-
{
57-
throw ex.InnerException;
58-
}
59-
}
60-
catch (Exception)
61-
{
62-
throw;
63-
}
42+
task.Wait(token);
43+
}
44+
catch (Exception)
45+
{
46+
throw;
6447
}
48+
return;
6549
}
6650

67-
/// <summary>
68-
/// 任务实例
69-
/// </summary>
70-
protected Task LastTask => _lastTask ?? Task.CompletedTask;
71-
7251
/// <summary>
7352
/// 限流异步方法
7453
/// </summary>
7554
/// <param name="function">异步回调方法</param>
7655
/// <param name="cancellationToken">取消令牌</param>
77-
private Task InternalThrottleAsync(Func<Task> function, CancellationToken cancellationToken = default)
56+
private async Task InternalThrottleAsync(Func<Task> function, CancellationToken cancellationToken = default)
7857
{
7958
if (ShouldWait())
8059
{
81-
return LastTask;
60+
return;
8261
}
8362

84-
lock (_locker)
63+
_invokeTime = DateTime.UtcNow;
64+
65+
try
8566
{
86-
if (ShouldWait())
67+
await function();
68+
if (options.DelayAfterExecution)
8769
{
88-
return LastTask;
70+
_invokeTime = DateTime.UtcNow;
8971
}
90-
91-
_busy = true;
92-
_invokeTime = DateTime.UtcNow;
93-
_lastTask = function();
94-
_lastTask.ContinueWith(_ =>
95-
{
96-
if (options.DelayAfterExecution)
97-
{
98-
_invokeTime = DateTime.UtcNow;
99-
}
100-
_busy = false;
101-
}, cancellationToken);
102-
72+
}
73+
catch
74+
{
10375
if (options.ResetIntervalOnException)
10476
{
105-
_lastTask.ContinueWith((_, _) =>
106-
{
107-
_lastTask = null;
108-
_invokeTime = null;
109-
}, cancellationToken, TaskContinuationOptions.OnlyOnFaulted);
77+
_invokeTime = null;
11078
}
111-
return LastTask;
11279
}
11380
}
11481
}

0 commit comments

Comments
 (0)