From edae8c8004478c70b90a3e7e00d18f6be794ac95 Mon Sep 17 00:00:00 2001 From: Argo Zhang Date: Thu, 19 Jun 2025 17:46:23 +0800 Subject: [PATCH 1/6] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=88=86?= =?UTF-8?q?=E9=9A=94=E7=AC=A6=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DelimiterDataPackageHandler.cs | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs diff --git a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs new file mode 100644 index 00000000000..e72c30d78fb --- /dev/null +++ b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs @@ -0,0 +1,79 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License +// See the LICENSE file in the project root for more information. +// Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone + +using System.Buffers; +using System.Text; + +namespace BootstrapBlazor.Components; + +/// +/// Handles data packages that are delimited by a specific sequence of bytes or characters. +/// +/// This class provides functionality for processing data packages that are separated by a defined +/// delimiter. The delimiter can be specified as a string with an optional encoding or as a byte array. +public class DelimiterDataPackageHandler : DataPackageHandlerBase +{ + private readonly ReadOnlyMemory _delimiter; + + private Memory _lastReceiveBuffer = Memory.Empty; + + /// + /// Initializes a new instance of the class with the specified delimiter + /// and optional encoding. + /// + /// The string delimiter used to separate data packages. This value cannot be null or empty. + /// The character encoding used to convert the delimiter to bytes. If null, is used as + /// the default. + /// Thrown if is null or empty. + public DelimiterDataPackageHandler(string delimiter, Encoding? encoding = null) + { + if (string.IsNullOrEmpty(delimiter)) + { + throw new ArgumentNullException(nameof(delimiter), "Delimiter cannot be null or empty."); + } + + encoding ??= Encoding.UTF8; + _delimiter = encoding.GetBytes(delimiter); + } + + /// + /// Initializes a new instance of the class with the specified delimiters. + /// + /// An array of bytes representing the delimiters used to parse data packages. Cannot be . + /// Thrown if is . + public DelimiterDataPackageHandler(byte[] delimiter) + { + _delimiter = delimiter ?? throw new ArgumentNullException(nameof(delimiter), "Delimiter cannot be null."); + } + + /// + /// + /// + /// + /// + public override async Task ReceiveAsync(Memory data) + { + var index = data.Span.IndexOfAny(_delimiter.Span); + var segment = index == -1 ? data : data[..index]; + + var total = _lastReceiveBuffer.Length + segment.Length; + using var buffer = MemoryPool.Shared.Rent(total); + + if (_lastReceiveBuffer.Length > 0) + { + _lastReceiveBuffer.CopyTo(buffer.Memory); + } + segment.CopyTo(buffer.Memory[_lastReceiveBuffer.Length..]); + + if (index != -1) + { + _lastReceiveBuffer = data[(index + _delimiter.Length)..].ToArray(); + if (ReceivedCallBack != null) + { + await ReceivedCallBack(buffer.Memory[..total].ToArray()); + } + } + } +} From 9d8204611b6d96daffd25106ed7d6cd132478c3d Mon Sep 17 00:00:00 2001 From: Argo Zhang Date: Thu, 19 Jun 2025 18:19:43 +0800 Subject: [PATCH 2/6] =?UTF-8?q?refactor:=20=E6=9B=B4=E6=96=B0=E5=88=86?= =?UTF-8?q?=E9=9A=94=E7=AC=A6=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DelimiterDataPackageHandler.cs | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs index e72c30d78fb..916df0ba5c4 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs @@ -17,8 +17,6 @@ public class DelimiterDataPackageHandler : DataPackageHandlerBase { private readonly ReadOnlyMemory _delimiter; - private Memory _lastReceiveBuffer = Memory.Empty; - /// /// Initializes a new instance of the class with the specified delimiter /// and optional encoding. @@ -55,25 +53,28 @@ public DelimiterDataPackageHandler(byte[] delimiter) /// public override async Task ReceiveAsync(Memory data) { + data = ConcatBuffer(data); + var index = data.Span.IndexOfAny(_delimiter.Span); var segment = index == -1 ? data : data[..index]; - var total = _lastReceiveBuffer.Length + segment.Length; - using var buffer = MemoryPool.Shared.Rent(total); - - if (_lastReceiveBuffer.Length > 0) - { - _lastReceiveBuffer.CopyTo(buffer.Memory); - } - segment.CopyTo(buffer.Memory[_lastReceiveBuffer.Length..]); + var length = segment.Length + _delimiter.Length; + using var buffer = MemoryPool.Shared.Rent(length); + segment.CopyTo(buffer.Memory); if (index != -1) { - _lastReceiveBuffer = data[(index + _delimiter.Length)..].ToArray(); + SlicePackage(data, index + _delimiter.Length); + + _delimiter.CopyTo(buffer.Memory[index..]); if (ReceivedCallBack != null) { - await ReceivedCallBack(buffer.Memory[..total].ToArray()); + await ReceivedCallBack(buffer.Memory[..length].ToArray()); } } + else + { + SlicePackage(data, 0); + } } } From 17f77cedc58f651632255bd7afbdd20b477ff5f2 Mon Sep 17 00:00:00 2001 From: Argo Zhang Date: Thu, 19 Jun 2025 18:19:51 +0800 Subject: [PATCH 3/6] =?UTF-8?q?test:=20=E6=9B=B4=E6=96=B0=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../UnitTest/Services/TcpSocketFactoryTest.cs | 64 ++++++++++++++++++- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/test/UnitTest/Services/TcpSocketFactoryTest.cs b/test/UnitTest/Services/TcpSocketFactoryTest.cs index cf3fe492878..1fefc711851 100644 --- a/test/UnitTest/Services/TcpSocketFactoryTest.cs +++ b/test/UnitTest/Services/TcpSocketFactoryTest.cs @@ -177,7 +177,7 @@ public async Task CloseByRemote_Ok() [Fact] public async Task FixLengthDataPackageHandler_Ok() { - var port = 8888; + var port = 8884; var server = StartTcpServer(port, MockSplitPackageAsync); var client = CreateClient(); @@ -219,7 +219,7 @@ public async Task FixLengthDataPackageHandler_Ok() [Fact] public async Task FixLengthDataPackageHandler_Sticky() { - var port = 8899; + var port = 8885; var server = StartTcpServer(port, MockStickyPackageAsync); var client = CreateClient(); @@ -263,6 +263,45 @@ public async Task FixLengthDataPackageHandler_Sticky() StopTcpServer(server); } + [Fact] + public async Task DelimiterDataPackageHandler_Ok() + { + var port = 8886; + var server = StartTcpServer(port, MockDelimiterPackageAsync); + var client = CreateClient(); + + // 连接 TCP Server + var connect = await client.ConnectAsync("localhost", port); + + var tcs = new TaskCompletionSource(); + Memory receivedBuffer = Memory.Empty; + + // 增加数据库处理适配器 + client.SetDataHandler(new DelimiterDataPackageHandler(new byte[] { 0x13, 0x10 }) + { + ReceivedCallBack = buffer => + { + receivedBuffer = buffer; + tcs.SetResult(); + return Task.CompletedTask; + } + }); + + // 发送数据 + var data = new Memory([1, 2, 3, 4, 5]); + await client.SendAsync(data); + + // 等待接收数据处理完成 + await tcs.Task; + + // 验证接收到的数据 + Assert.Equal(receivedBuffer.ToArray(), [1, 2, 3, 4, 5, 0x13, 0x10]); + + // 关闭连接 + client.Close(); + StopTcpServer(server); + } + private static TcpListener StartTcpServer(int port, Func handler) { var server = new TcpListener(IPAddress.Loopback, port); @@ -280,6 +319,27 @@ private static async Task AcceptClientsAsync(TcpListener server, Func(buffer, 0, len); + await stream.WriteAsync(block, CancellationToken.None); + + // 模拟拆包发送第二段数据 + await stream.WriteAsync(new byte[] { 0x13, 0x10, 0x5, 0x6 }, CancellationToken.None); + } + } + private static async Task MockSplitPackageAsync(TcpClient client) { using var stream = client.GetStream(); From ac5c33544f5951e3c69121b0e04491123fc4bc57 Mon Sep 17 00:00:00 2001 From: Argo Zhang Date: Thu, 19 Jun 2025 18:40:34 +0800 Subject: [PATCH 4/6] =?UTF-8?q?test:=20=E5=A2=9E=E5=8A=A0=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/UnitTest/Services/TcpSocketFactoryTest.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/test/UnitTest/Services/TcpSocketFactoryTest.cs b/test/UnitTest/Services/TcpSocketFactoryTest.cs index 1fefc711851..16a7ba403e6 100644 --- a/test/UnitTest/Services/TcpSocketFactoryTest.cs +++ b/test/UnitTest/Services/TcpSocketFactoryTest.cs @@ -277,7 +277,7 @@ public async Task DelimiterDataPackageHandler_Ok() Memory receivedBuffer = Memory.Empty; // 增加数据库处理适配器 - client.SetDataHandler(new DelimiterDataPackageHandler(new byte[] { 0x13, 0x10 }) + client.SetDataHandler(new DelimiterDataPackageHandler([0x13, 0x10]) { ReceivedCallBack = buffer => { @@ -300,6 +300,14 @@ public async Task DelimiterDataPackageHandler_Ok() // 关闭连接 client.Close(); StopTcpServer(server); + + var handler = new DelimiterDataPackageHandler("\r\n"); + + var ex = Assert.Throws(() => new DelimiterDataPackageHandler(string.Empty)); + Assert.NotNull(ex); + + ex = Assert.Throws(() => new DelimiterDataPackageHandler((byte[])null!)); + Assert.NotNull(ex); } private static TcpListener StartTcpServer(int port, Func handler) From d76fd36ea4a0792494a5e1bfdba3d73591110ee2 Mon Sep 17 00:00:00 2001 From: Argo Zhang Date: Thu, 19 Jun 2025 19:08:46 +0800 Subject: [PATCH 5/6] =?UTF-8?q?refactor:=20=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E5=88=86=E9=9A=94=E7=AC=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DelimiterDataPackageHandler.cs | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs index 916df0ba5c4..22e6a042495 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DataPackage/DelimiterDataPackageHandler.cs @@ -55,26 +55,31 @@ public override async Task ReceiveAsync(Memory data) { data = ConcatBuffer(data); - var index = data.Span.IndexOfAny(_delimiter.Span); - var segment = index == -1 ? data : data[..index]; + while (data.Length > 0) + { + var index = data.Span.IndexOfAny(_delimiter.Span); + var segment = index == -1 ? data : data[..index]; + var length = segment.Length + _delimiter.Length; + using var buffer = MemoryPool.Shared.Rent(length); + segment.CopyTo(buffer.Memory); - var length = segment.Length + _delimiter.Length; - using var buffer = MemoryPool.Shared.Rent(length); - segment.CopyTo(buffer.Memory); + if (index != -1) + { + SlicePackage(data, index + _delimiter.Length); - if (index != -1) - { - SlicePackage(data, index + _delimiter.Length); + _delimiter.CopyTo(buffer.Memory[index..]); + if (ReceivedCallBack != null) + { + await ReceivedCallBack(buffer.Memory[..length].ToArray()); + } - _delimiter.CopyTo(buffer.Memory[index..]); - if (ReceivedCallBack != null) + data = data[(index + _delimiter.Length)..]; + } + else { - await ReceivedCallBack(buffer.Memory[..length].ToArray()); + SlicePackage(data, 0); + break; } } - else - { - SlicePackage(data, 0); - } } } From bf9e10f216b34e519781c8529b27a5ee667e351f Mon Sep 17 00:00:00 2001 From: Argo Zhang Date: Thu, 19 Jun 2025 19:08:57 +0800 Subject: [PATCH 6/6] =?UTF-8?q?test:=20=E6=9B=B4=E6=96=B0=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/UnitTest/Services/TcpSocketFactoryTest.cs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/test/UnitTest/Services/TcpSocketFactoryTest.cs b/test/UnitTest/Services/TcpSocketFactoryTest.cs index 16a7ba403e6..3e610bb8599 100644 --- a/test/UnitTest/Services/TcpSocketFactoryTest.cs +++ b/test/UnitTest/Services/TcpSocketFactoryTest.cs @@ -297,12 +297,19 @@ public async Task DelimiterDataPackageHandler_Ok() // 验证接收到的数据 Assert.Equal(receivedBuffer.ToArray(), [1, 2, 3, 4, 5, 0x13, 0x10]); + // 等待第二次数据 + receivedBuffer = Memory.Empty; + tcs = new TaskCompletionSource(); + await tcs.Task; + + // 验证接收到的数据 + Assert.Equal(receivedBuffer.ToArray(), [5, 6, 0x13, 0x10]); + // 关闭连接 client.Close(); StopTcpServer(server); var handler = new DelimiterDataPackageHandler("\r\n"); - var ex = Assert.Throws(() => new DelimiterDataPackageHandler(string.Empty)); Assert.NotNull(ex); @@ -343,8 +350,10 @@ private static async Task MockDelimiterPackageAsync(TcpClient client) var block = new Memory(buffer, 0, len); await stream.WriteAsync(block, CancellationToken.None); + await Task.Delay(20); + // 模拟拆包发送第二段数据 - await stream.WriteAsync(new byte[] { 0x13, 0x10, 0x5, 0x6 }, CancellationToken.None); + await stream.WriteAsync(new byte[] { 0x13, 0x10, 0x5, 0x6, 0x13, 0x10 }, CancellationToken.None); } }