Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public abstract class DataPackageHandlerBase : IDataPackageHandler
/// <summary>
/// 当接收数据处理完成后,回调该函数执行接收
/// </summary>
public Func<Memory<byte>, Task>? ReceivedCallBack { get; set; }
public Func<ReadOnlyMemory<byte>, ValueTask>? ReceivedCallBack { get; set; }

/// <summary>
/// Sends the specified data asynchronously to the target destination.
Expand All @@ -29,19 +29,19 @@ public abstract class DataPackageHandlerBase : IDataPackageHandler
/// <param name="data">The data to be sent, represented as a block of memory.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains a <see cref="Memory{T}"/> of <see
/// cref="byte"/> representing the response or acknowledgment received from the target destination.</returns>
public virtual Task<Memory<byte>> SendAsync(Memory<byte> data)
public virtual ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data)
{
return Task.FromResult(data);
return ValueTask.FromResult(data);
}

/// <summary>
/// Processes the received data asynchronously.
/// </summary>
/// <param name="data">A memory buffer containing the data to be processed. The buffer must not be empty.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public virtual Task ReceiveAsync(Memory<byte> data)
public virtual ValueTask ReceiveAsync(ReadOnlyMemory<byte> data)
{
return Task.CompletedTask;
return ValueTask.CompletedTask;
}

/// <summary>
Expand All @@ -52,7 +52,7 @@ public virtual Task ReceiveAsync(Memory<byte> data)
/// for the specified <paramref name="length"/>.</remarks>
/// <param name="buffer">The memory buffer containing the data to process.</param>
/// <param name="length">The length of the valid data within the buffer.</param>
protected void SlicePackage(Memory<byte> buffer, int length)
protected void SlicePackage(ReadOnlyMemory<byte> buffer, int length)
{
_lastReceiveBuffer = buffer[length..].ToArray().AsMemory();
}
Expand All @@ -66,7 +66,7 @@ protected void SlicePackage(Memory<byte> buffer, int length)
/// <param name="buffer">The buffer to concatenate with the previously stored data. Must not be empty.</param>
/// <returns>A <see cref="Memory{T}"/> instance containing the concatenated data. If no previously stored data exists, the
/// method returns the input <paramref name="buffer"/>.</returns>
protected Memory<byte> ConcatBuffer(Memory<byte> buffer)
protected ReadOnlyMemory<byte> ConcatBuffer(ReadOnlyMemory<byte> buffer)
{
if (_lastReceiveBuffer.IsEmpty)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public DelimiterDataPackageHandler(byte[] delimiter)
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
public override async Task ReceiveAsync(Memory<byte> data)
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data)
{
data = ConcatBuffer(data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class FixLengthDataPackageHandler(int length) : DataPackageHandlerBase
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
public override async Task ReceiveAsync(Memory<byte> data)
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data)
{
while (data.Length > 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface IDataPackageHandler
/// <summary>
/// Gets or sets the callback function to be invoked when data is received asynchronously.
/// </summary>
Func<Memory<byte>, Task>? ReceivedCallBack { get; set; }
Func<ReadOnlyMemory<byte>, ValueTask>? ReceivedCallBack { get; set; }

/// <summary>
/// Sends the specified data asynchronously to the target destination.
Expand All @@ -27,7 +27,7 @@ public interface IDataPackageHandler
/// <param name="data">The data to be sent, represented as a block of memory.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains a <see cref="Memory{T}"/> of <see
/// cref="byte"/> representing the response or acknowledgment received from the target destination.</returns>
Task<Memory<byte>> SendAsync(Memory<byte> data);
ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data);

/// <summary>
/// Asynchronously receives data from a source and writes it into the provided memory buffer.
Expand All @@ -37,5 +37,5 @@ public interface IDataPackageHandler
/// <param name="data">The memory buffer to store the received data. The buffer must be writable and have sufficient capacity.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains the number of bytes written to the
/// buffer. Returns 0 if the end of the data stream is reached.</returns>
Task ReceiveAsync(Memory<byte> data);
ValueTask ReceiveAsync(ReadOnlyMemory<byte> data);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ public void SetDataHandler(IDataPackageHandler handler)
_dataPackageHandler = handler;
}

public Task<bool> ConnectAsync(string host, int port, CancellationToken token = default)
public ValueTask<bool> ConnectAsync(string host, int port, CancellationToken token = default)
{
var endPoint = new IPEndPoint(GetIPAddress(host), port);
return ConnectAsync(endPoint, token);
}

public async Task<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken token = default)
public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken token = default)
{
var ret = false;
try
Expand Down Expand Up @@ -81,7 +81,7 @@ public async Task<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken toke
return ret;
}

public async Task<bool> SendAsync(Memory<byte> data, CancellationToken token = default)
public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
{
if (_client is not { Connected: true })
{
Expand Down Expand Up @@ -110,7 +110,7 @@ public async Task<bool> SendAsync(Memory<byte> data, CancellationToken token = d
return ret;
}

private async Task ReceiveAsync()
private async ValueTask ReceiveAsync()
{
_receiveCancellationTokenSource ??= new();
while (_receiveCancellationTokenSource is { IsCancellationRequested: false })
Expand Down
6 changes: 3 additions & 3 deletions src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface ITcpSocketClient : IDisposable
/// langword="default"/> if not provided.</param>
/// <returns>A task that represents the asynchronous operation. The task result is <see langword="true"/> if the connection
/// is successfully established; otherwise, <see langword="false"/>.</returns>
Task<bool> ConnectAsync(string host, int port, CancellationToken token = default);
ValueTask<bool> ConnectAsync(string host, int port, CancellationToken token = default);

/// <summary>
/// Establishes an asynchronous connection to the specified endpoint.
Expand All @@ -57,7 +57,7 @@ public interface ITcpSocketClient : IDisposable
/// langword="default"/> if not provided.</param>
/// <returns>A task that represents the asynchronous operation. The task result is <see langword="true"/> if the connection
/// is successfully established; otherwise, <see langword="false"/>.</returns>
Task<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken token = default);
ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken token = default);

/// <summary>
/// Sends the specified data asynchronously to the target endpoint.
Expand All @@ -69,7 +69,7 @@ public interface ITcpSocketClient : IDisposable
/// <param name="token">An optional <see cref="CancellationToken"/> to observe while waiting for the operation to complete.</param>
/// <returns>A task that represents the asynchronous operation. The task result is <see langword="true"/> if the data was
/// sent successfully; otherwise, <see langword="false"/>.</returns>
Task<bool> SendAsync(Memory<byte> data, CancellationToken token = default);
ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationToken token = default);

/// <summary>
/// Closes the current connection or resource, releasing any associated resources.
Expand Down
49 changes: 24 additions & 25 deletions test/UnitTest/Services/TcpSocketFactoryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public void GetOrCreate_Ok()
Assert.NotNull(client5);

client5.Dispose();

factory.Dispose();
}

Expand Down Expand Up @@ -73,8 +72,8 @@ public async Task SendAsync_Error()
var client = CreateClient();

// 测试未建立连接前调用 SendAsync 方法报异常逻辑
var data = new Memory<byte>([1, 2, 3, 4, 5]);
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => client.SendAsync(data));
var data = new ReadOnlyMemory<byte>([1, 2, 3, 4, 5]);
var ex = await Assert.ThrowsAsync<InvalidOperationException>(async () => await client.SendAsync(data));
Assert.Equal("TCP Socket is not connected 127.0.0.1:0", ex.Message);
}

Expand All @@ -95,7 +94,7 @@ public async Task SendAsync_Cancel()
var cst = new CancellationTokenSource();
cst.Cancel();

var data = new Memory<byte>([1, 2, 3, 4, 5]);
var data = new ReadOnlyMemory<byte>([1, 2, 3, 4, 5]);
var result = await client.SendAsync(data, cst.Token);
Assert.False(result);

Expand Down Expand Up @@ -131,7 +130,7 @@ public async Task ReceiveAsync_Error()
var methodInfo = client.GetType().GetMethod("ReceiveAsync", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
Assert.NotNull(methodInfo);

var task = (Task)methodInfo.Invoke(client, null)!;
var task = (ValueTask)methodInfo.Invoke(client, null)!;
var ex = await Assert.ThrowsAsync<InvalidOperationException>(async () => await task);
Assert.NotNull(ex);

Expand All @@ -147,7 +146,7 @@ public async Task ReceiveAsync_Error()
await client.ConnectAsync("localhost", port);

// 发送数据导致接收数据异常
var data = new Memory<byte>([1, 2, 3, 4, 5]);
var data = new ReadOnlyMemory<byte>([1, 2, 3, 4, 5]);
await client.SendAsync(data);

// 关闭连接
Expand All @@ -168,7 +167,7 @@ public async Task CloseByRemote_Ok()
await client.ConnectAsync("localhost", port);

// 发送数据
await client.SendAsync(new Memory<byte>([1, 2, 3, 4, 5]));
await client.SendAsync(new ReadOnlyMemory<byte>([1, 2, 3, 4, 5]));

// 关闭连接
StopTcpServer(server);
Expand All @@ -187,7 +186,7 @@ public async Task FixLengthDataPackageHandler_Ok()
Assert.True(client.IsConnected);

var tcs = new TaskCompletionSource();
Memory<byte> receivedBuffer = Memory<byte>.Empty;
ReadOnlyMemory<byte> receivedBuffer = ReadOnlyMemory<byte>.Empty;

// 增加数据处理适配器
client.SetDataHandler(new FixLengthDataPackageHandler(7)
Expand All @@ -196,12 +195,12 @@ public async Task FixLengthDataPackageHandler_Ok()
{
receivedBuffer = buffer;
tcs.SetResult();
return Task.CompletedTask;
return ValueTask.CompletedTask;
}
});

// 测试 SendAsync 方法
var data = new Memory<byte>([1, 2, 3, 4, 5]);
var data = new ReadOnlyMemory<byte>([1, 2, 3, 4, 5]);
var result = await client.SendAsync(data);
Assert.True(result);

Expand All @@ -227,7 +226,7 @@ public async Task FixLengthDataPackageHandler_Sticky()
var connect = await client.ConnectAsync("localhost", port);

var tcs = new TaskCompletionSource();
Memory<byte> receivedBuffer = Memory<byte>.Empty;
ReadOnlyMemory<byte> receivedBuffer = ReadOnlyMemory<byte>.Empty;

// 增加数据库处理适配器
client.SetDataHandler(new FixLengthDataPackageHandler(7)
Expand All @@ -236,20 +235,20 @@ public async Task FixLengthDataPackageHandler_Sticky()
{
receivedBuffer = buffer;
tcs.SetResult();
return Task.CompletedTask;
return ValueTask.CompletedTask;
}
});

// 发送数据
var data = new Memory<byte>([1, 2, 3, 4, 5]);
var data = new ReadOnlyMemory<byte>([1, 2, 3, 4, 5]);
await client.SendAsync(data);

// 等待接收数据处理完成
await tcs.Task;

// 验证接收到的数据
Assert.Equal(receivedBuffer.ToArray(), [1, 2, 3, 4, 5, 3, 4]);
receivedBuffer = Memory<byte>.Empty;
receivedBuffer = ReadOnlyMemory<byte>.Empty;
tcs = new TaskCompletionSource();

// 等待第二次数据
Expand Down Expand Up @@ -282,7 +281,7 @@ public async Task DelimiterDataPackageHandler_Ok()
var connect = await client.ConnectAsync("localhost", port);

var tcs = new TaskCompletionSource();
Memory<byte> receivedBuffer = Memory<byte>.Empty;
ReadOnlyMemory<byte> receivedBuffer = ReadOnlyMemory<byte>.Empty;

// 增加数据库处理适配器
client.SetDataHandler(new DelimiterDataPackageHandler([0x13, 0x10])
Expand All @@ -291,12 +290,12 @@ public async Task DelimiterDataPackageHandler_Ok()
{
receivedBuffer = buffer;
tcs.SetResult();
return Task.CompletedTask;
return ValueTask.CompletedTask;
}
});

// 发送数据
var data = new Memory<byte>([1, 2, 3, 4, 5]);
var data = new ReadOnlyMemory<byte>([1, 2, 3, 4, 5]);
await client.SendAsync(data);

// 等待接收数据处理完成
Expand All @@ -306,7 +305,7 @@ public async Task DelimiterDataPackageHandler_Ok()
Assert.Equal(receivedBuffer.ToArray(), [1, 2, 3, 4, 5, 0x13, 0x10]);

// 等待第二次数据
receivedBuffer = Memory<byte>.Empty;
receivedBuffer = ReadOnlyMemory<byte>.Empty;
tcs = new TaskCompletionSource();
await tcs.Task;

Expand Down Expand Up @@ -355,7 +354,7 @@ private static async Task MockDelimiterPackageAsync(TcpClient client)
}

// 回写数据到客户端
var block = new Memory<byte>(buffer, 0, len);
var block = new ReadOnlyMemory<byte>(buffer, 0, len);
await stream.WriteAsync(block, CancellationToken.None);

await Task.Delay(20);
Expand All @@ -378,7 +377,7 @@ private static async Task MockSplitPackageAsync(TcpClient client)
}

// 回写数据到客户端
var block = new Memory<byte>(buffer, 0, len);
var block = new ReadOnlyMemory<byte>(buffer, 0, len);
await stream.WriteAsync(block, CancellationToken.None);

// 模拟延时
Expand All @@ -402,7 +401,7 @@ private static async Task MockStickyPackageAsync(TcpClient client)
}

// 回写数据到客户端
var block = new Memory<byte>(buffer, 0, len);
var block = new ReadOnlyMemory<byte>(buffer, 0, len);
await stream.WriteAsync(block, CancellationToken.None);

// 模拟延时
Expand Down Expand Up @@ -479,7 +478,7 @@ class MockSendErrorHandler : DataPackageHandlerBase
{
public ITcpSocketClient? Socket { get; set; }

public override async Task<Memory<byte>> SendAsync(Memory<byte> data)
public override async ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data)
{
Socket?.Close();
await Task.Delay(10);
Expand All @@ -489,12 +488,12 @@ public override async Task<Memory<byte>> SendAsync(Memory<byte> data)

class MockReceiveErrorHandler : DataPackageHandlerBase
{
public override Task<Memory<byte>> SendAsync(Memory<byte> data)
public override ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data)
{
return Task.FromResult(data);
return ValueTask.FromResult(data);
}

public override async Task ReceiveAsync(Memory<byte> data)
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data)
{
await base.ReceiveAsync(data);

Expand Down