diff --git a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DataPackageHandlerBase.cs b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DataPackageHandlerBase.cs index f4d41cecacc..89a760fa93a 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DataPackageHandlerBase.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DataPackageHandlerBase.cs @@ -26,21 +26,7 @@ public abstract class DataPackageHandlerBase : IDataPackageHandler /// /// /// - public virtual ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) - { - return ValueTask.FromResult(data); - } - - /// - /// - /// - /// - /// - /// - public virtual ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default) - { - return ValueTask.CompletedTask; - } + public abstract ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default); /// /// Handles the processing of a sticky package by adjusting the provided buffer and length. diff --git a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/IDataPackageHandler.cs b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/IDataPackageHandler.cs index 705bb70f45e..74da06af81b 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/IDataPackageHandler.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/IDataPackageHandler.cs @@ -18,18 +18,6 @@ public interface IDataPackageHandler /// Func, ValueTask>? ReceivedCallBack { get; set; } - /// - /// Sends the specified data asynchronously to the target destination. - /// - /// The method performs an asynchronous operation to send the provided data. The caller must - /// ensure that the data is valid and non-empty. The returned memory block may contain a response or acknowledgment - /// depending on the implementation of the target destination. - /// The data to be sent, represented as a block of memory. - /// An optional to observe while waiting for the operation to complete. - /// A task that represents the asynchronous operation. The task result contains a of representing the response or acknowledgment received from the target destination. - ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default); - /// /// Asynchronously receives data and processes it. /// diff --git a/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClientProvider.cs b/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClientProvider.cs index df9da26960e..8307c26ae50 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClientProvider.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClientProvider.cs @@ -50,7 +50,7 @@ public async ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken public async ValueTask SendAsync(ReadOnlyMemory data, CancellationToken token = default) { var ret = false; - if (_client is { Connected: true }) + if (_client != null) { var stream = _client.GetStream(); await stream.WriteAsync(data, token); diff --git a/src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs b/src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs index 7dacfc7e435..5329eec3549 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs @@ -184,8 +184,9 @@ public virtual async ValueTask> ReceiveAsync(CancellationToken toke private async ValueTask AutoReceiveAsync() { + // 自动接收方法 _receiveCancellationTokenSource ??= new(); - while (_receiveCancellationTokenSource is { IsCancellationRequested: false }) + while (true) { if (SocketClientProvider is not { IsConnected: true }) { @@ -197,6 +198,7 @@ private async ValueTask AutoReceiveAsync() var len = await ReceiveCoreAsync(SocketClientProvider, buffer, _receiveCancellationTokenSource.Token); if (len == 0) { + // 远端关闭或者 DisposeAsync 方法被调用时退出 break; } } diff --git a/test/UnitTest/Services/TcpSocketFactoryTest.cs b/test/UnitTest/Services/TcpSocketFactoryTest.cs index 9fdcffeee53..d38e1d4544a 100644 --- a/test/UnitTest/Services/TcpSocketFactoryTest.cs +++ b/test/UnitTest/Services/TcpSocketFactoryTest.cs @@ -103,9 +103,12 @@ public async Task Send_Timeout() var port = 8887; var server = StartTcpServer(port, MockSplitPackageAsync); - var client = CreateClient(); - client.Options.SendTimeout = 100; - client.SetDataHandler(new MockSendTimeoutHandler()); + var client = CreateClient(builder => + { + // 增加发送报错 MockSocket + builder.AddTransient(); + }); + client.Options.SendTimeout = 10; await client.ConnectAsync("localhost", port); @@ -117,7 +120,11 @@ public async Task Send_Timeout() [Fact] public async Task SendAsync_Error() { - var client = CreateClient(); + var client = CreateClient(builder => + { + // 增加发送报错 MockSocket + builder.AddTransient(); + }); // 测试未建立连接前调用 SendAsync 方法报异常逻辑 var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); @@ -128,7 +135,6 @@ public async Task SendAsync_Error() var port = 8892; var server = StartTcpServer(port, MockSplitPackageAsync); - client.SetDataHandler(new MockSendErrorHandler()); await client.ConnectAsync("localhost", port); Assert.True(client.IsConnected); @@ -156,24 +162,8 @@ public async Task SendAsync_Cancel() var result = await client.SendAsync("test", null, cst.Token); Assert.False(result); - // 设置延时发送适配器 - // 延时发送期间关闭 Socket 连接导致内部报错 - client.SetDataHandler(new MockSendCancelHandler() - { - Socket = client - }); - - var tcs = new TaskCompletionSource(); - bool? sendResult = null; - // 测试发送失败逻辑 - _ = Task.Run(async () => - { - sendResult = await client.SendAsync("test", Encoding.UTF8); - tcs.SetResult(); - }); - - await tcs.Task; - Assert.False(sendResult); + result = await client.SendAsync("test", Encoding.UTF8, cst.Token); + Assert.False(result); // 关闭连接 StopTcpServer(server); @@ -187,7 +177,6 @@ public async Task ReceiveAsync_Timeout() var client = CreateClient(); client.Options.ReceiveTimeout = 100; - client.SetDataHandler(new MockReceiveTimeoutHandler()); await client.ConnectAsync("localhost", port); @@ -203,8 +192,6 @@ public async Task ReceiveAsync_Cancel() var server = StartTcpServer(port, MockSplitPackageAsync); var client = CreateClient(); - client.SetDataHandler(new MockReceiveTimeoutHandler()); - await client.ConnectAsync("localhost", port); var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); @@ -285,8 +272,6 @@ public async Task ReceiveAsync_Error() client.Options.ReceiveBufferSize = 1024 * 20; Assert.Equal(1024 * 20, client.Options.ReceiveBufferSize); - client.SetDataHandler(new MockReceiveErrorHandler()); - ReadOnlyMemory buffer = ReadOnlyMemory.Empty; var tcs = new TaskCompletionSource(); @@ -311,51 +296,39 @@ public async Task ReceiveAsync_Error() StopTcpServer(server); } - [Fact] - public async Task CloseByRemote_Ok() - { - var client = CreateClient(); - - var port = 8883; - var server = StartTcpServer(port, MockAutoClosePackageAsync); - - client.SetDataHandler(new MockReceiveErrorHandler()); - - // 连接 TCP Server - await client.ConnectAsync("localhost", port); - - // 发送数据 - await client.SendAsync(new ReadOnlyMemory([1, 2, 3, 4, 5])); - - // 关闭连接 - StopTcpServer(server); - } - [Fact] public async Task FixLengthDataPackageHandler_Ok() { var port = 8884; var server = StartTcpServer(port, MockSplitPackageAsync); var client = CreateClient(); - - // 测试 ConnectAsync 方法 - var connect = await client.ConnectAsync("localhost", port); - Assert.True(connect); - Assert.True(client.IsConnected); - var tcs = new TaskCompletionSource(); - ReadOnlyMemory receivedBuffer = ReadOnlyMemory.Empty; + var receivedBuffer = new byte[1024]; - // 增加数据处理适配器 - client.SetDataHandler(new FixLengthDataPackageHandler(7) + // 设置数据适配器 + var adapter = new DataPackageAdapter { + DataPackageHandler = new FixLengthDataPackageHandler(7), ReceivedCallBack = buffer => { - receivedBuffer = buffer; + // buffer 即是接收到的数据 + buffer.CopyTo(receivedBuffer); + receivedBuffer = receivedBuffer[..buffer.Length]; tcs.SetResult(); return ValueTask.CompletedTask; } - }); + }; + + client.ReceivedCallBack = async buffer => + { + // 将接收到的数据传递给 DataPackageAdapter + await adapter.ReceiveAsync(buffer); + }; + + // 测试 ConnectAsync 方法 + var connect = await client.ConnectAsync("localhost", port); + Assert.True(connect); + Assert.True(client.IsConnected); // 测试 SendAsync 方法 var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); @@ -365,9 +338,6 @@ public async Task FixLengthDataPackageHandler_Ok() await tcs.Task; Assert.Equal(receivedBuffer.ToArray(), [1, 2, 3, 4, 5, 3, 4]); - // 模拟延时等待内部继续读取逻辑完成,测试内部 _receiveCancellationTokenSource 取消逻辑 - await Task.Delay(10); - // 关闭连接 await client.CloseAsync(); StopTcpServer(server); @@ -379,23 +349,31 @@ public async Task FixLengthDataPackageHandler_Sticky() var port = 8885; var server = StartTcpServer(port, MockStickyPackageAsync); var client = CreateClient(); + var tcs = new TaskCompletionSource(); + var receivedBuffer = new byte[1024]; // 连接 TCP Server var connect = await client.ConnectAsync("localhost", port); - var tcs = new TaskCompletionSource(); - ReadOnlyMemory receivedBuffer = ReadOnlyMemory.Empty; - - // 增加数据库处理适配器 - client.SetDataHandler(new FixLengthDataPackageHandler(7) + // 设置数据适配器 + var adapter = new DataPackageAdapter { + DataPackageHandler = new FixLengthDataPackageHandler(7), ReceivedCallBack = buffer => { - receivedBuffer = buffer; + // buffer 即是接收到的数据 + buffer.CopyTo(receivedBuffer); + receivedBuffer = receivedBuffer[..buffer.Length]; tcs.SetResult(); return ValueTask.CompletedTask; } - }); + }; + + client.ReceivedCallBack = async buffer => + { + // 将接收到的数据传递给 DataPackageAdapter + await adapter.ReceiveAsync(buffer); + }; // 发送数据 var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); @@ -406,7 +384,9 @@ public async Task FixLengthDataPackageHandler_Sticky() // 验证接收到的数据 Assert.Equal(receivedBuffer.ToArray(), [1, 2, 3, 4, 5, 3, 4]); - receivedBuffer = ReadOnlyMemory.Empty; + + // 重置接收缓冲区 + receivedBuffer = new byte[1024]; tcs = new TaskCompletionSource(); // 等待第二次数据 @@ -431,23 +411,31 @@ public async Task DelimiterDataPackageHandler_Ok() var port = 8886; var server = StartTcpServer(port, MockDelimiterPackageAsync); var client = CreateClient(); - - // 连接 TCP Server - var connect = await client.ConnectAsync("localhost", port); - var tcs = new TaskCompletionSource(); - ReadOnlyMemory receivedBuffer = ReadOnlyMemory.Empty; + var receivedBuffer = new byte[1024]; - // 增加数据库处理适配器 - client.SetDataHandler(new DelimiterDataPackageHandler([0x13, 0x10]) + // 设置数据适配器 + var adapter = new DataPackageAdapter { + DataPackageHandler = new DelimiterDataPackageHandler(new byte[] { 13, 10 }), ReceivedCallBack = buffer => { - receivedBuffer = buffer; + // buffer 即是接收到的数据 + buffer.CopyTo(receivedBuffer); + receivedBuffer = receivedBuffer[..buffer.Length]; tcs.SetResult(); return ValueTask.CompletedTask; } - }); + }; + + client.ReceivedCallBack = async buffer => + { + // 将接收到的数据传递给 DataPackageAdapter + await adapter.ReceiveAsync(buffer); + }; + + // 连接 TCP Server + var connect = await client.ConnectAsync("localhost", port); // 发送数据 var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); @@ -457,15 +445,15 @@ public async Task DelimiterDataPackageHandler_Ok() await tcs.Task; // 验证接收到的数据 - Assert.Equal(receivedBuffer.ToArray(), [1, 2, 3, 4, 5, 0x13, 0x10]); + Assert.Equal(receivedBuffer.ToArray(), [1, 2, 3, 4, 5, 13, 10]); // 等待第二次数据 - receivedBuffer = ReadOnlyMemory.Empty; + receivedBuffer = new byte[1024]; tcs = new TaskCompletionSource(); await tcs.Task; // 验证接收到的数据 - Assert.Equal(receivedBuffer.ToArray(), [5, 6, 0x13, 0x10]); + Assert.Equal(receivedBuffer.ToArray(), [5, 6, 13, 10]); // 关闭连接 await client.CloseAsync(); @@ -515,7 +503,7 @@ private static async Task MockDelimiterPackageAsync(TcpClient client) await Task.Delay(20); // 模拟拆包发送第二段数据 - await stream.WriteAsync(new byte[] { 0x13, 0x10, 0x5, 0x6, 0x13, 0x10 }, CancellationToken.None); + await stream.WriteAsync(new byte[] { 13, 10, 0x5, 0x6, 13, 10 }, CancellationToken.None); } } @@ -573,18 +561,12 @@ private static async Task MockStickyPackageAsync(TcpClient client) } } - private static Task MockAutoClosePackageAsync(TcpClient client) - { - client.Close(); - return Task.CompletedTask; - } - private static void StopTcpServer(TcpListener server) { server?.Stop(); } - private static ITcpSocketClient CreateClient() + private static ITcpSocketClient CreateClient(Action? builder = null) { var sc = new ServiceCollection(); sc.AddLogging(builder => @@ -592,6 +574,10 @@ private static ITcpSocketClient CreateClient() builder.AddProvider(new MockLoggerProvider()); }); sc.AddBootstrapBlazorTcpSocketFactory(); + if (builder != null) + { + builder(sc); + } var provider = sc.BuildServiceProvider(); var factory = provider.GetRequiredService(); var client = factory.GetOrCreate("test", op => op.LocalEndPoint = Utility.ConvertToIpEndPoint("localhost", 0)); @@ -629,64 +615,61 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except } } - class MockSendErrorHandler : DataPackageHandlerBase + class MockSendErrorSocketProvider : ISocketClientProvider { - public ITcpSocketClient? Socket { get; set; } + public bool IsConnected { get; private set; } + + public IPEndPoint LocalEndPoint { get; set; } - public override ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) + public ValueTask CloseAsync() { - throw new Exception("Mock send failed"); + return ValueTask.CompletedTask; } - } - class MockSendCancelHandler : DataPackageHandlerBase - { - public ITcpSocketClient? Socket { get; set; } + public ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken token = default) + { + IsConnected = true; + return ValueTask.FromResult(true); + } - public override async ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) + public ValueTask ReceiveAsync(Memory buffer, CancellationToken token = default) { - if (Socket != null) - { - await Socket.CloseAsync(); - } - await Task.Delay(10, token); - return data; + return ValueTask.FromResult(0); + } + + public ValueTask SendAsync(ReadOnlyMemory data, CancellationToken token = default) + { + throw new Exception("Mock send error"); } } - class MockReceiveErrorHandler : DataPackageHandlerBase + class MockSendTimeoutSocketProvider : ISocketClientProvider { - public override ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) + public bool IsConnected { get; private set; } + + public IPEndPoint LocalEndPoint { get; set; } + + public ValueTask CloseAsync() { - return ValueTask.FromResult(data); + return ValueTask.CompletedTask; } - public override async ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default) + public ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken token = default) { - await base.ReceiveAsync(data, token); - - // 模拟接收数据时报错 - throw new InvalidOperationException("Test Error"); + IsConnected = true; + return ValueTask.FromResult(true); } - } - class MockSendTimeoutHandler : DataPackageHandlerBase - { - public override async ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) + public ValueTask ReceiveAsync(Memory buffer, CancellationToken token = default) { - // 模拟发送超时 - await Task.Delay(200, token); - return data; + return ValueTask.FromResult(0); } - } - class MockReceiveTimeoutHandler : DataPackageHandlerBase - { - public override async ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default) + public async ValueTask SendAsync(ReadOnlyMemory data, CancellationToken token = default) { - // 模拟接收超时 - await Task.Delay(200, token); - await base.ReceiveAsync(data, token); + // 模拟超时发送 + await Task.Delay(100, token); + return false; } } }