diff --git a/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClient.cs b/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClient.cs new file mode 100644 index 00000000000..9b83c9197f0 --- /dev/null +++ b/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClient.cs @@ -0,0 +1,88 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License +// See the LICENSE file in the project root for more information. +// Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone + +using System.Net; +using System.Net.Sockets; +using System.Runtime.Versioning; + +namespace BootstrapBlazor.Components; + +/// +/// TcpSocket 客户端默认实现 +/// +[UnsupportedOSPlatform("browser")] +class DefaultSocketClient(IPEndPoint localEndPoint) : ISocketClient +{ + private TcpClient? _client; + + /// + /// + /// + public bool IsConnected => _client?.Connected ?? false; + + /// + /// + /// + public IPEndPoint LocalEndPoint { get; set; } = localEndPoint; + + /// + /// + /// + public async ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken token = default) + { + _client = new TcpClient(LocalEndPoint); + await _client.ConnectAsync(endPoint, token); + if (_client.Connected) + { + if (_client.Client.LocalEndPoint is IPEndPoint localEndPoint) + { + LocalEndPoint = localEndPoint; + } + } + return _client.Connected; + } + + /// + /// + /// + public async ValueTask SendAsync(ReadOnlyMemory data, CancellationToken token = default) + { + var ret = false; + if (_client is { Connected: true }) + { + var stream = _client.GetStream(); + await stream.WriteAsync(data, token); + ret = true; + } + return ret; + } + + /// + /// + /// + public async ValueTask ReceiveAsync(Memory buffer, CancellationToken token = default) + { + var len = 0; + if (_client is { Connected: true }) + { + var stream = _client.GetStream(); + len = await stream.ReadAsync(buffer, token); + } + return len; + } + + /// + /// + /// + public ValueTask CloseAsync() + { + if (_client != null) + { + _client.Close(); + _client = null; + } + return ValueTask.CompletedTask; + } +} diff --git a/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs b/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs index 45837b5e12d..6ac0cc02d90 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs @@ -3,225 +3,32 @@ // See the LICENSE file in the project root for more information. // Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone -using Microsoft.Extensions.Logging; -using System.Buffers; using System.Net; -using System.Net.Sockets; using System.Runtime.Versioning; namespace BootstrapBlazor.Components; [UnsupportedOSPlatform("browser")] -sealed class DefaultTcpSocketClient(IPEndPoint localEndPoint) : TcpSocketClientBase +sealed class DefaultTcpSocketClient : TcpSocketClientBase { - private TcpClient? _client; - private CancellationTokenSource? _receiveCancellationTokenSource; - private IPEndPoint? _remoteEndPoint; - - public override bool IsConnected => _client?.Connected ?? false; - - [NotNull] - public ILogger? Logger { get; init; } - - public override async ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken token = default) - { - var ret = false; - try - { - // 释放资源 - await CloseAsync(); - - // 创建新的 TcpClient 实例 - _client ??= new TcpClient(localEndPoint); - LocalEndPoint = localEndPoint; - _remoteEndPoint = null; - - var connectionToken = token; - if (ConnectTimeout > 0) - { - // 设置连接超时时间 - var connectTokenSource = new CancellationTokenSource(ConnectTimeout); - connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token; - } - await _client.ConnectAsync(endPoint, connectionToken); - - if (_client.Connected) - { - _remoteEndPoint = endPoint; - - // 设置本地端点信息 - if (_client.Client.LocalEndPoint is IPEndPoint local) - { - LocalEndPoint = local; - } - if (IsAutoReceive) - { - _ = Task.Run(AutoReceiveAsync, token); - } - } - ret = _client.Connected; - } - catch (OperationCanceledException ex) - { - Logger.LogWarning(ex, token.IsCancellationRequested - ? "TCP Socket connect operation was canceled from {LocalEndPoint} to {RemoteEndPoint}" - : "TCP Socket connect operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint); - } - catch (Exception ex) - { - Logger.LogError(ex, "TCP Socket connection failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, endPoint); - } - return ret; - } - - public override async ValueTask SendAsync(ReadOnlyMemory data, CancellationToken token = default) + public DefaultTcpSocketClient(SocketClientOptions options) { - if (_client is not { Connected: true }) - { - throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}"); - } - - var ret = false; - try - { - var stream = _client.GetStream(); - - var sendToken = token; - if (SendTimeout > 0) - { - // 设置发送超时时间 - var sendTokenSource = new CancellationTokenSource(SendTimeout); - sendToken = CancellationTokenSource.CreateLinkedTokenSource(token, sendTokenSource.Token).Token; - } - - if (DataPackageHandler != null) - { - data = await DataPackageHandler.SendAsync(data, sendToken); - } - - await stream.WriteAsync(data, sendToken); - ret = true; - } - catch (OperationCanceledException ex) - { - Logger.LogWarning(ex, token.IsCancellationRequested - ? "TCP Socket send operation was canceled from {LocalEndPoint} to {RemoteEndPoint}" - : "TCP Socket send operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); - } - catch (Exception ex) - { - Logger.LogError(ex, "TCP Socket send failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); - } - return ret; + ReceiveBufferSize = Math.Max(1024, options.ReceiveBufferSize); + IsAutoReceive = options.IsAutoReceive; + ConnectTimeout = options.ConnectTimeout; + SendTimeout = options.SendTimeout; + ReceiveTimeout = options.ReceiveTimeout; + LocalEndPoint = options.LocalEndPoint; } - public override async ValueTask> ReceiveAsync(CancellationToken token = default) + /// + /// + /// + /// + /// + /// + protected override DefaultSocketClient CreateSocketClient(IPEndPoint localEndPoint) { - if (_client is not { Connected: true }) - { - throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}"); - } - - if (IsAutoReceive) - { - throw new InvalidOperationException("Cannot call ReceiveAsync when IsAutoReceive is true. Use the auto-receive mechanism instead."); - } - - using var block = MemoryPool.Shared.Rent(ReceiveBufferSize); - var buffer = block.Memory; - var len = await ReceiveCoreAsync(_client, buffer, token); - return buffer[..len]; - } - - private async ValueTask AutoReceiveAsync() - { - _receiveCancellationTokenSource ??= new(); - while (_receiveCancellationTokenSource is { IsCancellationRequested: false }) - { - if (_client is not { Connected: true }) - { - throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}"); - } - - using var block = MemoryPool.Shared.Rent(ReceiveBufferSize); - var buffer = block.Memory; - var len = await ReceiveCoreAsync(_client, buffer, _receiveCancellationTokenSource.Token); - if (len == 0) - { - break; - } - } - } - - private async ValueTask ReceiveCoreAsync(TcpClient client, Memory buffer, CancellationToken token) - { - var len = 0; - try - { - var stream = client.GetStream(); - - var receiveToken = token; - if (ReceiveTimeout > 0) - { - // 设置接收超时时间 - var receiveTokenSource = new CancellationTokenSource(ReceiveTimeout); - receiveToken = CancellationTokenSource.CreateLinkedTokenSource(receiveToken, receiveTokenSource.Token).Token; - } - len = await stream.ReadAsync(buffer, receiveToken); - if (len == 0) - { - // 远端主机关闭链路 - Logger.LogInformation("TCP Socket {LocalEndPoint} received 0 data closed by {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); - } - else - { - buffer = buffer[..len]; - - if (ReceivedCallBack != null) - { - await ReceivedCallBack(buffer); - } - - if (DataPackageHandler != null) - { - await DataPackageHandler.ReceiveAsync(buffer, receiveToken); - } - len = buffer.Length; - } - } - catch (OperationCanceledException ex) - { - Logger.LogWarning(ex, token.IsCancellationRequested - ? "TCP Socket receive operation canceled from {LocalEndPoint} to {RemoteEndPoint}" - : "TCP Socket receive operation timed out from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); - } - catch (Exception ex) - { - Logger.LogError(ex, "TCP Socket receive failed from {LocalEndPoint} to {RemoteEndPoint}", LocalEndPoint, _remoteEndPoint); - } - return len; - } - - protected override async ValueTask DisposeAsync(bool disposing) - { - await base.DisposeAsync(disposing); - - if (disposing) - { - // 取消接收数据的任务 - if (_receiveCancellationTokenSource != null) - { - _receiveCancellationTokenSource.Cancel(); - _receiveCancellationTokenSource.Dispose(); - _receiveCancellationTokenSource = null; - } - - // 释放 TcpClient 资源 - if (_client != null) - { - _client.Close(); - _client = null; - } - } + return new DefaultSocketClient(localEndPoint); } } diff --git a/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketFactory.cs b/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketFactory.cs index 3096441d04a..49037b345f8 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketFactory.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketFactory.cs @@ -6,7 +6,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using System.Collections.Concurrent; -using System.Net; using System.Runtime.Versioning; namespace BootstrapBlazor.Components; @@ -16,12 +15,13 @@ sealed class DefaultTcpSocketFactory(IServiceProvider provider) : ITcpSocketFact { private readonly ConcurrentDictionary _pool = new(); - public ITcpSocketClient GetOrCreate(string name, Func valueFactory) + public ITcpSocketClient GetOrCreate(string name, Action valueFactory) { return _pool.GetOrAdd(name, key => { - var endPoint = valueFactory(key); - var client = new DefaultTcpSocketClient(endPoint) + var options = new SocketClientOptions(); + valueFactory(options); + var client = new DefaultTcpSocketClient(options) { Logger = provider.GetService>() }; diff --git a/src/BootstrapBlazor/Services/TcpSocket/ISocketClient.cs b/src/BootstrapBlazor/Services/TcpSocket/ISocketClient.cs new file mode 100644 index 00000000000..27962b3d6c4 --- /dev/null +++ b/src/BootstrapBlazor/Services/TcpSocket/ISocketClient.cs @@ -0,0 +1,76 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License +// See the LICENSE file in the project root for more information. +// Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone + +using System.Net; + +namespace BootstrapBlazor.Components; + +/// +/// Defines the contract for a socket client that provides asynchronous methods for connecting, sending, receiving, and +/// closing network connections. +/// +/// This interface is designed to facilitate network communication using sockets. It provides methods for +/// establishing connections, transmitting data, and receiving data asynchronously. Implementations of this interface +/// should ensure proper resource management, including closing connections and releasing resources when no longer +/// needed. +public interface ISocketClient +{ + /// + /// Gets a value indicating whether the connection is currently active. + /// + bool IsConnected { get; } + + /// + /// Gets the local network endpoint that the socket is bound to. + /// + /// This property provides information about the local endpoint of the socket, which is typically + /// used to identify the local address and port being used for communication. If the socket is not bound to a + /// specific local endpoint, this property may return . + IPEndPoint LocalEndPoint { get; } + + /// + /// Establishes an asynchronous connection to the specified endpoint. + /// + /// This method attempts to establish a connection to the specified endpoint. If the connection + /// fails, the method returns rather than throwing an exception. Ensure the endpoint is + /// valid and reachable before calling this method. + /// The representing the remote endpoint to connect to. + /// An optional to observe while waiting for the connection to complete. + /// A that represents the asynchronous operation. The result is if the connection was successfully established; otherwise, . + ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken token = default); + + /// + /// Sends the specified data asynchronously to the connected endpoint. + /// + /// This method performs a non-blocking operation to send data. If the operation is canceled via + /// the , the returned task will not complete successfully. Ensure the connected endpoint + /// is ready to receive data before calling this method. + /// The data to send, represented as a read-only memory block of bytes. + /// An optional cancellation token that can be used to cancel the operation. + /// A representing the asynchronous operation. The result is if the data was sent successfully; otherwise, . + ValueTask SendAsync(ReadOnlyMemory data, CancellationToken token = default); + + /// + /// Asynchronously receives data from a source and writes it into the specified buffer. + /// + /// This method does not guarantee that the buffer will be completely filled. The caller should + /// check the return value to determine the number of bytes received. + /// The memory buffer where the received data will be stored. Must be large enough to hold the incoming data. + /// A cancellation token that can be used to cancel the operation. Defaults to if not + /// provided. + /// A representing the asynchronous operation. The result is the number of bytes + /// successfully received and written into the buffer. Returns 0 if the end of the data stream is reached. + ValueTask ReceiveAsync(Memory buffer, CancellationToken token = default); + + /// + /// Closes the current connection or resource, releasing any associated resources. + /// + /// Once the connection or resource is closed, it cannot be reopened. Ensure that all necessary + /// operations are completed before calling this method. This method is typically used to clean up resources when + /// they are no longer needed. + ValueTask CloseAsync(); +} diff --git a/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs b/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs index 92bb9591378..e199d1744f0 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs @@ -53,7 +53,7 @@ public interface ITcpSocketClient : IAsyncDisposable /// This property provides information about the local endpoint of the socket, which is typically /// used to identify the local address and port being used for communication. If the socket is not bound to a /// specific local endpoint, this property may return . - IPEndPoint? LocalEndPoint { get; } + IPEndPoint LocalEndPoint { get; } /// /// Gets or sets the callback function to handle received data. diff --git a/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketFactory.cs b/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketFactory.cs index e1d4aa35720..4384477477c 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketFactory.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketFactory.cs @@ -3,8 +3,6 @@ // See the LICENSE file in the project root for more information. // Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone -using System.Net; - namespace BootstrapBlazor.Components; /// @@ -13,14 +11,14 @@ namespace BootstrapBlazor.Components; public interface ITcpSocketFactory : IAsyncDisposable { /// - /// Retrieves an existing TCP socket client by name or creates a new one using the specified factory function. + /// Retrieves an existing TCP socket client by name or creates a new one using the specified configuration. /// - /// The unique name identifying the TCP socket client. Cannot be null or empty. - /// A factory function that generates an for the client. The function is invoked if a - /// client with the specified name does not already exist. - /// An instance of associated with the specified name. If a client with the given - /// name already exists, the existing instance is returned; otherwise, a new instance is created. - ITcpSocketClient GetOrCreate(string name, Func valueFactory); + /// The unique name of the TCP socket client to retrieve or create. Cannot be null or empty. + /// A delegate used to configure the for the new TCP socket client if it does not + /// already exist. This delegate is invoked only when a new client is created. + /// An instance of corresponding to the specified name. If the client already exists, + /// the existing instance is returned; otherwise, a new instance is created and returned. + ITcpSocketClient GetOrCreate(string name, Action valueFactory); /// /// Removes the TCP socket client associated with the specified name. diff --git a/src/BootstrapBlazor/Services/TcpSocket/SocketClientOptions.cs b/src/BootstrapBlazor/Services/TcpSocket/SocketClientOptions.cs new file mode 100644 index 00000000000..95d7fecf6f6 --- /dev/null +++ b/src/BootstrapBlazor/Services/TcpSocket/SocketClientOptions.cs @@ -0,0 +1,53 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License +// See the LICENSE file in the project root for more information. +// Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone + +using System.Net; + +namespace BootstrapBlazor.Components; + +/// +/// Represents configuration options for a socket client, including buffer sizes, timeouts, and endpoints. +/// +/// Use this class to configure various settings for a socket client, such as connection timeouts, +/// buffer sizes, and local or remote endpoints. These options allow fine-tuning of socket behavior to suit specific +/// networking scenarios. +public class SocketClientOptions +{ + /// + /// Gets or sets the size, in bytes, of the receive buffer used by the connection. + /// + public int ReceiveBufferSize { get; set; } = 1024 * 64; + + /// + /// Gets or sets a value indicating whether automatic receiving data is enabled. Default is true. + /// + public bool IsAutoReceive { get; set; } = true; + + /// + /// Gets or sets the timeout duration, in milliseconds, for establishing a connection. + /// + public int ConnectTimeout { get; set; } + + /// + /// Gets or sets the duration, in milliseconds, to wait for a send operation to complete before timing out. + /// + /// If the send operation does not complete within the specified timeout period, an exception may + /// be thrown. + public int SendTimeout { get; set; } + + /// + /// Gets or sets the amount of time, in milliseconds, that the receiver will wait for a response before timing out. + /// + /// Use this property to configure the maximum wait time for receiving a response. Setting an + /// appropriate timeout can help prevent indefinite blocking in scenarios where responses may be delayed or + /// unavailable. + public int ReceiveTimeout { get; set; } + + /// + /// Gets or sets the local endpoint for the socket client. Default value is + /// + /// This property specifies the local network endpoint that the socket client will bind to when establishing a connection. + public IPEndPoint LocalEndPoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 0); +} diff --git a/src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs b/src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs index 0017109defe..9bd992bfc02 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/TcpSocketClientBase.cs @@ -3,6 +3,8 @@ // See the LICENSE file in the project root for more information. // Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone +using Microsoft.Extensions.Logging; +using System.Buffers; using System.Net; namespace BootstrapBlazor.Components; @@ -14,17 +16,27 @@ namespace BootstrapBlazor.Components; /// This abstract class serves as a foundation for implementing TCP socket clients. It provides methods /// for connecting to a remote endpoint, sending and receiving data, and managing connection state. Derived classes can /// extend or customize the behavior as needed. -public abstract class TcpSocketClientBase : ITcpSocketClient +public abstract class TcpSocketClientBase : ITcpSocketClient where TSocketClient : class, ISocketClient { + /// + /// Gets or sets the underlying socket client used for network communication. + /// + protected TSocketClient? Client { get; set; } + /// /// /// - public abstract bool IsConnected { get; } + public ILogger? Logger { get; set; } /// /// /// - public IPEndPoint? LocalEndPoint { get; set; } + public bool IsConnected => Client?.IsConnected ?? false; + + /// + /// + /// + public IPEndPoint LocalEndPoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 0); /// /// @@ -61,6 +73,17 @@ public abstract class TcpSocketClientBase : ITcpSocketClient /// public IDataPackageHandler? DataPackageHandler { get; protected set; } + private IPEndPoint? _remoteEndPoint; + private IPEndPoint? _localEndPoint; + private CancellationTokenSource? _receiveCancellationTokenSource; + + /// + /// Creates and initializes a new instance of the socket client for the specified endpoint. + /// + /// The network endpoint to which the socket client will connect. Cannot be null. + /// An instance of configured for the specified endpoint. + protected abstract TSocketClient CreateSocketClient(IPEndPoint localEndPoint); + /// /// /// @@ -75,7 +98,53 @@ public virtual void SetDataHandler(IDataPackageHandler handler) /// /// /// - public abstract ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken token = default); + public async ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken token = default) + { + var ret = false; + try + { + // 释放资源 + await CloseAsync(); + + // 创建新的 TcpClient 实例 + Client ??= CreateSocketClient(LocalEndPoint); + _localEndPoint = LocalEndPoint; + _remoteEndPoint = null; + + var connectionToken = token; + if (ConnectTimeout > 0) + { + // 设置连接超时时间 + var connectTokenSource = new CancellationTokenSource(ConnectTimeout); + connectionToken = CancellationTokenSource.CreateLinkedTokenSource(token, connectTokenSource.Token).Token; + } + await Client.ConnectAsync(endPoint, connectionToken); + + if (Client.IsConnected) + { + _localEndPoint = Client.LocalEndPoint; + _remoteEndPoint = endPoint; + + if (IsAutoReceive) + { + _ = Task.Run(AutoReceiveAsync, token); + } + } + ret = Client.IsConnected; + } + catch (OperationCanceledException ex) + { + var message = token.IsCancellationRequested + ? $"TCP Socket connect operation was canceled from {LocalEndPoint} to {endPoint}" + : $"TCP Socket connect operation timed out from {LocalEndPoint} to {endPoint}"; + Log(LogLevel.Warning, ex, message); + } + catch (Exception ex) + { + Log(LogLevel.Error, ex, $"TCP Socket connection failed from {LocalEndPoint} to {endPoint}"); + } + return ret; + } /// /// @@ -83,14 +152,142 @@ public virtual void SetDataHandler(IDataPackageHandler handler) /// /// /// - public abstract ValueTask SendAsync(ReadOnlyMemory data, CancellationToken token = default); + public virtual async ValueTask SendAsync(ReadOnlyMemory data, CancellationToken token = default) + { + if (Client is not { IsConnected: true }) + { + throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}"); + } + + var ret = false; + try + { + var sendToken = token; + if (SendTimeout > 0) + { + // 设置发送超时时间 + var sendTokenSource = new CancellationTokenSource(SendTimeout); + sendToken = CancellationTokenSource.CreateLinkedTokenSource(token, sendTokenSource.Token).Token; + } + + if (DataPackageHandler != null) + { + data = await DataPackageHandler.SendAsync(data, sendToken); + } + + ret = await Client.SendAsync(data, sendToken); + } + catch (OperationCanceledException ex) + { + Log(LogLevel.Warning, ex, token.IsCancellationRequested + ? $"TCP Socket send operation was canceled from {_localEndPoint} to {_remoteEndPoint}" + : $"TCP Socket send operation timed out from {_localEndPoint} to {_remoteEndPoint}"); + } + catch (Exception ex) + { + Log(LogLevel.Error, ex, $"TCP Socket send failed from {_localEndPoint} to {_remoteEndPoint}"); + } + return ret; + } /// /// /// /// /// - public abstract ValueTask> ReceiveAsync(CancellationToken token = default); + public virtual async ValueTask> ReceiveAsync(CancellationToken token = default) + { + if (Client is not { IsConnected: true }) + { + throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}"); + } + + if (IsAutoReceive) + { + throw new InvalidOperationException("Cannot call ReceiveAsync when IsAutoReceive is true. Use the auto-receive mechanism instead."); + } + + using var block = MemoryPool.Shared.Rent(ReceiveBufferSize); + var buffer = block.Memory; + var len = await ReceiveCoreAsync(Client, buffer, token); + return buffer[..len]; + } + + private async ValueTask AutoReceiveAsync() + { + _receiveCancellationTokenSource ??= new(); + while (_receiveCancellationTokenSource is { IsCancellationRequested: false }) + { + if (Client is not { IsConnected: true }) + { + throw new InvalidOperationException($"TCP Socket is not connected {LocalEndPoint}"); + } + + using var block = MemoryPool.Shared.Rent(ReceiveBufferSize); + var buffer = block.Memory; + var len = await ReceiveCoreAsync(Client, buffer, _receiveCancellationTokenSource.Token); + if (len == 0) + { + break; + } + } + } + + private async ValueTask ReceiveCoreAsync(ISocketClient client, Memory buffer, CancellationToken token) + { + var len = 0; + try + { + var receiveToken = token; + if (ReceiveTimeout > 0) + { + // 设置接收超时时间 + var receiveTokenSource = new CancellationTokenSource(ReceiveTimeout); + receiveToken = CancellationTokenSource.CreateLinkedTokenSource(receiveToken, receiveTokenSource.Token).Token; + } + + len = await client.ReceiveAsync(buffer, receiveToken); + if (len == 0) + { + // 远端主机关闭链路 + Log(LogLevel.Information, null, $"TCP Socket {_localEndPoint} received 0 data closed by {_remoteEndPoint}"); + } + else + { + buffer = buffer[..len]; + + if (ReceivedCallBack != null) + { + await ReceivedCallBack(buffer); + } + + if (DataPackageHandler != null) + { + await DataPackageHandler.ReceiveAsync(buffer, receiveToken); + } + len = buffer.Length; + } + } + catch (OperationCanceledException ex) + { + Log(LogLevel.Warning, ex, token.IsCancellationRequested + ? $"TCP Socket receive operation canceled from {_localEndPoint} to {_remoteEndPoint}" + : $"TCP Socket receive operation timed out from {_localEndPoint} to {_remoteEndPoint}"); + } + catch (Exception ex) + { + Log(LogLevel.Error, ex, $"TCP Socket receive failed from {_localEndPoint} to {_remoteEndPoint}"); + } + return len; + } + + /// + /// Logs a message with the specified log level, exception, and additional context. + /// + protected void Log(LogLevel logLevel, Exception? ex, string? message) + { + Logger?.Log(logLevel, ex, "{Message}", message); + } /// /// @@ -108,9 +305,23 @@ public virtual ValueTask CloseAsync() /// unmanaged resources. Override this method in a derived class to provide custom cleanup logic. /// to release both managed and unmanaged resources; to release only /// unmanaged resources. - protected virtual ValueTask DisposeAsync(bool disposing) + protected virtual async ValueTask DisposeAsync(bool disposing) { - return ValueTask.CompletedTask; + if (disposing) + { + // 取消接收数据的任务 + if (_receiveCancellationTokenSource != null) + { + _receiveCancellationTokenSource.Cancel(); + _receiveCancellationTokenSource.Dispose(); + _receiveCancellationTokenSource = null; + } + + if (Client != null) + { + await Client.CloseAsync(); + } + } } /// diff --git a/test/UnitTest/Services/DefaultSocketClientTest.cs b/test/UnitTest/Services/DefaultSocketClientTest.cs new file mode 100644 index 00000000000..4fcdf0b5c96 --- /dev/null +++ b/test/UnitTest/Services/DefaultSocketClientTest.cs @@ -0,0 +1,231 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License +// See the LICENSE file in the project root for more information. +// Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone + +using Microsoft.Extensions.Logging; +using System.Net; +using System.Net.Sockets; +using System.Reflection; + +namespace UnitTest.Services; + +public class DefaultSocketClientTest +{ + [Fact] + public void Logger_Null() + { + // 测试 Logger 为 null 的情况 + var client = CreateClient(); + var baseType = client.GetType().BaseType; + Assert.NotNull(baseType); + + // 获取 Logger 字段设置为 null 测试 Log 不会抛出异常 + var propertyInfo = baseType.GetProperty("Logger", BindingFlags.Public | BindingFlags.Instance); + Assert.NotNull(propertyInfo); + + propertyInfo.SetValue(client, null); + + var methodInfo = baseType.GetMethod("Log", BindingFlags.NonPublic | BindingFlags.Instance); + Assert.NotNull(methodInfo); + methodInfo.Invoke(client, [LogLevel.Information, null!, "Test log message"]); + } + + [Fact] + public async Task DefaultSocketClient_Ok() + { + var port = 8894; + var server = StartTcpServer(port, MockDelimiterPackageAsync); + var client = CreateClient(); + + // 获得 Client 泛型属性 + var baseType = client.GetType().BaseType; + Assert.NotNull(baseType); + + // 建立连接 + var connect = await client.ConnectAsync("localhost", port); + Assert.True(connect); + + var propertyInfo = baseType.GetProperty("Client", BindingFlags.NonPublic | BindingFlags.Instance); + Assert.NotNull(propertyInfo); + var instance = propertyInfo.GetValue(client); + Assert.NotNull(instance); + + ISocketClient socketClient = (ISocketClient)instance; + Assert.NotNull(socketClient); + Assert.True(socketClient.IsConnected); + + await socketClient.CloseAsync(); + Assert.False(socketClient.IsConnected); + + var buffer = new byte[10]; + var len = await socketClient.ReceiveAsync(buffer); + Assert.Equal(0, len); + } + + [Fact] + public void SocketClientOptions_Ok() + { + var options = new SocketClientOptions + { + ReceiveBufferSize = 1024 * 64, + IsAutoReceive = true, + ConnectTimeout = 1000, + SendTimeout = 500, + ReceiveTimeout = 500, + LocalEndPoint = new IPEndPoint(IPAddress.Loopback, 0) + }; + Assert.Equal(1024 * 64, options.ReceiveBufferSize); + Assert.True(options.IsAutoReceive); + Assert.Equal(1000, options.ConnectTimeout); + Assert.Equal(500, options.SendTimeout); + Assert.Equal(500, options.ReceiveTimeout); + Assert.Equal(new IPEndPoint(IPAddress.Loopback, 0), options.LocalEndPoint); + } + + private static TcpListener StartTcpServer(int port, Func handler) + { + var server = new TcpListener(IPAddress.Loopback, port); + server.Start(); + Task.Run(() => AcceptClientsAsync(server, handler)); + return server; + } + + private static async Task AcceptClientsAsync(TcpListener server, Func handler) + { + while (true) + { + var client = await server.AcceptTcpClientAsync(); + _ = Task.Run(() => handler(client)); + } + } + + private static async Task MockDelimiterPackageAsync(TcpClient client) + { + using var stream = client.GetStream(); + while (true) + { + var buffer = new byte[10240]; + var len = await stream.ReadAsync(buffer); + if (len == 0) + { + break; + } + + // 回写数据到客户端 + var block = new ReadOnlyMemory(buffer, 0, len); + await stream.WriteAsync(block, CancellationToken.None); + + await Task.Delay(20); + + // 模拟拆包发送第二段数据 + await stream.WriteAsync(new byte[] { 0x13, 0x10, 0x5, 0x6, 0x13, 0x10 }, CancellationToken.None); + } + } + + private static ITcpSocketClient CreateClient() + { + var sc = new ServiceCollection(); + sc.AddLogging(builder => + { + builder.AddProvider(new MockLoggerProvider()); + }); + sc.AddBootstrapBlazorTcpSocketFactory(); + var provider = sc.BuildServiceProvider(); + var factory = provider.GetRequiredService(); + var client = factory.GetOrCreate("test", op => op.LocalEndPoint = Utility.ConvertToIpEndPoint("localhost", 0)); + return client; + } + + class MockLoggerProvider : ILoggerProvider + { + public ILogger CreateLogger(string categoryName) + { + return new MockLogger(); + } + + public void Dispose() + { + + } + } + + class MockLogger : ILogger + { + public IDisposable? BeginScope(TState state) where TState : notnull + { + return null; + } + + public bool IsEnabled(LogLevel logLevel) + { + return true; + } + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + { + + } + } + + class MockSendErrorHandler : DataPackageHandlerBase + { + public ITcpSocketClient? Socket { get; set; } + + public override ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) + { + throw new Exception("Mock send failed"); + } + } + + class MockSendCancelHandler : DataPackageHandlerBase + { + public ITcpSocketClient? Socket { get; set; } + + public override async ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) + { + if (Socket != null) + { + await Socket.CloseAsync(); + } + await Task.Delay(10, token); + return data; + } + } + + class MockReceiveErrorHandler : DataPackageHandlerBase + { + public override ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) + { + return ValueTask.FromResult(data); + } + + public override async ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default) + { + await base.ReceiveAsync(data, token); + + // 模拟接收数据时报错 + throw new InvalidOperationException("Test Error"); + } + } + + class MockSendTimeoutHandler : DataPackageHandlerBase + { + public override async ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) + { + // 模拟发送超时 + await Task.Delay(200, token); + return data; + } + } + + class MockReceiveTimeoutHandler : DataPackageHandlerBase + { + public override async ValueTask ReceiveAsync(ReadOnlyMemory data, CancellationToken token = default) + { + // 模拟接收超时 + await Task.Delay(200, token); + await base.ReceiveAsync(data, token); + } + } +} diff --git a/test/UnitTest/Services/TcpSocketFactoryTest.cs b/test/UnitTest/Services/TcpSocketFactoryTest.cs index 3eba6680e6b..e9d4b1de43a 100644 --- a/test/UnitTest/Services/TcpSocketFactoryTest.cs +++ b/test/UnitTest/Services/TcpSocketFactoryTest.cs @@ -6,6 +6,7 @@ using Microsoft.Extensions.Logging; using System.Net; using System.Net.Sockets; +using System.Reflection; using System.Text; namespace UnitTest.Services; @@ -24,17 +25,17 @@ public async Task GetOrCreate_Ok() sc.AddBootstrapBlazorTcpSocketFactory(); var provider = sc.BuildServiceProvider(); var factory = provider.GetRequiredService(); - var client1 = factory.GetOrCreate("demo", key => Utility.ConvertToIpEndPoint("localhost", 0)); + var client1 = factory.GetOrCreate("demo", op => op.LocalEndPoint = Utility.ConvertToIpEndPoint("localhost", 0)); await client1.CloseAsync(); - var client2 = factory.GetOrCreate("demo", key => Utility.ConvertToIpEndPoint("localhost", 0)); + var client2 = factory.GetOrCreate("demo", op => op.LocalEndPoint = Utility.ConvertToIpEndPoint("localhost", 0)); Assert.Equal(client1, client2); var ip = Dns.GetHostAddresses(Dns.GetHostName(), AddressFamily.InterNetwork).FirstOrDefault() ?? IPAddress.Loopback; - var client3 = factory.GetOrCreate("demo1", key => Utility.ConvertToIpEndPoint(ip.ToString(), 0)); + var client3 = factory.GetOrCreate("demo1", op => op.LocalEndPoint = Utility.ConvertToIpEndPoint(ip.ToString(), 0)); // 测试不合格 IP 地址 - var client4 = factory.GetOrCreate("demo2", key => Utility.ConvertToIpEndPoint("256.0.0.1", 0)); + var client4 = factory.GetOrCreate("demo2", op => op.LocalEndPoint = Utility.ConvertToIpEndPoint("256.0.0.1", 0)); var client5 = factory.Remove("demo2"); Assert.Equal(client4, client5); @@ -48,7 +49,7 @@ public async Task GetOrCreate_Ok() public async Task ConnectAsync_Timeout() { var client = CreateClient(); - client.ConnectTimeout = 1000; + client.ConnectTimeout = 100; var connect = await client.ConnectAsync("localhost", 9999); Assert.False(connect); @@ -102,6 +103,17 @@ public async Task SendAsync_Error() var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); var ex = await Assert.ThrowsAsync(async () => await client.SendAsync(data)); Assert.NotNull(ex); + + // 测试发送失败 + var port = 8892; + var server = StartTcpServer(port, MockSplitPackageAsync); + + client.SetDataHandler(new MockSendErrorHandler()); + await client.ConnectAsync("localhost", port); + Assert.True(client.IsConnected); + + // 内部生成异常日志 + await client.SendAsync(data); } [Fact] @@ -126,7 +138,7 @@ public async Task SendAsync_Cancel() // 设置延时发送适配器 // 延时发送期间关闭 Socket 连接导致内部报错 - client.SetDataHandler(new MockSendErrorHandler() + client.SetDataHandler(new MockSendCancelHandler() { Socket = client }); @@ -179,7 +191,10 @@ public async Task ReceiveAsync_Cancel() await client.SendAsync(data); // 通过反射取消令牌 - var fieldInfo = client.GetType().GetField("_receiveCancellationTokenSource", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + var baseType = client.GetType().BaseType; + Assert.NotNull(baseType); + + var fieldInfo = baseType.GetField("_receiveCancellationTokenSource", BindingFlags.NonPublic | BindingFlags.Instance); Assert.NotNull(fieldInfo); var tokenSource = fieldInfo.GetValue(client) as CancellationTokenSource; Assert.NotNull(tokenSource); @@ -190,36 +205,21 @@ public async Task ReceiveAsync_Cancel() [Fact] public async Task ReceiveAsync_InvalidOperationException() { - var port = 8890; - var server = StartTcpServer(port, MockSplitPackageAsync); - + // 未连接时调用 ReceiveAsync 方法会抛出 InvalidOperationException 异常 var client = CreateClient(); - - //未连接 var ex = await Assert.ThrowsAsync(async () => await client.ReceiveAsync()); Assert.NotNull(ex); - // 反射给 _client 赋值但是未连接 - var fieldInfo = client.GetType().GetField("_client", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); - Assert.NotNull(fieldInfo); - fieldInfo.SetValue(client, new TcpClient()); - ex = null; - ex = await Assert.ThrowsAsync(async () => await client.ReceiveAsync()); - Assert.NotNull(ex); - - await client.ConnectAsync("localhost", port); - ex = null; - ex = await Assert.ThrowsAsync(async () => await client.ReceiveAsync()); + // 已连接但是启用了自动接收功能时调用 ReceiveAsync 方法会抛出 InvalidOperationException 异常 + var port = 8893; + var server = StartTcpServer(port, MockSplitPackageAsync); - await client.CloseAsync(); - client.IsAutoReceive = false; + client.IsAutoReceive = true; var connected = await client.ConnectAsync("localhost", port); Assert.True(connected); - var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); - await client.SendAsync(data); - var payload = await client.ReceiveAsync(); - Assert.Equal(payload.ToArray(), [1, 2, 3, 4, 5]); + ex = await Assert.ThrowsAsync(async () => await client.ReceiveAsync()); + Assert.NotNull(ex); } [Fact] @@ -230,9 +230,13 @@ public async Task ReceiveAsync_Ok() var client = CreateClient(); client.IsAutoReceive = false; - await client.ConnectAsync("localhost", port); + var connected = await client.ConnectAsync("localhost", port); + Assert.True(connected); + var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); - await client.SendAsync(data); + var send = await client.SendAsync(data); + Assert.True(send); + var payload = await client.ReceiveAsync(); Assert.Equal(payload.ToArray(), [1, 2, 3, 4, 5]); } @@ -243,7 +247,10 @@ public async Task ReceiveAsync_Error() var client = CreateClient(); // 测试未建立连接前调用 ReceiveAsync 方法报异常逻辑 - var methodInfo = client.GetType().GetMethod("AutoReceiveAsync", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + var baseType = client.GetType().BaseType; + Assert.NotNull(baseType); + + var methodInfo = baseType.GetMethod("AutoReceiveAsync", BindingFlags.NonPublic | BindingFlags.Instance); Assert.NotNull(methodInfo); var task = (ValueTask)methodInfo.Invoke(client, null)!; @@ -567,7 +574,7 @@ private static ITcpSocketClient CreateClient() sc.AddBootstrapBlazorTcpSocketFactory(); var provider = sc.BuildServiceProvider(); var factory = provider.GetRequiredService(); - var client = factory.GetOrCreate("test", key => Utility.ConvertToIpEndPoint("localhost", 0)); + var client = factory.GetOrCreate("test", op => op.LocalEndPoint = Utility.ConvertToIpEndPoint("localhost", 0)); return client; } @@ -606,6 +613,16 @@ class MockSendErrorHandler : DataPackageHandlerBase { public ITcpSocketClient? Socket { get; set; } + public override ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) + { + throw new Exception("Mock send failed"); + } + } + + class MockSendCancelHandler : DataPackageHandlerBase + { + public ITcpSocketClient? Socket { get; set; } + public override async ValueTask> SendAsync(ReadOnlyMemory data, CancellationToken token = default) { if (Socket != null)