From 897c551d8cb9ef0e6d47380f4a0460c108730b31 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=99=88=E5=9B=BD=E4=BC=9F?= <366193849@qq.com>
Date: Thu, 7 Nov 2024 21:37:36 +0800
Subject: [PATCH 01/85] Use IConnectionFactory to create ConnectionContext to
replace SocketConnection
---
Samples/MQTTnet.Samples.csproj | 2 +-
.../MQTTnet.AspTestApp.csproj | 2 +-
.../MQTTnet.AspNetCore.csproj | 2 +-
.../MqttClientConnectionContextFactory.cs | 39 ++-
.../MqttConnectionContext.cs | 38 +--
.../MqttConnectionHandler.cs | 10 +-
.../ServiceCollectionExtensions.cs | 20 +-
Source/MQTTnet.AspnetCore/SocketAwaitable.cs | 77 ------
Source/MQTTnet.AspnetCore/SocketConnection.cs | 261 ------------------
Source/MQTTnet.AspnetCore/SocketReceiver.cs | 36 ---
Source/MQTTnet.AspnetCore/SocketSender.cs | 93 -------
.../MQTTnet.Benchmarks/AsyncLockBenchmark.cs | 2 +-
Source/MQTTnet.Benchmarks/LoggerBenchmark.cs | 2 +-
.../MQTTnet.Benchmarks.csproj | 2 +-
.../MQTTnet.Benchmarks/MemoryCopyBenchmark.cs | 2 +-
.../MessageProcessingBenchmark.cs | 2 +-
...rocessingMqttConnectionContextBenchmark.cs | 23 +-
.../MqttBufferReaderBenchmark.cs | 2 +-
.../MqttPacketReaderWriterBenchmark.cs | 2 +-
.../MqttTcpChannelBenchmark.cs | 2 +-
.../ReaderExtensionsBenchmark.cs | 2 +-
.../RoundtripProcessingBenchmark.cs | 2 +-
.../SendPacketAsyncBenchmark.cs | 2 +-
.../MQTTnet.Benchmarks/SerializerBenchmark.cs | 2 +-
.../ServerProcessingBenchmark.cs | 2 +-
.../MQTTnet.Benchmarks/TcpPipesBenchmark.cs | 35 ++-
.../TopicFilterComparerBenchmark.cs | 2 +-
.../MQTTnet.Extensions.Rpc.csproj | 2 +-
Source/MQTTnet.Server/MQTTnet.Server.csproj | 2 +-
Source/MQTTnet.TestApp/MQTTnet.TestApp.csproj | 2 +-
Source/MQTTnet.Tests/MQTTnet.Tests.csproj | 2 +-
.../Adapter/IMqttClientAdapterFactory.cs | 3 +-
.../MqttClientAdapterFactory.cs | 10 +-
.../LowLevelClient/LowLevelMqttClient.cs | 2 +-
Source/MQTTnet/MQTTnet.csproj | 2 +-
Source/MQTTnet/MqttClient.cs | 114 ++++----
36 files changed, 199 insertions(+), 606 deletions(-)
delete mode 100644 Source/MQTTnet.AspnetCore/SocketAwaitable.cs
delete mode 100644 Source/MQTTnet.AspnetCore/SocketConnection.cs
delete mode 100644 Source/MQTTnet.AspnetCore/SocketReceiver.cs
delete mode 100644 Source/MQTTnet.AspnetCore/SocketSender.cs
diff --git a/Samples/MQTTnet.Samples.csproj b/Samples/MQTTnet.Samples.csproj
index 5fe84d380..2441f1c9c 100644
--- a/Samples/MQTTnet.Samples.csproj
+++ b/Samples/MQTTnet.Samples.csproj
@@ -14,7 +14,7 @@
all
true
low
- latest-Recommended
+
diff --git a/Source/MQTTnet.AspTestApp/MQTTnet.AspTestApp.csproj b/Source/MQTTnet.AspTestApp/MQTTnet.AspTestApp.csproj
index 254069b60..ede29a49f 100644
--- a/Source/MQTTnet.AspTestApp/MQTTnet.AspTestApp.csproj
+++ b/Source/MQTTnet.AspTestApp/MQTTnet.AspTestApp.csproj
@@ -13,7 +13,7 @@
all
true
low
- latest-Recommended
+
diff --git a/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj b/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj
index 5357dd702..6b7f60ce2 100644
--- a/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj
+++ b/Source/MQTTnet.AspnetCore/MQTTnet.AspNetCore.csproj
@@ -37,7 +37,7 @@
true
low
low
- latest-Recommended
+
diff --git a/Source/MQTTnet.AspnetCore/MqttClientConnectionContextFactory.cs b/Source/MQTTnet.AspnetCore/MqttClientConnectionContextFactory.cs
index 0ddbbd8f1..a8405cefa 100644
--- a/Source/MQTTnet.AspnetCore/MqttClientConnectionContextFactory.cs
+++ b/Source/MQTTnet.AspnetCore/MqttClientConnectionContextFactory.cs
@@ -2,16 +2,28 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using System;
+using Microsoft.AspNetCore.Connections;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics.Logger;
using MQTTnet.Formatter;
+using System;
+using System.Linq;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading.Tasks;
namespace MQTTnet.AspNetCore
{
public sealed class MqttClientConnectionContextFactory : IMqttClientAdapterFactory
{
- public IMqttChannelAdapter CreateClientAdapter(MqttClientOptions options, MqttPacketInspector packetInspector, IMqttNetLogger logger)
+ private readonly IConnectionFactory connectionFactory;
+
+ public MqttClientConnectionContextFactory(IConnectionFactory connectionFactory)
+ {
+ this.connectionFactory = connectionFactory;
+ }
+
+ public async ValueTask CreateClientAdapterAsync(MqttClientOptions options, MqttPacketInspector packetInspector, IMqttNetLogger logger)
{
if (options == null) throw new ArgumentNullException(nameof(options));
@@ -19,8 +31,8 @@ public IMqttChannelAdapter CreateClientAdapter(MqttClientOptions options, MqttPa
{
case MqttClientTcpOptions tcpOptions:
{
- var tcpConnection = new SocketConnection(tcpOptions.RemoteEndpoint);
-
+ var endPoint = await CreateIPEndPointAsync(tcpOptions.RemoteEndpoint);
+ var tcpConnection = await connectionFactory.ConnectAsync(endPoint);
var formatter = new MqttPacketFormatterAdapter(options.ProtocolVersion, new MqttBufferWriter(4096, 65535));
return new MqttConnectionContext(formatter, tcpConnection);
}
@@ -30,5 +42,24 @@ public IMqttChannelAdapter CreateClientAdapter(MqttClientOptions options, MqttPa
}
}
}
+
+ private static async ValueTask CreateIPEndPointAsync(EndPoint endpoint)
+ {
+ if (endpoint is IPEndPoint ipEndPoint)
+ {
+ return ipEndPoint;
+ }
+
+ if (endpoint is DnsEndPoint dnsEndPoint)
+ {
+ var hostEntry = await Dns.GetHostEntryAsync(dnsEndPoint.Host);
+ var address = hostEntry.AddressList.OrderBy(item => item.AddressFamily).FirstOrDefault();
+ return address == null
+ ? throw new SocketException((int)SocketError.HostNotFound)
+ : new IPEndPoint(address, dnsEndPoint.Port);
+ }
+
+ throw new NotSupportedException("Only supports IPEndPoint or DnsEndPoint for now.");
+ }
}
}
diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
index c16e5f483..b7b1c775d 100644
--- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
+++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
@@ -2,13 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using System;
-using System.Buffers;
-using System.IO.Pipelines;
-using System.Net;
-using System.Security.Cryptography.X509Certificates;
-using System.Threading;
-using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Connections.Features;
using Microsoft.AspNetCore.Http.Features;
@@ -17,6 +10,13 @@
using MQTTnet.Formatter;
using MQTTnet.Internal;
using MQTTnet.Packets;
+using System;
+using System.Buffers;
+using System.IO.Pipelines;
+using System.Net;
+using System.Security.Cryptography.X509Certificates;
+using System.Threading;
+using System.Threading.Tasks;
namespace MQTTnet.AspNetCore;
@@ -25,19 +25,15 @@ public sealed class MqttConnectionContext : IMqttChannelAdapter
readonly ConnectionContext _connection;
readonly AsyncLock _writerLock = new();
- PipeReader _input;
- PipeWriter _output;
+ readonly PipeReader _input;
+ readonly PipeWriter _output;
public MqttConnectionContext(MqttPacketFormatterAdapter packetFormatterAdapter, ConnectionContext connection)
{
PacketFormatterAdapter = packetFormatterAdapter ?? throw new ArgumentNullException(nameof(packetFormatterAdapter));
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
-
- if (!(_connection is SocketConnection tcp) || tcp.IsConnected)
- {
- _input = connection.Transport.Input;
- _output = connection.Transport.Output;
- }
+ _input = connection.Transport.Input;
+ _output = connection.Transport.Output;
}
public long BytesReceived { get; private set; }
@@ -106,15 +102,9 @@ public bool IsSecureConnection
public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }
- public async Task ConnectAsync(CancellationToken cancellationToken)
+ public Task ConnectAsync(CancellationToken cancellationToken)
{
- if (_connection is SocketConnection tcp && !tcp.IsConnected)
- {
- await tcp.StartAsync().ConfigureAwait(false);
- }
-
- _input = _connection.Transport.Input;
- _output = _connection.Transport.Output;
+ return Task.CompletedTask;
}
public Task DisconnectAsync(CancellationToken cancellationToken)
@@ -126,7 +116,7 @@ public Task DisconnectAsync(CancellationToken cancellationToken)
}
public void Dispose()
- {
+ {
_writerLock.Dispose();
}
diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs
index b4cbc42a8..dfeb2f3f7 100644
--- a/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs
+++ b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs
@@ -33,13 +33,11 @@ public override async Task OnConnectedAsync(ConnectionContext connection)
}
var formatter = new MqttPacketFormatterAdapter(new MqttBufferWriter(_serverOptions.WriterBufferSize, _serverOptions.WriterBufferSizeMax));
- using (var adapter = new MqttConnectionContext(formatter, connection))
+ using var adapter = new MqttConnectionContext(formatter, connection);
+ var clientHandler = ClientHandler;
+ if (clientHandler != null)
{
- var clientHandler = ClientHandler;
- if (clientHandler != null)
- {
- await clientHandler(adapter).ConfigureAwait(false);
- }
+ await clientHandler(adapter).ConfigureAwait(false);
}
}
diff --git a/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs b/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs
index 915f6791c..bcb8b38e7 100644
--- a/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs
+++ b/Source/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs
@@ -2,18 +2,25 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using System;
+using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
+using MQTTnet.Adapter;
using MQTTnet.Diagnostics.Logger;
using MQTTnet.Server;
using MQTTnet.Server.Internal.Adapter;
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Reflection;
namespace MQTTnet.AspNetCore;
public static class ServiceCollectionExtensions
{
+ const string SocketConnectionFactoryTypeName = "Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactory";
+ const string SocketConnectionFactoryAssemblyName = "Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets";
+
public static IServiceCollection AddHostedMqttServer(this IServiceCollection services, MqttServerOptions options)
{
ArgumentNullException.ThrowIfNull(services);
@@ -106,4 +113,15 @@ public static IServiceCollection AddMqttWebSocketServerAdapter(this IServiceColl
return services;
}
+
+
+ [DynamicDependency(DynamicallyAccessedMemberTypes.All, SocketConnectionFactoryTypeName, SocketConnectionFactoryAssemblyName)]
+ public static IServiceCollection AddMqttClientConnectionContextFactory(this IServiceCollection services)
+ {
+ var socketConnectionFactoryType = Assembly.Load(SocketConnectionFactoryAssemblyName).GetType(SocketConnectionFactoryTypeName);
+ services.AddSingleton(typeof(IConnectionFactory), socketConnectionFactoryType);
+ services.TryAddSingleton();
+ services.TryAddSingleton(serviceProvider => serviceProvider.GetRequiredService());
+ return services;
+ }
}
\ No newline at end of file
diff --git a/Source/MQTTnet.AspnetCore/SocketAwaitable.cs b/Source/MQTTnet.AspnetCore/SocketAwaitable.cs
deleted file mode 100644
index 2c9607279..000000000
--- a/Source/MQTTnet.AspnetCore/SocketAwaitable.cs
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-// See the LICENSE file in the project root for more information.
-
-using System;
-using System.Diagnostics;
-using System.IO.Pipelines;
-using System.Net.Sockets;
-using System.Runtime.CompilerServices;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace MQTTnet.AspNetCore;
-
-public class SocketAwaitable : ICriticalNotifyCompletion
-{
- static readonly Action _callbackCompleted = () =>
- {
- };
-
- readonly PipeScheduler _ioScheduler;
- int _bytesTransferred;
-
- Action _callback;
- SocketError _error;
-
- public SocketAwaitable(PipeScheduler ioScheduler)
- {
- _ioScheduler = ioScheduler;
- }
-
- public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
-
- public void Complete(int bytesTransferred, SocketError socketError)
- {
- _error = socketError;
- _bytesTransferred = bytesTransferred;
- var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
-
- if (continuation != null)
- {
- _ioScheduler.Schedule(state => ((Action)state)(), continuation);
- }
- }
-
- public SocketAwaitable GetAwaiter()
- {
- return this;
- }
-
- public int GetResult()
- {
- Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));
-
- _callback = null;
-
- if (_error != SocketError.Success)
- {
- throw new SocketException((int)_error);
- }
-
- return _bytesTransferred;
- }
-
- public void OnCompleted(Action continuation)
- {
- if (ReferenceEquals(_callback, _callbackCompleted) || ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
- {
- Task.Run(continuation);
- }
- }
-
- public void UnsafeOnCompleted(Action continuation)
- {
- OnCompleted(continuation);
- }
-}
\ No newline at end of file
diff --git a/Source/MQTTnet.AspnetCore/SocketConnection.cs b/Source/MQTTnet.AspnetCore/SocketConnection.cs
deleted file mode 100644
index 2021eccac..000000000
--- a/Source/MQTTnet.AspnetCore/SocketConnection.cs
+++ /dev/null
@@ -1,261 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-// See the LICENSE file in the project root for more information.
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.IO.Pipelines;
-using System.Net;
-using System.Net.Sockets;
-using System.Threading.Tasks;
-using Microsoft.AspNetCore.Connections;
-using Microsoft.AspNetCore.Http.Features;
-using MQTTnet.Exceptions;
-
-namespace MQTTnet.AspNetCore;
-
-public sealed class SocketConnection : ConnectionContext
-{
- readonly EndPoint _endPoint;
- volatile bool _aborted;
- IDuplexPipe _application;
- SocketReceiver _receiver;
- SocketSender _sender;
-
- Socket _socket;
-
- public SocketConnection(EndPoint endPoint)
- {
- _endPoint = endPoint;
- }
-
- public SocketConnection(Socket socket)
- {
- _socket = socket;
- _endPoint = socket.RemoteEndPoint;
-
- _sender = new SocketSender(_socket, PipeScheduler.ThreadPool);
- _receiver = new SocketReceiver(_socket, PipeScheduler.ThreadPool);
- }
-
- public override string ConnectionId { get; set; }
- public override IFeatureCollection Features { get; }
-
- public bool IsConnected { get; private set; }
- public override IDictionary