Skip to content

Commit 2db7aa6

Browse files
committed
feat: 增加接收数据逻辑
1 parent 486349a commit 2db7aa6

File tree

1 file changed

+52
-63
lines changed

1 file changed

+52
-63
lines changed

src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs

Lines changed: 52 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken
6969

7070
if (IsAutoReceive)
7171
{
72-
// 开始接收数据
73-
_ = Task.Run(ReceiveAsync, token);
72+
_ = Task.Run(AutoReceiveAsync);
7473
}
7574
ret = true;
7675
}
@@ -140,24 +139,57 @@ public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationTo
140139

141140
public async ValueTask<Memory<byte>> ReceiveAsync(CancellationToken token = default)
142141
{
142+
if (_client == null || !_client.Connected)
143+
{
144+
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
145+
}
146+
143147
if (IsAutoReceive)
144148
{
145149
throw new InvalidOperationException("Cannot call ReceiveAsync when IsAutoReceive is true. Use the auto-receive mechanism instead.");
146150
}
147151

148-
if (_client == null || !_client.Connected)
152+
using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
153+
var buffer = block.Memory;
154+
var len = await ReceiveCoreAsync(_client, buffer, token);
155+
return buffer[0..len];
156+
}
157+
158+
private async ValueTask AutoReceiveAsync()
159+
{
160+
_receiveCancellationTokenSource ??= new();
161+
while (_receiveCancellationTokenSource is { IsCancellationRequested: false })
149162
{
150-
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
163+
if (_client is not { Connected: true })
164+
{
165+
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
166+
}
167+
168+
using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
169+
var buffer = block.Memory;
170+
var len = await ReceiveCoreAsync(_client, buffer, _receiveCancellationTokenSource.Token);
171+
if (len == 0)
172+
{
173+
break;
174+
}
151175
}
176+
}
152177

153-
var ret = Memory<byte>.Empty;
178+
private async ValueTask<int> ReceiveCoreAsync(TcpClient client, Memory<byte> buffer, CancellationToken token)
179+
{
180+
var len = 0;
154181
try
155182
{
156-
_receiveCancellationTokenSource ??= new();
157-
using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
158-
var buffer = block.Memory;
159-
var stream = _client.GetStream();
160-
var len = await stream.ReadAsync(buffer, _receiveCancellationTokenSource.Token);
183+
var stream = client.GetStream();
184+
185+
var receiveToken = token;
186+
if (ReceiveTimeout > 0)
187+
{
188+
// 设置接收超时时间
189+
var receiveTokenSource = new CancellationTokenSource(ReceiveTimeout);
190+
receiveToken = CancellationTokenSource.CreateLinkedTokenSource(receiveToken, receiveTokenSource.Token).Token;
191+
}
192+
len = await stream.ReadAsync(buffer, receiveToken);
161193
if (len == 0)
162194
{
163195
// 远端主机关闭链路
@@ -174,69 +206,26 @@ public async ValueTask<Memory<byte>> ReceiveAsync(CancellationToken token = defa
174206

175207
if (_dataPackageHandler != null)
176208
{
177-
await _dataPackageHandler.ReceiveAsync(buffer);
209+
await _dataPackageHandler.ReceiveAsync(buffer, receiveToken);
178210
}
179-
180-
ret = buffer;
181211
}
182212
}
183213
catch (OperationCanceledException ex)
184214
{
185-
Logger.LogWarning(ex, "TCP Socket receive operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
186-
}
187-
catch (Exception ex)
188-
{
189-
Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
190-
}
191-
return ret;
192-
}
193-
194-
private async ValueTask ReceiveAsync()
195-
{
196-
_receiveCancellationTokenSource ??= new();
197-
while (_receiveCancellationTokenSource is { IsCancellationRequested: false })
198-
{
199-
if (_client is not { Connected: true })
200-
{
201-
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
202-
}
203-
204-
try
205-
{
206-
using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
207-
var buffer = block.Memory;
208-
var stream = _client.GetStream();
209-
var len = await stream.ReadAsync(buffer, _receiveCancellationTokenSource.Token);
210-
if (len == 0)
211-
{
212-
// 远端主机关闭链路
213-
Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
214-
break;
215-
}
216-
else
217-
{
218-
buffer = buffer[..len];
219-
220-
if (ReceivedCallBack != null)
221-
{
222-
await ReceivedCallBack(buffer);
223-
}
224-
225-
if (_dataPackageHandler != null)
226-
{
227-
await _dataPackageHandler.ReceiveAsync(buffer);
228-
}
229-
}
230-
}
231-
catch (OperationCanceledException ex)
215+
if (token.IsCancellationRequested)
232216
{
233-
Logger.LogWarning(ex, "TCP Socket receive operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
217+
Logger.LogWarning(ex, "TCP Socket receive operation canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
234218
}
235-
catch (Exception ex)
219+
else
236220
{
237-
Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
221+
Logger.LogWarning(ex, "TCP Socket receive operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
238222
}
239223
}
224+
catch (Exception ex)
225+
{
226+
Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
227+
}
228+
return len;
240229
}
241230

242231
public void Close()

0 commit comments

Comments
 (0)