Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License
// See the LICENSE file in the project root for more information.
// Maintainer: Argo Zhang([email protected]) Website: https://www.blazor.zone

using System.Net;
using System.Net.Sockets;
using System.Runtime.Versioning;

namespace BootstrapBlazor.Components;

/// <summary>
/// TcpSocket 客户端默认实现
/// </summary>
[UnsupportedOSPlatform("browser")]
class DefaultSocketClient(IPEndPoint localEndPoint) : ISocketClient
{
private TcpClient? _client;

/// <summary>
/// <inheritdoc/>
/// </summary>
public bool IsConnected => _client?.Connected ?? false;

/// <summary>
/// <inheritdoc/>
/// </summary>
public IPEndPoint LocalEndPoint { get; set; } = localEndPoint;

/// <summary>
/// <inheritdoc/>
/// </summary>
public async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken token = default)
{
_client = new TcpClient(LocalEndPoint);
await _client.ConnectAsync(endPoint, token);
if (_client.Connected)
{
if (_client.Client.LocalEndPoint is IPEndPoint localEndPoint)
{
LocalEndPoint = localEndPoint;
}
}
return _client.Connected;
}

/// <summary>
/// <inheritdoc/>
/// </summary>
public async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
{
var ret = false;
if (_client is { Connected: true })
{
var stream = _client.GetStream();
await stream.WriteAsync(data, token);
ret = true;
}
return ret;
}

/// <summary>
/// <inheritdoc/>
/// </summary>
public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken token = default)
{
var len = 0;
if (_client is { Connected: true })
{
var stream = _client.GetStream();
len = await stream.ReadAsync(buffer, token);
}
return len;
}

/// <summary>
/// <inheritdoc/>
/// </summary>
public ValueTask CloseAsync()
{
if (_client != null)
{
_client.Close();
_client = null;
}
return ValueTask.CompletedTask;
}
}
225 changes: 16 additions & 209 deletions src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,225 +3,32 @@
// See the LICENSE file in the project root for more information.
// Maintainer: Argo Zhang([email protected]) Website: https://www.blazor.zone

using Microsoft.Extensions.Logging;
using System.Buffers;
using System.Net;
using System.Net.Sockets;
using System.Runtime.Versioning;

namespace BootstrapBlazor.Components;

[UnsupportedOSPlatform("browser")]
sealed class DefaultTcpSocketClient(IPEndPoint localEndPoint) : TcpSocketClientBase
sealed class DefaultTcpSocketClient : TcpSocketClientBase<DefaultSocketClient>
{
private TcpClient? _client;
private CancellationTokenSource? _receiveCancellationTokenSource;
private IPEndPoint? _remoteEndPoint;

public override bool IsConnected => _client?.Connected ?? false;

[NotNull]
public ILogger<DefaultTcpSocketClient>? Logger { get; init; }

public override async ValueTask<bool> ConnectAsync(IPEndPoint endPoint, CancellationToken token = default)
{
var ret = false;
try
{
// 释放资源
await CloseAsync();

// 创建新的 TcpClient 实例
_client ??= new TcpClient(localEndPoint);
LocalEndPoint = localEndPoint;
_remoteEndPoint = null;

var connectionToken = token;
if (ConnectTimeout > 0)
{
// 设置连接超时时间
var connectTokenSource = new CancellationTokenSource(ConnectTimeout);
connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token;
}
await _client.ConnectAsync(endPoint, connectionToken);

if (_client.Connected)
{
_remoteEndPoint = endPoint;

// 设置本地端点信息
if (_client.Client.LocalEndPoint is IPEndPoint local)
{
LocalEndPoint = local;
}
if (IsAutoReceive)
{
_ = Task.Run(AutoReceiveAsync, token);
}
}
ret = _client.Connected;
}
catch (OperationCanceledException ex)
{
Logger.LogWarning(ex, token.IsCancellationRequested
? "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}"
: "TCP Socket connect operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint);
}
catch (Exception ex)
{
Logger.LogError(ex, "TCP Socket connection failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint);
}
return ret;
}

public override async ValueTask<bool> SendAsync(ReadOnlyMemory<byte> data, CancellationToken token = default)
public DefaultTcpSocketClient(SocketClientOptions options)
{
if (_client is not { Connected: true })
{
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
}

var ret = false;
try
{
var stream = _client.GetStream();

var sendToken = token;
if (SendTimeout > 0)
{
// 设置发送超时时间
var sendTokenSource = new CancellationTokenSource(SendTimeout);
sendToken = CancellationTokenSource.CreateLinkedTokenSource(token, sendTokenSource.Token).Token;
}

if (DataPackageHandler != null)
{
data = await DataPackageHandler.SendAsync(data, sendToken);
}

await stream.WriteAsync(data, sendToken);
ret = true;
}
catch (OperationCanceledException ex)
{
Logger.LogWarning(ex, token.IsCancellationRequested
? "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}"
: "TCP Socket send operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
catch (Exception ex)
{
Logger.LogError(ex, "TCP Socket send failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
return ret;
ReceiveBufferSize = Math.Max(1024, options.ReceiveBufferSize);
IsAutoReceive = options.IsAutoReceive;
ConnectTimeout = options.ConnectTimeout;
SendTimeout = options.SendTimeout;
ReceiveTimeout = options.ReceiveTimeout;
LocalEndPoint = options.LocalEndPoint;
}

public override async ValueTask<Memory<byte>> ReceiveAsync(CancellationToken token = default)
/// <summary>
/// <inheritdoc/>
/// </summary>
/// <param name="localEndPoint"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
protected override DefaultSocketClient CreateSocketClient(IPEndPoint localEndPoint)
{
if (_client is not { Connected: true })
{
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
}

if (IsAutoReceive)
{
throw new InvalidOperationException("Cannot call ReceiveAsync when IsAutoReceive is true. Use the auto-receive mechanism instead.");
}

using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
var buffer = block.Memory;
var len = await ReceiveCoreAsync(_client, buffer, token);
return buffer[..len];
}

private async ValueTask AutoReceiveAsync()
{
_receiveCancellationTokenSource ??= new();
while (_receiveCancellationTokenSource is { IsCancellationRequested: false })
{
if (_client is not { Connected: true })
{
throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}");
}

using var block = MemoryPool<byte>.Shared.Rent(ReceiveBufferSize);
var buffer = block.Memory;
var len = await ReceiveCoreAsync(_client, buffer, _receiveCancellationTokenSource.Token);
if (len == 0)
{
break;
}
}
}

private async ValueTask<int> ReceiveCoreAsync(TcpClient client, Memory<byte> buffer, CancellationToken token)
{
var len = 0;
try
{
var stream = client.GetStream();

var receiveToken = token;
if (ReceiveTimeout > 0)
{
// 设置接收超时时间
var receiveTokenSource = new CancellationTokenSource(ReceiveTimeout);
receiveToken = CancellationTokenSource.CreateLinkedTokenSource(receiveToken, receiveTokenSource.Token).Token;
}
len = await stream.ReadAsync(buffer, receiveToken);
if (len == 0)
{
// 远端主机关闭链路
Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
else
{
buffer = buffer[..len];

if (ReceivedCallBack != null)
{
await ReceivedCallBack(buffer);
}

if (DataPackageHandler != null)
{
await DataPackageHandler.ReceiveAsync(buffer, receiveToken);
}
len = buffer.Length;
}
}
catch (OperationCanceledException ex)
{
Logger.LogWarning(ex, token.IsCancellationRequested
? "TCP Socket receive operation canceled from {LocalEndPoint} to {RemoteEndPoint}"
: "TCP Socket receive operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
catch (Exception ex)
{
Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint);
}
return len;
}

protected override async ValueTask DisposeAsync(bool disposing)
{
await base.DisposeAsync(disposing);

if (disposing)
{
// 取消接收数据的任务
if (_receiveCancellationTokenSource != null)
{
_receiveCancellationTokenSource.Cancel();
_receiveCancellationTokenSource.Dispose();
_receiveCancellationTokenSource = null;
}

// 释放 TcpClient 资源
if (_client != null)
{
_client.Close();
_client = null;
}
}
return new DefaultSocketClient(localEndPoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Net;
using System.Runtime.Versioning;

namespace BootstrapBlazor.Components;
Expand All @@ -16,12 +15,13 @@ sealed class DefaultTcpSocketFactory(IServiceProvider provider) : ITcpSocketFact
{
private readonly ConcurrentDictionary<string, ITcpSocketClient> _pool = new();

public ITcpSocketClient GetOrCreate(string name, Func<string, IPEndPoint> valueFactory)
public ITcpSocketClient GetOrCreate(string name, Action<SocketClientOptions> valueFactory)
{
return _pool.GetOrAdd(name, key =>
{
var endPoint = valueFactory(key);
var client = new DefaultTcpSocketClient(endPoint)
var options = new SocketClientOptions();
valueFactory(options);
var client = new DefaultTcpSocketClient(options)
{
Logger = provider.GetService<ILogger<DefaultTcpSocketClient>>()
};
Expand Down
Loading