diff --git a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs new file mode 100644 index 00000000000..22e6a042495 --- /dev/null +++ b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs @@ -0,0 +1,85 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License +// See the LICENSE file in the project root for more information. +// Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone + +using System.Buffers; +using System.Text; + +namespace BootstrapBlazor.Components; + +/// +/// Handles data packages that are delimited by a specific sequence of bytes or characters. +/// +/// This class provides functionality for processing data packages that are separated by a defined +/// delimiter. The delimiter can be specified as a string with an optional encoding or as a byte array. +public class DelimiterDataPackageHandler : DataPackageHandlerBase +{ + private readonly ReadOnlyMemory _delimiter; + + /// + /// Initializes a new instance of the class with the specified delimiter + /// and optional encoding. + /// + /// The string delimiter used to separate data packages. This value cannot be null or empty. + /// The character encoding used to convert the delimiter to bytes. If null, is used as + /// the default. + /// Thrown if is null or empty. + public DelimiterDataPackageHandler(string delimiter, Encoding? encoding = null) + { + if (string.IsNullOrEmpty(delimiter)) + { + throw new ArgumentNullException(nameof(delimiter), "Delimiter cannot be null or empty."); + } + + encoding ??= Encoding.UTF8; + _delimiter = encoding.GetBytes(delimiter); + } + + /// + /// Initializes a new instance of the class with the specified delimiters. + /// + /// An array of bytes representing the delimiters used to parse data packages. Cannot be . + /// Thrown if is . + public DelimiterDataPackageHandler(byte[] delimiter) + { + _delimiter = delimiter ?? throw new ArgumentNullException(nameof(delimiter), "Delimiter cannot be null."); + } + + /// + /// + /// + /// + /// + public override async Task ReceiveAsync(Memory data) + { + data = ConcatBuffer(data); + + while (data.Length > 0) + { + var index = data.Span.IndexOfAny(_delimiter.Span); + var segment = index == -1 ? data : data[..index]; + var length = segment.Length + _delimiter.Length; + using var buffer = MemoryPool.Shared.Rent(length); + segment.CopyTo(buffer.Memory); + + if (index != -1) + { + SlicePackage(data, index + _delimiter.Length); + + _delimiter.CopyTo(buffer.Memory[index..]); + if (ReceivedCallBack != null) + { + await ReceivedCallBack(buffer.Memory[..length].ToArray()); + } + + data = data[(index + _delimiter.Length)..]; + } + else + { + SlicePackage(data, 0); + break; + } + } + } +} diff --git a/test/UnitTest/Services/TcpSocketFactoryTest.cs b/test/UnitTest/Services/TcpSocketFactoryTest.cs index cf3fe492878..3e610bb8599 100644 --- a/test/UnitTest/Services/TcpSocketFactoryTest.cs +++ b/test/UnitTest/Services/TcpSocketFactoryTest.cs @@ -177,7 +177,7 @@ public async Task CloseByRemote_Ok() [Fact] public async Task FixLengthDataPackageHandler_Ok() { - var port = 8888; + var port = 8884; var server = StartTcpServer(port, MockSplitPackageAsync); var client = CreateClient(); @@ -219,7 +219,7 @@ public async Task FixLengthDataPackageHandler_Ok() [Fact] public async Task FixLengthDataPackageHandler_Sticky() { - var port = 8899; + var port = 8885; var server = StartTcpServer(port, MockStickyPackageAsync); var client = CreateClient(); @@ -263,6 +263,60 @@ public async Task FixLengthDataPackageHandler_Sticky() StopTcpServer(server); } + [Fact] + public async Task DelimiterDataPackageHandler_Ok() + { + var port = 8886; + var server = StartTcpServer(port, MockDelimiterPackageAsync); + var client = CreateClient(); + + // 连接 TCP Server + var connect = await client.ConnectAsync("localhost", port); + + var tcs = new TaskCompletionSource(); + Memory receivedBuffer = Memory.Empty; + + // 增加数据库处理适配器 + client.SetDataHandler(new DelimiterDataPackageHandler([0x13, 0x10]) + { + ReceivedCallBack = buffer => + { + receivedBuffer = buffer; + tcs.SetResult(); + return Task.CompletedTask; + } + }); + + // 发送数据 + var data = new Memory([1, 2, 3, 4, 5]); + await client.SendAsync(data); + + // 等待接收数据处理完成 + await tcs.Task; + + // 验证接收到的数据 + Assert.Equal(receivedBuffer.ToArray(), [1, 2, 3, 4, 5, 0x13, 0x10]); + + // 等待第二次数据 + receivedBuffer = Memory.Empty; + tcs = new TaskCompletionSource(); + await tcs.Task; + + // 验证接收到的数据 + Assert.Equal(receivedBuffer.ToArray(), [5, 6, 0x13, 0x10]); + + // 关闭连接 + client.Close(); + StopTcpServer(server); + + var handler = new DelimiterDataPackageHandler("\r\n"); + var ex = Assert.Throws(() => new DelimiterDataPackageHandler(string.Empty)); + Assert.NotNull(ex); + + ex = Assert.Throws(() => new DelimiterDataPackageHandler((byte[])null!)); + Assert.NotNull(ex); + } + private static TcpListener StartTcpServer(int port, Func handler) { var server = new TcpListener(IPAddress.Loopback, port); @@ -280,6 +334,29 @@ private static async Task AcceptClientsAsync(TcpListener server, Func(buffer, 0, len); + await stream.WriteAsync(block, CancellationToken.None); + + await Task.Delay(20); + + // 模拟拆包发送第二段数据 + await stream.WriteAsync(new byte[] { 0x13, 0x10, 0x5, 0x6, 0x13, 0x10 }, CancellationToken.None); + } + } + private static async Task MockSplitPackageAsync(TcpClient client) { using var stream = client.GetStream();