Skip to content

Commit 07d6375

Browse files
authored
feat(ITcpSocketFactory): add ITcpSocketFactory service (#6254)
* feat: 增加扩展方法判断当前环境是否为 IsWasm * feat: 增加 ITcpSocketFactory 服务 * refactor: 更新 ConnectAsync 接口 * test: 更新单元测试 * refactor: 增加 ITcpSocketClient 服务 * test: 增加单元测试 * refactor: 重构日志实例逻辑 * refactor: 精简代码 * refactor: 增加取消记录日志逻辑 * refactor: 增加 Close 方法 * test: 增加实例单元测试 * feat: 增加 IDataPackageAdapter 接口 * refactor: 增加设置本地节点逻辑 * refactor: 增加数据处理器功能 * refactor: 增加 virtual 关键字 * test: 增加单元测试 * test: 更新单元测试 * feat: 增加数据处理类 * refactor: 增加连接后自动接收逻辑 * test: 增加单元测试 * refactor: 增加接收任务取消逻辑 * refactor: 精简代码逻辑 * test: 更新单元测试 * refactor: 实现拆包粘包处理逻辑 * refactor: 优化代码 Logger 不为空 * test: 更新单元测试 * test: 增加 SendAsync 单元测试 * test: 增加 Factory 单元测试 * test: 精简单元测试 * test: 增加 IsWasm 单元测试 * refactor: 接收方法内异常改为日志 * refactor: 防止缓存区被释放 * refactor: 精简代码提高可读性 * Revert "refactor: 接收方法内异常改为日志" This reverts commit 44e4bd3. * refactor: 更正方法名称为 HandleStickyPackage * refactor: 更改申请缓存区代码 * refactor: 重构拆包方法名称
1 parent 48a7777 commit 07d6375

File tree

11 files changed

+1047
-0
lines changed

11 files changed

+1047
-0
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the Apache 2.0 License
3+
// See the LICENSE file in the project root for more information.
4+
// Maintainer: Argo Zhang([email protected]) Website: https://www.blazor.zone
5+
6+
using Microsoft.Extensions.Hosting;
7+
8+
namespace BootstrapBlazor.Components;
9+
10+
/// <summary>
11+
/// <see cref="IHostEnvironment"/> 扩展方法"
12+
/// </summary>
13+
public static class HostEnvironmentExtensions
14+
{
15+
/// <summary>
16+
/// 当前程序是否为 WebAssembly 环境
17+
/// </summary>
18+
/// <param name="hostEnvironment"></param>
19+
/// <returns></returns>
20+
public static bool IsWasm(this IHostEnvironment hostEnvironment) => hostEnvironment is MockWasmHostEnvironment;
21+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the Apache 2.0 License
3+
// See the LICENSE file in the project root for more information.
4+
// Maintainer: Argo Zhang([email protected]) Website: https://www.blazor.zone
5+
6+
namespace BootstrapBlazor.Components;
7+
8+
/// <summary>
9+
/// Provides a base implementation for handling data packages in a communication system.
10+
/// </summary>
11+
/// <remarks>This abstract class defines the core contract for receiving and sending data packages. Derived
12+
/// classes should override and extend its functionality to implement specific data handling logic. The default
13+
/// implementation simply returns the provided data.</remarks>
14+
public abstract class DataPackageHandlerBase : IDataPackageHandler
15+
{
16+
private Memory<byte> _lastReceiveBuffer = Memory<byte>.Empty;
17+
18+
/// <summary>
19+
/// 当接收数据处理完成后,回调该函数执行接收
20+
/// </summary>
21+
public Func<Memory<byte>, Task>? ReceivedCallBack { get; set; }
22+
23+
/// <summary>
24+
/// Sends the specified data asynchronously to the target destination.
25+
/// </summary>
26+
/// <remarks>The method performs an asynchronous operation to send the provided data. The caller must
27+
/// ensure that the data is valid and non-empty. The returned memory block may contain a response or acknowledgment
28+
/// depending on the implementation of the target destination.</remarks>
29+
/// <param name="data">The data to be sent, represented as a block of memory.</param>
30+
/// <returns>A task that represents the asynchronous operation. The task result contains a <see cref="Memory{T}"/> of <see
31+
/// cref="byte"/> representing the response or acknowledgment received from the target destination.</returns>
32+
public virtual Task<Memory<byte>> SendAsync(Memory<byte> data)
33+
{
34+
return Task.FromResult(data);
35+
}
36+
37+
/// <summary>
38+
/// Processes the received data asynchronously.
39+
/// </summary>
40+
/// <param name="data">A memory buffer containing the data to be processed. The buffer must not be empty.</param>
41+
/// <returns>A task that represents the asynchronous operation.</returns>
42+
public virtual Task ReceiveAsync(Memory<byte> data)
43+
{
44+
return Task.CompletedTask;
45+
}
46+
47+
/// <summary>
48+
/// Handles the processing of a sticky package by adjusting the provided buffer and length.
49+
/// </summary>
50+
/// <remarks>This method processes the portion of the buffer beyond the specified length and updates the
51+
/// internal state accordingly. The caller must ensure that the <paramref name="buffer"/> contains sufficient data
52+
/// for the specified <paramref name="length"/>.</remarks>
53+
/// <param name="buffer">The memory buffer containing the data to process.</param>
54+
/// <param name="length">The length of the valid data within the buffer.</param>
55+
protected void SlicePackage(Memory<byte> buffer, int length)
56+
{
57+
_lastReceiveBuffer = buffer[length..].ToArray().AsMemory();
58+
}
59+
60+
/// <summary>
61+
/// Concatenates the provided buffer with any previously stored data and returns the combined result.
62+
/// </summary>
63+
/// <remarks>This method combines the provided buffer with any data stored in the internal buffer. After
64+
/// concatenation, the internal buffer is cleared. The returned memory block is allocated from a shared memory pool
65+
/// and should be used promptly to avoid holding onto pooled resources.</remarks>
66+
/// <param name="buffer">The buffer to concatenate with the previously stored data. Must not be empty.</param>
67+
/// <returns>A <see cref="Memory{T}"/> instance containing the concatenated data. If no previously stored data exists, the
68+
/// method returns the input <paramref name="buffer"/>.</returns>
69+
protected Memory<byte> ConcatBuffer(Memory<byte> buffer)
70+
{
71+
if (_lastReceiveBuffer.IsEmpty)
72+
{
73+
return buffer;
74+
}
75+
76+
// 计算缓存区长度
77+
Memory<byte> merged = new byte[_lastReceiveBuffer.Length + buffer.Length];
78+
_lastReceiveBuffer.CopyTo(merged);
79+
buffer.CopyTo(merged[_lastReceiveBuffer.Length..]);
80+
81+
// Clear the sticky buffer
82+
_lastReceiveBuffer = Memory<byte>.Empty;
83+
return merged;
84+
}
85+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the Apache 2.0 License
3+
// See the LICENSE file in the project root for more information.
4+
// Maintainer: Argo Zhang([email protected]) Website: https://www.blazor.zone
5+
6+
namespace BootstrapBlazor.Components;
7+
8+
/// <summary>
9+
/// Handles fixed-length data packages by processing incoming data of a specified length.
10+
/// </summary>
11+
/// <remarks>This class is designed to handle data packages with a fixed length, as specified during
12+
/// initialization. It extends <see cref="DataPackageHandlerBase"/> and overrides its behavior to process fixed-length
13+
/// data.</remarks>
14+
/// <param name="length">The data package total data length.</param>
15+
public class FixLengthDataPackageHandler(int length) : DataPackageHandlerBase
16+
{
17+
private readonly Memory<byte> _data = new byte[length];
18+
19+
private int _receivedLength;
20+
21+
/// <summary>
22+
/// <inheritdoc/>
23+
/// </summary>
24+
/// <param name="data"></param>
25+
/// <returns></returns>
26+
public override async Task ReceiveAsync(Memory<byte> data)
27+
{
28+
// 处理上次粘包数据
29+
data = ConcatBuffer(data);
30+
31+
// 拷贝数据
32+
var len = length - _receivedLength;
33+
var segment = data.Length > len ? data[..len] : data;
34+
segment.CopyTo(_data[_receivedLength..]);
35+
36+
if (data.Length > len)
37+
{
38+
SlicePackage(data, data.Length - len);
39+
}
40+
41+
// 更新已接收长度
42+
_receivedLength += segment.Length;
43+
44+
// 如果已接收长度等于总长度则触发回调
45+
if (_receivedLength == length)
46+
{
47+
// 重置已接收长度
48+
_receivedLength = 0;
49+
if (ReceivedCallBack != null)
50+
{
51+
await ReceivedCallBack(_data);
52+
}
53+
}
54+
}
55+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the Apache 2.0 License
3+
// See the LICENSE file in the project root for more information.
4+
// Maintainer: Argo Zhang([email protected]) Website: https://www.blazor.zone
5+
6+
namespace BootstrapBlazor.Components;
7+
8+
/// <summary>
9+
/// Defines an interface for adapting data packages to and from a TCP socket connection.
10+
/// </summary>
11+
/// <remarks>Implementations of this interface are responsible for converting raw data received from a TCP socket
12+
/// into structured data packages and vice versa. This allows for custom serialization and deserialization logic
13+
/// tailored to specific application protocols.</remarks>
14+
public interface IDataPackageHandler
15+
{
16+
/// <summary>
17+
/// Gets or sets the callback function to be invoked when data is received asynchronously.
18+
/// </summary>
19+
Func<Memory<byte>, Task>? ReceivedCallBack { get; set; }
20+
21+
/// <summary>
22+
/// Sends the specified data asynchronously to the target destination.
23+
/// </summary>
24+
/// <remarks>The method performs an asynchronous operation to send the provided data. The caller must
25+
/// ensure that the data is valid and non-empty. The returned memory block may contain a response or acknowledgment
26+
/// depending on the implementation of the target destination.</remarks>
27+
/// <param name="data">The data to be sent, represented as a block of memory.</param>
28+
/// <returns>A task that represents the asynchronous operation. The task result contains a <see cref="Memory{T}"/> of <see
29+
/// cref="byte"/> representing the response or acknowledgment received from the target destination.</returns>
30+
Task<Memory<byte>> SendAsync(Memory<byte> data);
31+
32+
/// <summary>
33+
/// Asynchronously receives data from a source and writes it into the provided memory buffer.
34+
/// </summary>
35+
/// <remarks>This method does not guarantee that the entire buffer will be filled. The number of bytes
36+
/// written depends on the availability of data.</remarks>
37+
/// <param name="data">The memory buffer to store the received data. The buffer must be writable and have sufficient capacity.</param>
38+
/// <returns>A task that represents the asynchronous operation. The task result contains the number of bytes written to the
39+
/// buffer. Returns 0 if the end of the data stream is reached.</returns>
40+
Task ReceiveAsync(Memory<byte> data);
41+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the Apache 2.0 License
3+
// See the LICENSE file in the project root for more information.
4+
// Maintainer: Argo Zhang([email protected]) Website: https://www.blazor.zone
5+
6+
using Microsoft.Extensions.Logging;
7+
using System.Buffers;
8+
using System.Net;
9+
using System.Net.Sockets;
10+
using System.Runtime.Versioning;
11+
12+
namespace BootstrapBlazor.Components;
13+
14+
[UnsupportedOSPlatform("browser")]
15+
class DefaultTcpSocketClient : ITcpSocketClient
16+
{
17+
private TcpClient? _client;
18+
private IDataPackageHandler? _dataPackageHandler;
19+
private CancellationTokenSource? _receiveCancellationTokenSource;
20+
private IPEndPoint? _remoteEndPoint;
21+
22+
public bool IsConnected => _client?.Connected ?? false;
23+
24+
public IPEndPoint LocalEndPoint { get; set; }
25+
26+
[NotNull]
27+
public ILogger<DefaultTcpSocketClient>? Logger { get; set; }
28+
29+
public int ReceiveBufferSize { get; set; } = 1024 * 10;
30+
31+
public DefaultTcpSocketClient(string host, int port = 0)
32+
{
33+
LocalEndPoint = new IPEndPoint(GetIPAddress(host), port);
34+
}
35+
36+
private static IPAddress GetIPAddress(string host) => host.Equals("localhost", StringComparison.OrdinalIgnoreCase)
37+
? IPAddress.Loopback
38+
: IPAddress.TryParse(host, out var ip) ? ip : IPAddressByHostName;
39+
40+
[ExcludeFromCodeCoverage]
41+
private static IPAddress IPAddressByHostName => Dns.GetHostAddresses(Dns.GetHostName(), AddressFamily.InterNetwork).FirstOrDefault() ?? IPAddress.Loopback;
42+
43+
public void SetDataHandler(IDataPackageHandler handler)
44+
{
45+
_dataPackageHandler = handler;
46+
}
47+
48+
public Task<bool> ConnectAsync(string host, int port, CancellationToken token = default)
49+
{
50+
var endPoint = new IPEndPoint(GetIPAddress(host), port);
51+
return ConnectAsync(endPoint, token);
52+
}
53+
54+
public async Task<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken token = default)
55+
{
56+
var ret = false;
57+
try
58+
{
59+
// 释放资源
60+
Close();
61+
62+
// 创建新的 TcpClient 实例
63+
_client ??= new TcpClient(LocalEndPoint);
64+
await _client.ConnectAsync(endPoint, token);
65+
66+
// 开始接收数据
67+
_ = Task.Run(ReceiveAsync, token);
68+
69+
LocalEndPoint = (IPEndPoint)_client.Client.LocalEndPoint!;
70+
_remoteEndPoint = endPoint;
71+
ret = true;
72+
}
73+
catch (OperationCanceledException ex)
74+
{
75+
Logger.LogWarning(ex, "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint);
76+
}
77+
catch (Exception ex)
78+
{
79+
Logger.LogError(ex, "TCP Socket connection failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint);
80+
}
81+
return ret;
82+
}
83+
84+
public async Task<bool> SendAsync(Memory<byte> data, CancellationToken token = default)
85+
{
86+
if (_client is not { Connected: true })
87+
{
88+
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
89+
}
90+
91+
var ret = false;
92+
try
93+
{
94+
if (_dataPackageHandler != null)
95+
{
96+
data = await _dataPackageHandler.SendAsync(data);
97+
}
98+
var stream = _client.GetStream();
99+
await stream.WriteAsync(data, token);
100+
ret = true;
101+
}
102+
catch (OperationCanceledException ex)
103+
{
104+
Logger.LogWarning(ex, "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
105+
}
106+
catch (Exception ex)
107+
{
108+
Logger.LogError(ex, "TCP Socket send failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
109+
}
110+
return ret;
111+
}
112+
113+
private async Task ReceiveAsync()
114+
{
115+
_receiveCancellationTokenSource ??= new();
116+
while (_receiveCancellationTokenSource is { IsCancellationRequested: false })
117+
{
118+
if (_client is not { Connected: true })
119+
{
120+
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
121+
}
122+
123+
try
124+
{
125+
using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
126+
var buffer = block.Memory;
127+
var stream = _client.GetStream();
128+
var len = await stream.ReadAsync(buffer, _receiveCancellationTokenSource.Token);
129+
if (len == 0)
130+
{
131+
// 远端主机关闭链路
132+
Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
133+
break;
134+
}
135+
else
136+
{
137+
buffer = buffer[..len];
138+
139+
if (_dataPackageHandler != null)
140+
{
141+
await _dataPackageHandler.ReceiveAsync(buffer);
142+
}
143+
}
144+
}
145+
catch (OperationCanceledException ex)
146+
{
147+
Logger.LogWarning(ex, "TCP Socket receive operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
148+
}
149+
catch (Exception ex)
150+
{
151+
Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
152+
}
153+
}
154+
}
155+
156+
public void Close()
157+
{
158+
Dispose(true);
159+
}
160+
161+
private void Dispose(bool disposing)
162+
{
163+
if (disposing)
164+
{
165+
_remoteEndPoint = null;
166+
167+
// 取消接收数据的任务
168+
if (_receiveCancellationTokenSource is not null)
169+
{
170+
_receiveCancellationTokenSource.Cancel();
171+
_receiveCancellationTokenSource.Dispose();
172+
_receiveCancellationTokenSource = null;
173+
}
174+
175+
// 释放 TcpClient 资源
176+
if (_client != null)
177+
{
178+
_client.Close();
179+
_client = null;
180+
}
181+
}
182+
}
183+
184+
/// <summary>
185+
/// <inheritdoc/>
186+
/// </summary>
187+
public void Dispose()
188+
{
189+
Dispose(true);
190+
GC.SuppressFinalize(this);
191+
}
192+
}

0 commit comments

Comments
 (0)