Skip to content

Commit 62ed6e6

Browse files
authored
feat(DelimiterDataPackageHandler): add DelimiterDataPackageHandler class (#6260)
* feat: 增加分隔符数据处理器 * refactor: 更新分隔符数据处理器 * test: 更新单元测试 * test: 增加单元测试 * refactor: 支持多分隔符 * test: 更新单元测试
1 parent 9bd5946 commit 62ed6e6

File tree

2 files changed

+164
-2
lines changed

2 files changed

+164
-2
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the Apache 2.0 License
3+
// See the LICENSE file in the project root for more information.
4+
// Maintainer: Argo Zhang([email protected]) Website: https://www.blazor.zone
5+
6+
using System.Buffers;
7+
using System.Text;
8+
9+
namespace BootstrapBlazor.Components;
10+
11+
/// <summary>
12+
/// Handles data packages that are delimited by a specific sequence of bytes or characters.
13+
/// </summary>
14+
/// <remarks>This class provides functionality for processing data packages that are separated by a defined
15+
/// delimiter. The delimiter can be specified as a string with an optional encoding or as a byte array.</remarks>
16+
public class DelimiterDataPackageHandler : DataPackageHandlerBase
17+
{
18+
private readonly ReadOnlyMemory<byte> _delimiter;
19+
20+
/// <summary>
21+
/// Initializes a new instance of the <see cref="DelimiterDataPackageHandler"/> class with the specified delimiter
22+
/// and optional encoding.
23+
/// </summary>
24+
/// <param name="delimiter">The string delimiter used to separate data packages. This value cannot be null or empty.</param>
25+
/// <param name="encoding">The character encoding used to convert the delimiter to bytes. If null, <see cref="Encoding.UTF8"/> is used as
26+
/// the default.</param>
27+
/// <exception cref="ArgumentNullException">Thrown if <paramref name="delimiter"/> is null or empty.</exception>
28+
public DelimiterDataPackageHandler(string delimiter, Encoding? encoding = null)
29+
{
30+
if (string.IsNullOrEmpty(delimiter))
31+
{
32+
throw new ArgumentNullException(nameof(delimiter), "Delimiter cannot be null or empty.");
33+
}
34+
35+
encoding ??= Encoding.UTF8;
36+
_delimiter = encoding.GetBytes(delimiter);
37+
}
38+
39+
/// <summary>
40+
/// Initializes a new instance of the <see cref="DelimiterDataPackageHandler"/> class with the specified delimiters.
41+
/// </summary>
42+
/// <param name="delimiter">An array of bytes representing the delimiters used to parse data packages. Cannot be <see langword="null"/>.</param>
43+
/// <exception cref="ArgumentNullException">Thrown if <paramref name="delimiter"/> is <see langword="null"/>.</exception>
44+
public DelimiterDataPackageHandler(byte[] delimiter)
45+
{
46+
_delimiter = delimiter ?? throw new ArgumentNullException(nameof(delimiter), "Delimiter cannot be null.");
47+
}
48+
49+
/// <summary>
50+
/// <inheritdoc/>
51+
/// </summary>
52+
/// <param name="data"></param>
53+
/// <returns></returns>
54+
public override async Task ReceiveAsync(Memory<byte> data)
55+
{
56+
data = ConcatBuffer(data);
57+
58+
while (data.Length > 0)
59+
{
60+
var index = data.Span.IndexOfAny(_delimiter.Span);
61+
var segment = index == -1 ? data : data[..index];
62+
var length = segment.Length + _delimiter.Length;
63+
using var buffer = MemoryPool<byte>.Shared.Rent(length);
64+
segment.CopyTo(buffer.Memory);
65+
66+
if (index != -1)
67+
{
68+
SlicePackage(data, index + _delimiter.Length);
69+
70+
_delimiter.CopyTo(buffer.Memory[index..]);
71+
if (ReceivedCallBack != null)
72+
{
73+
await ReceivedCallBack(buffer.Memory[..length].ToArray());
74+
}
75+
76+
data = data[(index + _delimiter.Length)..];
77+
}
78+
else
79+
{
80+
SlicePackage(data, 0);
81+
break;
82+
}
83+
}
84+
}
85+
}

test/UnitTest/Services/TcpSocketFactoryTest.cs

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public async Task CloseByRemote_Ok()
177177
[Fact]
178178
public async Task FixLengthDataPackageHandler_Ok()
179179
{
180-
var port = 8888;
180+
var port = 8884;
181181
var server = StartTcpServer(port, MockSplitPackageAsync);
182182
var client = CreateClient();
183183

@@ -219,7 +219,7 @@ public async Task FixLengthDataPackageHandler_Ok()
219219
[Fact]
220220
public async Task FixLengthDataPackageHandler_Sticky()
221221
{
222-
var port = 8899;
222+
var port = 8885;
223223
var server = StartTcpServer(port, MockStickyPackageAsync);
224224
var client = CreateClient();
225225

@@ -263,6 +263,60 @@ public async Task FixLengthDataPackageHandler_Sticky()
263263
StopTcpServer(server);
264264
}
265265

266+
[Fact]
267+
public async Task DelimiterDataPackageHandler_Ok()
268+
{
269+
var port = 8886;
270+
var server = StartTcpServer(port, MockDelimiterPackageAsync);
271+
var client = CreateClient();
272+
273+
// 连接 TCP Server
274+
var connect = await client.ConnectAsync("localhost", port);
275+
276+
var tcs = new TaskCompletionSource();
277+
Memory<byte> receivedBuffer = Memory<byte>.Empty;
278+
279+
// 增加数据库处理适配器
280+
client.SetDataHandler(new DelimiterDataPackageHandler([0x13, 0x10])
281+
{
282+
ReceivedCallBack = buffer =>
283+
{
284+
receivedBuffer = buffer;
285+
tcs.SetResult();
286+
return Task.CompletedTask;
287+
}
288+
});
289+
290+
// 发送数据
291+
var data = new Memory<byte>([1, 2, 3, 4, 5]);
292+
await client.SendAsync(data);
293+
294+
// 等待接收数据处理完成
295+
await tcs.Task;
296+
297+
// 验证接收到的数据
298+
Assert.Equal(receivedBuffer.ToArray(), [1, 2, 3, 4, 5, 0x13, 0x10]);
299+
300+
// 等待第二次数据
301+
receivedBuffer = Memory<byte>.Empty;
302+
tcs = new TaskCompletionSource();
303+
await tcs.Task;
304+
305+
// 验证接收到的数据
306+
Assert.Equal(receivedBuffer.ToArray(), [5, 6, 0x13, 0x10]);
307+
308+
// 关闭连接
309+
client.Close();
310+
StopTcpServer(server);
311+
312+
var handler = new DelimiterDataPackageHandler("\r\n");
313+
var ex = Assert.Throws<ArgumentNullException>(() => new DelimiterDataPackageHandler(string.Empty));
314+
Assert.NotNull(ex);
315+
316+
ex = Assert.Throws<ArgumentNullException>(() => new DelimiterDataPackageHandler((byte[])null!));
317+
Assert.NotNull(ex);
318+
}
319+
266320
private static TcpListener StartTcpServer(int port, Func<TcpClient, Task> handler)
267321
{
268322
var server = new TcpListener(IPAddress.Loopback, port);
@@ -280,6 +334,29 @@ private static async Task AcceptClientsAsync(TcpListener server, Func<TcpClient,
280334
}
281335
}
282336

337+
private static async Task MockDelimiterPackageAsync(TcpClient client)
338+
{
339+
using var stream = client.GetStream();
340+
while (true)
341+
{
342+
var buffer = new byte[10240];
343+
var len = await stream.ReadAsync(buffer);
344+
if (len == 0)
345+
{
346+
break;
347+
}
348+
349+
// 回写数据到客户端
350+
var block = new Memory<byte>(buffer, 0, len);
351+
await stream.WriteAsync(block, CancellationToken.None);
352+
353+
await Task.Delay(20);
354+
355+
// 模拟拆包发送第二段数据
356+
await stream.WriteAsync(new byte[] { 0x13, 0x10, 0x5, 0x6, 0x13, 0x10 }, CancellationToken.None);
357+
}
358+
}
359+
283360
private static async Task MockSplitPackageAsync(TcpClient client)
284361
{
285362
using var stream = client.GetStream();

0 commit comments

Comments
 (0)