Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,28 @@ public abstract class DataPackageHandlerBase : IDataPackageHandler
private Memory<byte> _lastReceiveBuffer = Memory<byte>.Empty;

/// <summary>
/// Gets or sets the callback function to handle received data.
/// <inheritdoc/>
/// </summary>
/// <remarks>The callback function should be designed to handle the received data efficiently and
/// asynchronously. Ensure that the implementation does not block or perform long-running operations, as this may
/// impact performance.</remarks>
public Func<ReadOnlyMemory<byte>, ValueTask>? ReceivedCallBack { get; set; }

/// <summary>
/// Sends the specified data asynchronously to the target destination.
/// <inheritdoc/>
/// </summary>
/// <remarks>The method performs an asynchronous operation to send the provided data. The caller must
/// ensure that the data is valid and non-empty. The returned memory block may contain a response or acknowledgment
/// depending on the implementation of the target destination.</remarks>
/// <param name="data">The data to be sent, represented as a block of memory.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains a <see cref="Memory{T}"/> of <see
/// cref="byte"/> representing the response or acknowledgment received from the target destination.</returns>
public virtual ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data)
/// <param name="data"></param>
/// <param name="token"></param>
/// <returns></returns>
public virtual ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
{
return ValueTask.FromResult(data);
}

/// <summary>
/// Processes the received data asynchronously.
/// <inheritdoc/>
/// </summary>
/// <param name="data">A memory buffer containing the data to be processed. The buffer must not be empty.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public virtual ValueTask ReceiveAsync(ReadOnlyMemory<byte> data)
/// <param name="data"></param>
/// <param name="token"></param>
/// <returns></returns>
public virtual ValueTask ReceiveAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
{
return ValueTask.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ public DelimiterDataPackageHandler(byte[] delimiter)
/// <inheritdoc/>
/// </summary>
/// <param name="data"></param>
/// <param name="token"></param>
/// <returns></returns>
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data)
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
{
data = ConcatBuffer(data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ public class FixLengthDataPackageHandler(int length) : DataPackageHandlerBase
/// <inheritdoc/>
/// </summary>
/// <param name="data"></param>
/// <param name="token"></param>
/// <returns></returns>
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data)
public override async ValueTask ReceiveAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
{
while (data.Length > 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ public interface IDataPackageHandler
/// ensure that the data is valid and non-empty. The returned memory block may contain a response or acknowledgment
/// depending on the implementation of the target destination.</remarks>
/// <param name="data">The data to be sent, represented as a block of memory.</param>
/// <param name="token">An optional <see cref="CancellationToken"/> to observe while waiting for the operation to complete.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains a <see cref="Memory{T}"/> of <see
/// cref="byte"/> representing the response or acknowledgment received from the target destination.</returns>
ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data);
ValueTask<ReadOnlyMemory<byte>> SendAsync(ReadOnlyMemory<byte> data, CancellationToken token = default);

/// <summary>
/// Asynchronously receives data from a source and writes it into the provided memory buffer.
/// </summary>
/// <remarks>This method does not guarantee that the entire buffer will be filled. The number of bytes
/// written depends on the availability of data.</remarks>
/// <param name="data">The memory buffer to store the received data. The buffer must be writable and have sufficient capacity.</param>
/// <param name="token">A cancellation token that can be used to cancel the operation. The default value is <see langword="default"/>.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains the number of bytes written to the
/// buffer. Returns 0 if the end of the data stream is reached.</returns>
ValueTask ReceiveAsync(ReadOnlyMemory<byte> data);
ValueTask ReceiveAsync(ReadOnlyMemory<byte> data, CancellationToken token = default);
}
165 changes: 126 additions & 39 deletions src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
namespace BootstrapBlazor.Components;

[UnsupportedOSPlatform("browser")]
sealed class DefaultTcpSocketClient(IPEndPoint endPoint) : ITcpSocketClient
sealed class DefaultTcpSocketClient(IPEndPoint localEndPoint) : ITcpSocketClient
{
private TcpClient? _client;
private IDataPackageHandler? _dataPackageHandler;
Expand All @@ -21,15 +21,23 @@ sealed class DefaultTcpSocketClient(IPEndPoint endPoint) : ITcpSocketClient

public bool IsConnected => _client?.Connected ?? false;

public IPEndPoint LocalEndPoint { get; set; } = endPoint;
public IPEndPoint? LocalEndPoint { get; set; }

[NotNull]
public ILogger<DefaultTcpSocketClient>? Logger { get; set; }

public int ReceiveBufferSize { get; set; } = 1024 * 10;
public int ReceiveBufferSize { get; set; } = 1024 * 64;

public bool IsAutoReceive { get; set; } = true;

public Func<ReadOnlyMemory<byte>, ValueTask>? ReceivedCallBack { get; set; }

public int ConnectTimeout { get; set; }

public int SendTimeout { get; set; }

public int ReceiveTimeout { get; set; }

public void SetDataHandler(IDataPackageHandler handler)
{
_dataPackageHandler = handler;
Expand All @@ -44,19 +52,37 @@ public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken
Close();

// 创建新的 TcpClient 实例
_client ??= new TcpClient(LocalEndPoint);
await _client.ConnectAsync(endPoint, token);
_client ??= new TcpClient(localEndPoint);

// 开始接收数据
_ = Task.Run(ReceiveAsync, token);
var connectionToken = token;
if (ConnectTimeout > 0)
{
// 设置连接超时时间
var connectTokenSource = new CancellationTokenSource(ConnectTimeout);
connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token;
}
await _client.ConnectAsync(endPoint, connectionToken);

// 设置本地以及远端端点信息
LocalEndPoint = (IPEndPoint)_client.Client.LocalEndPoint!;
_remoteEndPoint = endPoint;

if (IsAutoReceive)
{
_ = Task.Run(AutoReceiveAsync);
}
ret = true;
}
catch (OperationCanceledException ex)
{
Logger.LogWarning(ex, "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint);
if (token.IsCancellationRequested)
{
Logger.LogWarning(ex, "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint);
}
else
{
Logger.LogWarning(ex, "TCP Socket connect operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint);
}
}
catch (Exception ex)
{
Expand All @@ -75,17 +101,34 @@ public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationTo
var ret = false;
try
{
var stream = _client.GetStream();

var sendToken = token;
if (SendTimeout > 0)
{
// 设置发送超时时间
var sendTokenSource = new CancellationTokenSource(SendTimeout);
sendToken = CancellationTokenSource.CreateLinkedTokenSource(token, sendTokenSource.Token).Token;
}

if (_dataPackageHandler != null)
{
data = await _dataPackageHandler.SendAsync(data);
data = await _dataPackageHandler.SendAsync(data, sendToken);
}
var stream = _client.GetStream();
await stream.WriteAsync(data, token);

await stream.WriteAsync(data, sendToken);
ret = true;
}
catch (OperationCanceledException ex)
{
Logger.LogWarning(ex, "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
if (token.IsCancellationRequested)
{
Logger.LogWarning(ex, "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
else
{
Logger.LogWarning(ex, "TCP Socket send operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
}
catch (Exception ex)
{
Expand All @@ -94,7 +137,25 @@ public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationTo
return ret;
}

private async ValueTask ReceiveAsync()
public async ValueTask<Memory<byte>> ReceiveAsync(CancellationToken token = default)
{
if (_client == null || !_client.Connected)
{
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
}

if (IsAutoReceive)
{
throw new InvalidOperationException("Cannot call ReceiveAsync when IsAutoReceive is true. Use the auto-receive mechanism instead.");
}

using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
var buffer = block.Memory;
var len = await ReceiveCoreAsync(_client, buffer, token);
return buffer[0..len];
}

private async ValueTask AutoReceiveAsync()
{
_receiveCancellationTokenSource ??= new();
while (_receiveCancellationTokenSource is { IsCancellationRequested: false })
Expand All @@ -104,42 +165,67 @@ private async ValueTask ReceiveAsync()
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
}

try
using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
var buffer = block.Memory;
var len = await ReceiveCoreAsync(_client, buffer, _receiveCancellationTokenSource.Token);
if (len == 0)
{
break;
}
}
}

private async ValueTask<int> ReceiveCoreAsync(TcpClient client, Memory<byte> buffer, CancellationToken token)
{
var len = 0;
try
{
var stream = client.GetStream();

var receiveToken = token;
if (ReceiveTimeout > 0)
{
using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
var buffer = block.Memory;
var stream = _client.GetStream();
var len = await stream.ReadAsync(buffer, _receiveCancellationTokenSource.Token);
if (len == 0)
// 设置接收超时时间
var receiveTokenSource = new CancellationTokenSource(ReceiveTimeout);
receiveToken = CancellationTokenSource.CreateLinkedTokenSource(receiveToken, receiveTokenSource.Token).Token;
}
len = await stream.ReadAsync(buffer, receiveToken);
if (len == 0)
{
// 远端主机关闭链路
Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
else
{
buffer = buffer[..len];

if (ReceivedCallBack != null)
{
// 远端主机关闭链路
Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
break;
await ReceivedCallBack(buffer);
}
else
{
buffer = buffer[..len];

if (ReceivedCallBack != null)
{
await ReceivedCallBack(buffer);
}

if (_dataPackageHandler != null)
{
await _dataPackageHandler.ReceiveAsync(buffer);
}
if (_dataPackageHandler != null)
{
await _dataPackageHandler.ReceiveAsync(buffer, receiveToken);
}
}
catch (OperationCanceledException ex)
}
catch (OperationCanceledException ex)
{
if (token.IsCancellationRequested)
{
Logger.LogWarning(ex, "TCP Socket receive operation was canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
Logger.LogWarning(ex, "TCP Socket receive operation canceled from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
catch (Exception ex)
else
{
Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
Logger.LogWarning(ex, "TCP Socket receive operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
}
catch (Exception ex)
{
Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
return len;
}

public void Close()
Expand All @@ -151,10 +237,11 @@ private void Dispose(bool disposing)
{
if (disposing)
{
LocalEndPoint = null;
_remoteEndPoint = null;

// 取消接收数据的任务
if (_receiveCancellationTokenSource is not null)
if (_receiveCancellationTokenSource != null)
{
_receiveCancellationTokenSource.Cancel();
_receiveCancellationTokenSource.Dispose();
Expand Down
Loading
Loading