diff --git a/src/BootstrapBlazor.Server/Components/Samples/Sockets/Adapters.razor b/src/BootstrapBlazor.Server/Components/Samples/Sockets/Adapters.razor index 8bf08597bda..e1402fcef03 100644 --- a/src/BootstrapBlazor.Server/Components/Samples/Sockets/Adapters.razor +++ b/src/BootstrapBlazor.Server/Components/Samples/Sockets/Adapters.razor @@ -39,7 +39,11 @@
  • 响应头为 4 字节定长,响应体为 8 个字节定长
  • 响应体为字符串类型数据
  • -

    本示例服务器端模拟了数据分包即响应数据实际是两次写入所以实际接收端是要通过两次接收才能得到一个完整的响应数据包,可通过 数据适配器 来简化接收逻辑。通过切换下方 是否使用数据适配器 控制开关进行测试查看实际数据接收情况

    +

    本示例服务器端模拟了数据分包即响应数据实际是两次写入所以实际接收端是要通过两次接收才能得到一个完整的响应数据包,可通过 数据适配器 来简化接收逻辑。通过切换下方 是否使用数据适配器 控制开关进行测试查看实际数据接收情况。

    +
    private readonly DataPackageAdapter _dataAdapter = new()
     {
         // 数据适配器内部使用固定长度数据处理器
    diff --git a/src/BootstrapBlazor.Server/Components/Samples/Sockets/AutoReconnects.razor b/src/BootstrapBlazor.Server/Components/Samples/Sockets/AutoReconnects.razor
    new file mode 100644
    index 00000000000..f9e78a9d3b1
    --- /dev/null
    +++ b/src/BootstrapBlazor.Server/Components/Samples/Sockets/AutoReconnects.razor
    @@ -0,0 +1,39 @@
    +@page "/socket/auto-connect"
    +@inject IStringLocalizer Localizer
    +
    +

    @Localizer["AutoReconnectsTitle"]

    +

    @Localizer["AutoReconnectsDescription"]

    + + + + +

    本例中模拟自动重连的业务场景,在实际应用中我们可能建立的链路可能由于种种原因断开,所以就有自动重连的业务需求

    +

    例如:我们与一个远端节点建立连接后,不停地接收远端发送过来的数据,如果断开连接后需要自动重连后继续接收数据

    +

    通过 SocketClientOptions 配置类来开启本功能

    +
    var client = factory.GetOrCreate("demo-reconnect", op =>
    +{
    +    op.LocalEndPoint = Utility.ConvertToIpEndPoint("localhost", 0);
    +    options.IsAutoReconnect = true;
    +    options.ReconnectInterval = 5000;
    +});
    +

    参数说明:

    +
      +
    • IsAutoReconnect 是否开启自动重连功能
    • +
    • ReconnectInterval 自动重连等待间隔 默认 5000 毫秒
    • +
    +

    本例中点击 连接 按钮后程序连接到一个发送数据后自动关闭的模拟服务端,通过输出日志查看运行情况,点击 断开 按钮后程序停止自动重连

    +
    +
    + + +
    +
    + +
    +
    +
    diff --git a/src/BootstrapBlazor.Server/Components/Samples/Sockets/AutoReconnects.razor.cs b/src/BootstrapBlazor.Server/Components/Samples/Sockets/AutoReconnects.razor.cs new file mode 100644 index 00000000000..4525134c0fc --- /dev/null +++ b/src/BootstrapBlazor.Server/Components/Samples/Sockets/AutoReconnects.razor.cs @@ -0,0 +1,109 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License +// See the LICENSE file in the project root for more information. +// Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone + +using System.Net; + +namespace BootstrapBlazor.Server.Components.Samples.Sockets; + +/// +/// 自动重连示例组件 +/// +public partial class AutoReconnects : IDisposable +{ + [Inject, NotNull] + private ITcpSocketFactory? TcpSocketFactory { get; set; } + + private ITcpSocketClient _client = null!; + + private List _items = []; + + private readonly IPEndPoint _serverEndPoint = new(IPAddress.Loopback, 8901); + + /// + /// + /// + protected override void OnInitialized() + { + base.OnInitialized(); + + // 从服务中获取 Socket 实例 + _client = TcpSocketFactory.GetOrCreate("demo-auto-connect", options => + { + options.LocalEndPoint = new IPEndPoint(IPAddress.Loopback, 0); + options.IsAutoReconnect = true; + options.ReconnectInterval = 5000; + }); + _client.ReceivedCallBack += OnReceivedAsync; + _client.OnConnecting = async () => + { + _items.Add(new ConsoleMessageItem { Message = $"{DateTime.Now} 正在连接到 {_serverEndPoint},请稍候..." }); + await InvokeAsync(StateHasChanged); + }; + _client.OnConnected = async () => + { + _items.Add(new ConsoleMessageItem { Message = $"{DateTime.Now} 已连接到 {_serverEndPoint},等待接收数据", Color = Color.Success }); + await InvokeAsync(StateHasChanged); + }; + } + + private async Task OnConnectAsync() + { + if (_client is { IsConnected: false }) + { + await _client.ConnectAsync(_serverEndPoint, CancellationToken.None); + } + } + + private async Task OnCloseAsync() + { + if (_client is { IsConnected: true }) + { + await _client.CloseAsync(); + } + } + + private Task OnClear() + { + _items = []; + return Task.CompletedTask; + } + + private async ValueTask OnReceivedAsync(ReadOnlyMemory data) + { + // 将数据显示为十六进制字符串 + var payload = System.Text.Encoding.UTF8.GetString(data.Span); + _items.Add(data.IsEmpty + ? new ConsoleMessageItem { Message = $"{DateTime.Now} 当前连接已关闭,5s 后自动重连", Color = Color.Danger } + : new ConsoleMessageItem { Message = $"{DateTime.Now} 接收到来自站点的数据为 {payload}" }); + + // 保持队列中最大数量为 50 + while (_items.Count > 50) + { + _items.RemoveAt(0); + } + + await InvokeAsync(StateHasChanged); + } + + private void Dispose(bool disposing) + { + if (disposing) + { + if (_client is { IsConnected: true }) + { + _client.ReceivedCallBack -= OnReceivedAsync; + } + } + } + + /// + /// + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } +} diff --git a/src/BootstrapBlazor.Server/Extensions/MenusLocalizerExtensions.cs b/src/BootstrapBlazor.Server/Extensions/MenusLocalizerExtensions.cs index f145b2ccf70..719494a3aa6 100644 --- a/src/BootstrapBlazor.Server/Extensions/MenusLocalizerExtensions.cs +++ b/src/BootstrapBlazor.Server/Extensions/MenusLocalizerExtensions.cs @@ -224,6 +224,12 @@ void AddSocket(DemoMenuItem item) IsNew = true, Text = Localizer["DataPackageAdapter"], Url = "socket/adapter" + }, + new() + { + IsNew = true, + Text = Localizer["SocketAutoConnect"], + Url = "socket/auto-connect" } }; AddBadge(item, count: 1); diff --git a/src/BootstrapBlazor.Server/Extensions/ServiceCollectionExtensions.cs b/src/BootstrapBlazor.Server/Extensions/ServiceCollectionExtensions.cs index 5f4aba8bc95..bb1daacc3a7 100644 --- a/src/BootstrapBlazor.Server/Extensions/ServiceCollectionExtensions.cs +++ b/src/BootstrapBlazor.Server/Extensions/ServiceCollectionExtensions.cs @@ -48,6 +48,7 @@ void Invoke(BootstrapBlazorOptions option) services.AddHostedService(); services.AddHostedService(); services.AddHostedService(); + services.AddHostedService(); // 增加通用服务 services.AddBootstrapBlazorServices(); diff --git a/src/BootstrapBlazor.Server/Locales/en-US.json b/src/BootstrapBlazor.Server/Locales/en-US.json index d23e649bcd0..6a2429b427d 100644 --- a/src/BootstrapBlazor.Server/Locales/en-US.json +++ b/src/BootstrapBlazor.Server/Locales/en-US.json @@ -4834,7 +4834,8 @@ "SocketComponents": "ITcpSocketFactory", "SocketAutoReceive": "Auto Receive", "SocketManualReceive": "Manual Receive", - "DataPackageAdapter": "DataPackageAdapter" + "DataPackageAdapter": "DataPackageAdapter", + "SocketAutoConnect": "Reconnect" }, "BootstrapBlazor.Server.Components.Samples.Table.TablesHeader": { "TablesHeaderTitle": "Header grouping function", @@ -7107,5 +7108,11 @@ "AdaptersDescription": "Receive data through the data adapter and display", "NormalTitle": "Basic usage", "NormalIntro": "After the connection is established, the timestamp data sent by the server is received through the ReceivedCallBack callback method of the DataPackageAdapter data adapter." + }, + "BootstrapBlazor.Server.Components.Samples.Sockets.AutoReconnects": { + "AutoReconnectsTitle": "DataPackageAdapter", + "AutoReconnectsDescription": "Receive data through the data adapter and display", + "NormalTitle": "Basic usage", + "NormalIntro": "Enable automatic reconnection by setting IsAutoReconnect" } } diff --git a/src/BootstrapBlazor.Server/Locales/zh-CN.json b/src/BootstrapBlazor.Server/Locales/zh-CN.json index 8dae547ab56..61a1fe407f7 100644 --- a/src/BootstrapBlazor.Server/Locales/zh-CN.json +++ b/src/BootstrapBlazor.Server/Locales/zh-CN.json @@ -4834,7 +4834,8 @@ "SocketComponents": "Socket 服务", "SocketAutoReceive": "自动接收数据", "SocketManualReceive": "手动接收数据", - "DataPackageAdapter": "数据处理器" + "DataPackageAdapter": "数据处理器", + "SocketAutoConnect": "自动重连" }, "BootstrapBlazor.Server.Components.Samples.Table.TablesHeader": { "TablesHeaderTitle": "表头分组功能", @@ -7107,5 +7108,11 @@ "AdaptersDescription": "通过数据适配器接收数据并且显示", "NormalTitle": "基本用法", "NormalIntro": "连接后通过 DataPackageAdapter 数据适配器的 ReceivedCallBack 回调方法接收服务端发送来的时间戳数据" + }, + "BootstrapBlazor.Server.Components.Samples.Sockets.AutoReconnects": { + "AutoReconnectsTitle": "Socket 自动重连示例", + "AutoReconnectsDescription": "链路断开后自动重连示例", + "NormalTitle": "基本用法", + "NormalIntro": "通过设置 IsAutoReconnect 开启自动重连机制" } } diff --git a/src/BootstrapBlazor.Server/Services/MockDisconnectService.cs b/src/BootstrapBlazor.Server/Services/MockDisconnectService.cs new file mode 100644 index 00000000000..ae26367c8db --- /dev/null +++ b/src/BootstrapBlazor.Server/Services/MockDisconnectService.cs @@ -0,0 +1,64 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the Apache 2.0 License +// See the LICENSE file in the project root for more information. +// Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone + +using System.Net; +using System.Net.Sockets; +using System.Text; + +namespace Longbow.Tasks.Services; + +/// +/// 模拟 Socket 自动断开服务端服务类 +/// +internal class MockDisconnectServerService(ILogger logger) : BackgroundService +{ + /// + /// 运行任务 + /// + /// + /// + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var server = new TcpListener(IPAddress.Loopback, 8901); + server.Start(); + while (stoppingToken is { IsCancellationRequested: false }) + { + try + { + var client = await server.AcceptTcpClientAsync(stoppingToken); + _ = Task.Run(() => OnDataHandlerAsync(client, stoppingToken), stoppingToken); + } + catch { } + } + } + + private async Task OnDataHandlerAsync(TcpClient client, CancellationToken stoppingToken) + { + // 方法目的: + // 收到消息后发送自定义通讯协议的响应数据 + // 响应头 + 响应体 + await using var stream = client.GetStream(); + while (stoppingToken is { IsCancellationRequested: false }) + { + try + { + // 发送数据 + await stream.WriteAsync(Encoding.UTF8.GetBytes(DateTime.Now.ToString("yyyyMMddHHmmss")), stoppingToken); + await Task.Delay(2000, stoppingToken); + + // 主动关闭连接 + client.Close(); + } + catch (OperationCanceledException) { break; } + catch (IOException) { break; } + catch (SocketException) { break; } + catch (Exception ex) + { + logger.LogError(ex, "MockDisconnectServerService encountered an error while sending data."); + break; + } + } + } +} diff --git a/src/BootstrapBlazor.Server/docs.json b/src/BootstrapBlazor.Server/docs.json index 662d1eaed1c..f0824c531be 100644 --- a/src/BootstrapBlazor.Server/docs.json +++ b/src/BootstrapBlazor.Server/docs.json @@ -245,7 +245,8 @@ "office-viewer": "OfficeViewers", "socket/manual-receive": "Sockets\\ManualReceives", "socket/auto-receive": "Sockets\\AutoReceives", - "socket/adapter": "Sockets\\Adapters" + "socket/adapter": "Sockets\\Adapters", + "socket/auto-connect": "Sockets\\AutoReconnects" }, "video": { "table": "BV1ap4y1x7Qn?p=1", diff --git a/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClientProvider.cs b/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClientProvider.cs index 74b36c32e22..b07cca71607 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClientProvider.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DefaultSocketClientProvider.cs @@ -69,6 +69,11 @@ public async ValueTask ReceiveAsync(Memory buffer, CancellationToken { var stream = _client.GetStream(); len = await stream.ReadAsync(buffer, token).ConfigureAwait(false); + + if (len == 0) + { + _client.Close(); + } } return len; } diff --git a/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs b/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs index 5d552b471a7..dbbb863d05d 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/DefaultTcpSocketClient.cs @@ -49,6 +49,16 @@ class DefaultTcpSocketClient(SocketClientOptions options) : ITcpSocketClient /// public Func, ValueTask>? ReceivedCallBack { get; set; } + /// + /// + /// + public Func? OnConnecting { get; set; } + + /// + /// + /// + public Func? OnConnected { get; set; } + private IPEndPoint? _remoteEndPoint; private IPEndPoint? _localEndPoint; private CancellationTokenSource? _receiveCancellationTokenSource; @@ -94,7 +104,15 @@ public async ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken try { + if (OnConnecting != null) + { + await OnConnecting(); + } ret = await ConnectCoreAsync(SocketClientProvider, endPoint, connectionToken); + if (OnConnected != null) + { + await OnConnected(); + } } catch (OperationCanceledException ex) { @@ -116,23 +134,26 @@ public async ValueTask ConnectAsync(IPEndPoint endPoint, CancellationToken // 释放信号量 _semaphoreSlim.Release(); - if (!ret && reconnect) + if (reconnect) { - Reconnect(); - } + _autoConnectTokenSource = new(); + if (!ret) + { + Reconnect(); + } + } return ret; } private void Reconnect() { - if (options.IsAutoReconnect && _remoteEndPoint != null) + if (_autoConnectTokenSource != null && options.IsAutoReconnect && _remoteEndPoint != null) { Task.Run(async () => { try { - _autoConnectTokenSource ??= new(); await Task.Delay(options.ReconnectInterval, _autoConnectTokenSource.Token).ConfigureAwait(false); await ConnectAsync(_remoteEndPoint, _autoConnectTokenSource.Token).ConfigureAwait(false); } @@ -144,7 +165,7 @@ private void Reconnect() private async ValueTask ConnectCoreAsync(ISocketClientProvider provider, IPEndPoint endPoint, CancellationToken token) { // 释放资源 - await CloseAsync(); + await CloseCoreAsync(); // 创建新的 TcpClient 实例 provider.LocalEndPoint = Options.LocalEndPoint; @@ -221,10 +242,7 @@ public async ValueTask SendAsync(ReadOnlyMemory data, CancellationTo Log(LogLevel.Error, ex, $"TCP Socket send failed from {_localEndPoint} to {_remoteEndPoint}"); } - if (options.EnableLog) - { - Log(LogLevel.Information, null, $"Sending data from {_localEndPoint} to {_remoteEndPoint}, Data Length: {data.Length} Data Content: {BitConverter.ToString(data.ToArray())} Result: {ret}"); - } + Log(LogLevel.Information, null, $"Sending data from {_localEndPoint} to {_remoteEndPoint}, Data Length: {data.Length} Data Content: {BitConverter.ToString(data.ToArray())} Result: {ret}"); if (!ret && reconnect) { @@ -334,10 +352,7 @@ private async ValueTask ReceiveCoreAsync(ISocketClientProvider client, Memo Log(LogLevel.Error, ex, $"TCP Socket receive failed from {_localEndPoint} to {_remoteEndPoint}"); } - if (options.EnableLog) - { - Log(LogLevel.Information, null, $"Receiving data from {_localEndPoint} to {_remoteEndPoint}, Data Length: {len} Data Content: {BitConverter.ToString(buffer.ToArray())}"); - } + Log(LogLevel.Information, null, $"Receiving data from {_localEndPoint} to {_remoteEndPoint}, Data Length: {len} Data Content: {BitConverter.ToString(buffer.ToArray())}"); if (len == 0 && reconnect) { @@ -352,14 +367,30 @@ private async ValueTask ReceiveCoreAsync(ISocketClientProvider client, Memo /// private void Log(LogLevel logLevel, Exception? ex, string? message) { - Logger ??= ServiceProvider?.GetRequiredService>(); - Logger?.Log(logLevel, ex, "{Message}", message); + if (options.EnableLog) + { + Logger ??= ServiceProvider?.GetRequiredService>(); + Logger?.Log(logLevel, ex, "{Message}", message); + } } /// /// /// public async ValueTask CloseAsync() + { + // 取消重连任务 + if (_autoConnectTokenSource != null) + { + _autoConnectTokenSource.Cancel(); + _autoConnectTokenSource.Dispose(); + _autoConnectTokenSource = null; + } + + await CloseCoreAsync(); + } + + private async ValueTask CloseCoreAsync() { // 取消接收数据的任务 if (_receiveCancellationTokenSource != null) @@ -368,6 +399,7 @@ public async ValueTask CloseAsync() _receiveCancellationTokenSource.Dispose(); _receiveCancellationTokenSource = null; } + if (SocketClientProvider != null) { await SocketClientProvider.CloseAsync(); @@ -386,14 +418,6 @@ private async ValueTask DisposeAsync(bool disposing) { if (disposing) { - // 取消重连任务 - if (_autoConnectTokenSource != null) - { - _autoConnectTokenSource.Cancel(); - _autoConnectTokenSource.Dispose(); - _autoConnectTokenSource = null; - } - await CloseAsync(); } } diff --git a/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs b/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs index c764661c93b..ae965fb1a63 100644 --- a/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs +++ b/src/BootstrapBlazor/Services/TcpSocket/ITcpSocketClient.cs @@ -38,6 +38,16 @@ public interface ITcpSocketClient : IAsyncDisposable /// impact performance. Func, ValueTask>? ReceivedCallBack { get; set; } + /// + /// Gets or sets the callback function that is invoked when a connection attempt is initiated. + /// + Func? OnConnecting { get; set; } + + /// + /// Gets or sets the delegate to be invoked when a connection is successfully established. + /// + Func? OnConnected { get; set; } + /// /// Establishes an asynchronous connection to the specified endpoint. /// diff --git a/test/UnitTest/Services/DefaultSocketClientProviderTest.cs b/test/UnitTest/Services/DefaultSocketClientProviderTest.cs index 8e77c51a936..51918355063 100644 --- a/test/UnitTest/Services/DefaultSocketClientProviderTest.cs +++ b/test/UnitTest/Services/DefaultSocketClientProviderTest.cs @@ -3,7 +3,9 @@ // See the LICENSE file in the project root for more information. // Maintainer: Argo Zhang(argo@live.ca) Website: https://www.blazor.zone +using Microsoft.Extensions.Logging; using System.Net; +using System.Net.Sockets; namespace UnitTest.Services; @@ -26,6 +28,35 @@ public async Task DefaultSocketClient_Ok() Assert.Equal(0, len); } + [Fact] + public async Task ReceiveAsync_Ok() + { + var port = 8100; + // 测试接收数据时服务器断开未连接的情况 + StartTcpServer(port); + + var sc = new ServiceCollection(); + sc.AddBootstrapBlazorTcpSocketFactory(); + var provider = sc.BuildServiceProvider(); + var factory = provider.GetRequiredService(); + var client = factory.GetOrCreate("provider", op => + { + op.LocalEndPoint = Utility.ConvertToIpEndPoint("localhost", 0); + op.IsAutoReceive = false; + op.EnableLog = false; + }); + + await client.ConnectAsync("127.0.0.1", port); + Assert.True(client.IsConnected); + + var buffer = await client.ReceiveAsync(); + Assert.Equal(2, buffer.Length); + + await Task.Delay(50); + buffer = await client.ReceiveAsync(); + Assert.False(client.IsConnected); + } + [Fact] public void SocketClientOptions_Ok() { @@ -45,4 +76,35 @@ public void SocketClientOptions_Ok() Assert.Equal(500, options.ReceiveTimeout); Assert.Equal(new IPEndPoint(IPAddress.Loopback, 0), options.LocalEndPoint); } + + private static TcpListener StartTcpServer(int port) + { + var server = new TcpListener(IPAddress.Loopback, port); + server.Start(); + Task.Run(() => AcceptClientsAsync(server)); + return server; + } + + private static async Task AcceptClientsAsync(TcpListener server) + { + while (true) + { + var client = await server.AcceptTcpClientAsync(); + _ = Task.Run(async () => + { + using var stream = client.GetStream(); + while (true) + { + var buffer = new byte[1024]; + + // 模拟拆包发送第二段数据 + await stream.WriteAsync(new byte[] { 0x3, 0x4 }, CancellationToken.None); + + // 等待 20ms + await Task.Delay(20); + client.Close(); + } + }); + } + } } diff --git a/test/UnitTest/Services/TcpSocketFactoryTest.cs b/test/UnitTest/Services/TcpSocketFactoryTest.cs index b18f2f74494..9a07d464d3d 100644 --- a/test/UnitTest/Services/TcpSocketFactoryTest.cs +++ b/test/UnitTest/Services/TcpSocketFactoryTest.cs @@ -274,13 +274,27 @@ public async Task ReceiveAsync_InvalidOperationException() [Fact] public async Task ReceiveAsync_Ok() { + var onConnecting = false; + var onConnected = false; var port = 8891; var server = StartTcpServer(port, MockSplitPackageAsync); var client = CreateClient(); client.Options.IsAutoReceive = false; + client.OnConnecting = () => + { + onConnecting = true; + return Task.CompletedTask; + }; + client.OnConnected = () => + { + onConnected = true; + return Task.CompletedTask; + }; var connected = await client.ConnectAsync("localhost", port); Assert.True(connected); + Assert.True(onConnecting); + Assert.True(onConnected); var data = new ReadOnlyMemory([1, 2, 3, 4, 5]); var send = await client.SendAsync(data);