diff --git a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DataPackageHandlerBase.cs b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DataPackageHandlerBase.cs index 3cb6c4500e5..f4d41cecacc 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DataPackageHandlerBase.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DataPackageHandlerBase.cs @@ -16,33 +16,28 @@ public abstract class DataPackageHandlerBase : IDataPackageHandler private Memory _lastReceiveBuffer = Memory.Empty; /// - /// Gets or sets the callback function to handle received data. + /// /// - /// The callback function should be designed to handle the received data efficiently and - /// asynchronously. Ensure that the implementation does not block or perform long-running operations, as this may - /// impact performance. public Func, ValueTask>? ReceivedCallBack { get; set; } /// - /// Sends the specified data asynchronously to the target destination. + /// /// - /// The method performs an asynchronous operation to send the provided data. The caller must - /// ensure that the data is valid and non-empty. The returned memory block may contain a response or acknowledgment - /// depending on the implementation of the target destination. - /// The data to be sent, represented as a block of memory. - /// A task that represents the asynchronous operation. The task result contains a of representing the response or acknowledgment received from the target destination. - public virtual ValueTask> SendAsync(ReadOnlyMemory data) + /// + /// + /// + public virtual ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) { return ValueTask.FromResult(data); } /// - /// Processes the received data asynchronously. + /// /// - /// A memory buffer containing the data to be processed. The buffer must not be empty. - /// A task that represents the asynchronous operation. - public virtual ValueTask ReceiveAsync(ReadOnlyMemory data) + /// + /// + /// + public virtual ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default) { return ValueTask.CompletedTask; } diff --git a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs index 0de2d040195..1eee4fdf3ed 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs @@ -50,8 +50,9 @@ public DelimiterDataPackageHandler(byte[] delimiter) /// /// /// + /// /// - public override async ValueTask ReceiveAsync(ReadOnlyMemory data) + public override async ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default) { data = ConcatBuffer(data); diff --git a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/FixLengthDataPackageHandler.cs b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/FixLengthDataPackageHandler.cs index e20458edde2..c669f845f11 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/FixLengthDataPackageHandler.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/FixLengthDataPackageHandler.cs @@ -22,8 +22,9 @@ public class FixLengthDataPackageHandler(int length) : DataPackageHandlerBase /// /// /// + /// /// - public override async ValueTask ReceiveAsync(ReadOnlyMemory data) + public override async ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default) { while (data.Length > 0) { diff --git a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/IDataPackageHandler.cs b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/IDataPackageHandler.cs index b6b5eb2e10a..ea8da06cd76 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/IDataPackageHandler.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/IDataPackageHandler.cs @@ -25,9 +25,10 @@ public interface IDataPackageHandler /// ensure that the data is valid and non-empty. The returned memory block may contain a response or acknowledgment /// depending on the implementation of the target destination. /// The data to be sent, represented as a block of memory. + /// An optional to observe while waiting for the operation to complete. /// A task that represents the asynchronous operation. The task result contains a of representing the response or acknowledgment received from the target destination. - ValueTask> SendAsync(ReadOnlyMemory data); + ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default); /// /// Asynchronously receives data from a source and writes it into the provided memory buffer. @@ -35,7 +36,8 @@ public interface IDataPackageHandler /// This method does not guarantee that the entire buffer will be filled. The number of bytes /// written depends on the availability of data. /// The memory buffer to store the received data. The buffer must be writable and have sufficient capacity. + /// A cancellation token that can be used to cancel the operation. The default value is . /// A task that represents the asynchronous operation. The task result contains the number of bytes written to the /// buffer. Returns 0 if the end of the data stream is reached. - ValueTask ReceiveAsync(ReadOnlyMemory data); + ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default); } diff --git a/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs b/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs index c13a22f20b5..d93712bb46d 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs @@ -12,7 +12,7 @@ namespace BootstrapBlazor.Components; [UnsupportedOSPlatform("browser")] -sealed class DefaultTcpSocketClient(IPEndPoint endPoint) : ITcpSocketClient +sealed class DefaultTcpSocketClient(IPEndPoint localEndPoint) : ITcpSocketClient { private TcpClient? _client; private IDataPackageHandler? _dataPackageHandler; @@ -21,15 +21,23 @@ sealed class DefaultTcpSocketClient(IPEndPoint endPoint) : ITcpSocketClient public bool IsConnected => _client?.Connected ?? false; - public IPEndPoint LocalEndPoint { get; set; } = endPoint; + public IPEndPoint? LocalEndPoint { get; set; } [NotNull] public ILogger? Logger { get; set; } - public int ReceiveBufferSize { get; set; } = 1024 * 10; + public int ReceiveBufferSize { get; set; } = 1024 * 64; + + public bool IsAutoReceive { get; set; } = true; public Func, ValueTask>? ReceivedCallBack { get; set; } + public int ConnectTimeout { get; set; } + + public int SendTimeout { get; set; } + + public int ReceiveTimeout { get; set; } + public void SetDataHandler(IDataPackageHandler handler) { _dataPackageHandler = handler; @@ -44,19 +52,37 @@ public async ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken Close(); // 创建新的 TcpClient 实例 - _client ??= new TcpClient(LocalEndPoint); - await _client.ConnectAsync(endPoint, token); + _client ??= new TcpClient(localEndPoint); - // 开始接收数据 - _ = Task.Run(ReceiveAsync, token); + var connectionToken = token; + if (ConnectTimeout > 0) + { + // 设置连接超时时间 + var connectTokenSource = new CancellationTokenSource(ConnectTimeout); + connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token; + } + await _client.ConnectAsync(endPoint, connectionToken); + // 设置本地以及远端端点信息 LocalEndPoint = (IPEndPoint)_client.Client.LocalEndPoint!; _remoteEndPoint = endPoint; + + if (IsAutoReceive) + { + _ = Task.Run(AutoReceiveAsync); + } ret = true; } catch (OperationCanceledException ex) { - Logger.LogWarning(ex, "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint); + if (token.IsCancellationRequested) + { + Logger.LogWarning(ex, "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint); + } + else + { + Logger.LogWarning(ex, "TCP Socket connect operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint); + } } catch (Exception ex) { @@ -75,17 +101,34 @@ public async ValueTask SendAsync(ReadOnlyMemory data, CancellationTo var ret = false; try { + var stream = _client.GetStream(); + + var sendToken = token; + if (SendTimeout > 0) + { + // 设置发送超时时间 + var sendTokenSource = new CancellationTokenSource(SendTimeout); + sendToken = CancellationTokenSource.CreateLinkedTokenSource(token, sendTokenSource.Token).Token; + } + if (_dataPackageHandler != null) { - data = await _dataPackageHandler.SendAsync(data); + data = await _dataPackageHandler.SendAsync(data, sendToken); } - var stream = _client.GetStream(); - await stream.WriteAsync(data, token); + + await stream.WriteAsync(data, sendToken); ret = true; } catch (OperationCanceledException ex) { - Logger.LogWarning(ex, "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); + if (token.IsCancellationRequested) + { + Logger.LogWarning(ex, "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); + } + else + { + Logger.LogWarning(ex, "TCP Socket send operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); + } } catch (Exception ex) { @@ -94,7 +137,25 @@ public async ValueTask SendAsync(ReadOnlyMemory data, CancellationTo return ret; } - private async ValueTask ReceiveAsync() + public async ValueTask> ReceiveAsync(CancellationToken token = default) + { + if (_client == null || !_client.Connected) + { + throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}"); + } + + if (IsAutoReceive) + { + throw new InvalidOperationException("Cannot call ReceiveAsync when IsAutoReceive is true. Use the auto-receive mechanism instead."); + } + + using var block = MemoryPool.Shared.Rent(ReceiveBufferSize); + var buffer = block.Memory; + var len = await ReceiveCoreAsync(_client, buffer, token); + return buffer[0..len]; + } + + private async ValueTask AutoReceiveAsync() { _receiveCancellationTokenSource ??= new(); while (_receiveCancellationTokenSource is { IsCancellationRequested: false }) @@ -104,42 +165,67 @@ private async ValueTask ReceiveAsync() throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}"); } - try + using var block = MemoryPool.Shared.Rent(ReceiveBufferSize); + var buffer = block.Memory; + var len = await ReceiveCoreAsync(_client, buffer, _receiveCancellationTokenSource.Token); + if (len == 0) + { + break; + } + } + } + + private async ValueTask ReceiveCoreAsync(TcpClient client, Memory buffer, CancellationToken token) + { + var len = 0; + try + { + var stream = client.GetStream(); + + var receiveToken = token; + if (ReceiveTimeout > 0) { - using var block = MemoryPool.Shared.Rent(ReceiveBufferSize); - var buffer = block.Memory; - var stream = _client.GetStream(); - var len = await stream.ReadAsync(buffer, _receiveCancellationTokenSource.Token); - if (len == 0) + // 设置接收超时时间 + var receiveTokenSource = new CancellationTokenSource(ReceiveTimeout); + receiveToken = CancellationTokenSource.CreateLinkedTokenSource(receiveToken, receiveTokenSource.Token).Token; + } + len = await stream.ReadAsync(buffer, receiveToken); + if (len == 0) + { + // 远端主机关闭链路 + Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); + } + else + { + buffer = buffer[..len]; + + if (ReceivedCallBack != null) { - // 远端主机关闭链路 - Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); - break; + await ReceivedCallBack(buffer); } - else - { - buffer = buffer[..len]; - - if (ReceivedCallBack != null) - { - await ReceivedCallBack(buffer); - } - if (_dataPackageHandler != null) - { - await _dataPackageHandler.ReceiveAsync(buffer); - } + if (_dataPackageHandler != null) + { + await _dataPackageHandler.ReceiveAsync(buffer, receiveToken); } } - catch (OperationCanceledException ex) + } + catch (OperationCanceledException ex) + { + if (token.IsCancellationRequested) { - Logger.LogWarning(ex, "TCP Socket receive operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); + Logger.LogWarning(ex, "TCP Socket receive operation canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); } - catch (Exception ex) + else { - Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); + Logger.LogWarning(ex, "TCP Socket receive operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); } } + catch (Exception ex) + { + Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); + } + return len; } public void Close() @@ -151,10 +237,11 @@ private void Dispose(bool disposing) { if (disposing) { + LocalEndPoint = null; _remoteEndPoint = null; // 取消接收数据的任务 - if (_receiveCancellationTokenSource is not null) + if (_receiveCancellationTokenSource != null) { _receiveCancellationTokenSource.Cancel(); _receiveCancellationTokenSource.Dispose(); diff --git a/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs b/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs index 12a46ee8299..695db43849c 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs @@ -18,17 +18,42 @@ public interface ITcpSocketClient : IDisposable int ReceiveBufferSize { get; set; } /// - /// Gets a value indicating whether the system is currently connected. + /// Gets a value indicating whether the system is currently connected. Default is false. /// bool IsConnected { get; } + /// + /// Gets or sets a value indicating whether automatic receiving data is enabled. Default is true. + /// + bool IsAutoReceive { get; set; } + + /// + /// Gets or sets the timeout duration, in milliseconds, for establishing a connection. + /// + int ConnectTimeout { get; set; } + + /// + /// Gets or sets the duration, in milliseconds, to wait for a send operation to complete before timing out. + /// + /// If the send operation does not complete within the specified timeout period, an exception may + /// be thrown. + int SendTimeout { get; set; } + + /// + /// Gets or sets the amount of time, in milliseconds, that the receiver will wait for a response before timing out. + /// + /// Use this property to configure the maximum wait time for receiving a response. Setting an + /// appropriate timeout can help prevent indefinite blocking in scenarios where responses may be delayed or + /// unavailable. + int ReceiveTimeout { get; set; } + /// /// Gets the local network endpoint that the socket is bound to. /// /// This property provides information about the local endpoint of the socket, which is typically /// used to identify the local address and port being used for communication. If the socket is not bound to a /// specific local endpoint, this property may return . - IPEndPoint LocalEndPoint { get; } + IPEndPoint? LocalEndPoint { get; } /// /// Gets or sets the callback function to handle received data. @@ -68,6 +93,17 @@ public interface ITcpSocketClient : IDisposable /// sent successfully; otherwise, . ValueTask SendAsync(ReadOnlyMemory data, CancellationToken token = default); + /// + /// Asynchronously receives a block of data from the underlying source. + /// + /// This method is non-blocking and completes when data is available or the operation is + /// canceled. If the operation is canceled, the returned task will be in a faulted state with a . + /// A cancellation token that can be used to cancel the operation. The default value is . + /// A containing a of bytes representing the received data. + /// The returned memory may be empty if no data is available. + ValueTask> ReceiveAsync(CancellationToken token = default); + /// /// Closes the current connection or resource, releasing any associated resources. /// diff --git a/test/UnitTest/Services/TcpSocketFactoryTest.cs b/test/UnitTest/Services/TcpSocketFactoryTest.cs index d0567beb6eb..a9fade5f4af 100644 --- a/test/UnitTest/Services/TcpSocketFactoryTest.cs +++ b/test/UnitTest/Services/TcpSocketFactoryTest.cs @@ -44,6 +44,16 @@ public void GetOrCreate_Ok() factory.Dispose(); } + [Fact] + public async Task ConnectAsync_Timeout() + { + var client = CreateClient(); + client.ConnectTimeout = 1000; + + var connect = await client.ConnectAsync("localhost", 9999); + Assert.False(connect); + } + [Fact] public async Task ConnectAsync_Cancel() { @@ -66,6 +76,23 @@ public async Task ConnectAsync_Failed() Assert.False(connect); } + [Fact] + public async Task Send_Timeout() + { + var port = 8887; + var server = StartTcpServer(port, MockSplitPackageAsync); + + var client = CreateClient(); + client.SendTimeout = 100; + client.SetDataHandler(new MockSendTimeoutHandler()); + + await client.ConnectAsync("localhost", port); + + var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); + var result = await client.SendAsync(data); + Assert.False(result); + } + [Fact] public async Task SendAsync_Error() { @@ -120,13 +147,103 @@ public async Task SendAsync_Cancel() StopTcpServer(server); } + [Fact] + public async Task ReceiveAsync_Timeout() + { + var port = 8888; + var server = StartTcpServer(port, MockSplitPackageAsync); + + var client = CreateClient(); + client.ReceiveTimeout = 100; + client.SetDataHandler(new MockReceiveTimeoutHandler()); + + await client.ConnectAsync("localhost", port); + + var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); + await client.SendAsync(data); + await Task.Delay(220); // 等待接收超时 + } + + [Fact] + public async Task ReceiveAsync_Cancel() + { + var port = 8889; + var server = StartTcpServer(port, MockSplitPackageAsync); + + var client = CreateClient(); + client.SetDataHandler(new MockReceiveTimeoutHandler()); + + await client.ConnectAsync("localhost", port); + + var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); + await client.SendAsync(data); + + // 通过反射取消令牌 + var fieldInfo = client.GetType().GetField("_receiveCancellationTokenSource", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + Assert.NotNull(fieldInfo); + var tokenSource = fieldInfo.GetValue(client) as CancellationTokenSource; + Assert.NotNull(tokenSource); + tokenSource.Cancel(); + await Task.Delay(50); + } + + [Fact] + public async Task ReceiveAsync_InvalidOperationException() + { + var port = 8890; + var server = StartTcpServer(port, MockSplitPackageAsync); + + var client = CreateClient(); + + //未连接 + var ex = await Assert.ThrowsAsync(async () => await client.ReceiveAsync()); + Assert.NotNull(ex); + + // 反射给 _client 赋值但是未连接 + var fieldInfo = client.GetType().GetField("_client", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + Assert.NotNull(fieldInfo); + fieldInfo.SetValue(client, new TcpClient()); + ex = null; + ex = await Assert.ThrowsAsync(async () => await client.ReceiveAsync()); + Assert.NotNull(ex); + + await client.ConnectAsync("localhost", port); + ex = null; + ex = await Assert.ThrowsAsync(async () => await client.ReceiveAsync()); + + client.Close(); + client.IsAutoReceive = false; + var connected = await client.ConnectAsync("localhost", port); + Assert.True(connected); + + var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); + await client.SendAsync(data); + var payload = await client.ReceiveAsync(); + Assert.Equal(payload.ToArray(), [1, 2, 3, 4, 5]); + } + + [Fact] + public async Task ReceiveAsync_Ok() + { + var port = 8891; + var server = StartTcpServer(port, MockSplitPackageAsync); + + var client = CreateClient(); + client.IsAutoReceive = false; + await client.ConnectAsync("localhost", port); + var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); + await client.SendAsync(data); + var payload = await client.ReceiveAsync(); + Assert.Equal(payload.ToArray(), [1, 2, 3, 4, 5]); + } + [Fact] public async Task ReceiveAsync_Error() { var client = CreateClient(); // 测试未建立连接前调用 ReceiveAsync 方法报异常逻辑 - var methodInfo = client.GetType().GetMethod("ReceiveAsync", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + var methodInfo = client.GetType().GetMethod("AutoReceiveAsync", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); Assert.NotNull(methodInfo); var task = (ValueTask)methodInfo.Invoke(client, null)!; @@ -136,7 +253,7 @@ public async Task ReceiveAsync_Error() var port = 8882; var server = StartTcpServer(port, MockSplitPackageAsync); - Assert.Equal(1024 * 10, client.ReceiveBufferSize); + Assert.Equal(1024 * 64, client.ReceiveBufferSize); client.ReceiveBufferSize = 1024 * 20; Assert.Equal(1024 * 20, client.ReceiveBufferSize); @@ -489,27 +606,47 @@ class MockSendErrorHandler : DataPackageHandlerBase { public ITcpSocketClient? Socket { get; set; } - public override async ValueTask> SendAsync(ReadOnlyMemory data) + public override async ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) { Socket?.Close(); - await Task.Delay(10); + await Task.Delay(10, token); return data; } } class MockReceiveErrorHandler : DataPackageHandlerBase { - public override ValueTask> SendAsync(ReadOnlyMemory data) + public override ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) { return ValueTask.FromResult(data); } - public override async ValueTask ReceiveAsync(ReadOnlyMemory data) + public override async ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default) { - await base.ReceiveAsync(data); + await base.ReceiveAsync(data, token); // 模拟接收数据时报错 throw new InvalidOperationException("Test Error"); } } + + class MockSendTimeoutHandler : DataPackageHandlerBase + { + public override async ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) + { + // 模拟发送超时 + await Task.Delay(200, token); + return data; + } + } + + class MockReceiveTimeoutHandler : DataPackageHandlerBase + { + public override async ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default) + { + // 模拟接收超时 + await Task.Delay(200, token); + await base.ReceiveAsync(data, token); + } + } }