From c8f19f596eb00949288b21864b0523a817d4df70 Mon Sep 17 00:00:00 2001 From: Rico Suter Date: Mon, 24 Nov 2025 22:43:17 +0100 Subject: [PATCH] performance: Add batch InjectApplicationMessages to support high throughput --- .../Internal/MqttClientSessionsManager.cs | 169 ++++++++++++++++-- .../Internal/MqttConnectedClient.cs | 81 ++++++--- Source/MQTTnet.Server/Internal/MqttSession.cs | 5 + Source/MQTTnet.Server/MqttServer.cs | 30 ++++ Source/MQTTnet.Tests/Server/Load_Tests.cs | 5 +- .../Server/Subscription_TopicHash_Tests.cs | 2 +- Source/MQTTnet/Internal/MqttPacketBus.cs | 33 ++++ Source/MQTTnet/Internal/MqttPacketBusItem.cs | 10 +- 8 files changed, 289 insertions(+), 46 deletions(-) diff --git a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs index 9ea017349..a2e554322 100644 --- a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs @@ -32,6 +32,9 @@ public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification readonly MqttSessionsStorage _sessionsStorage = new(); readonly HashSet _subscriberSessions = []; + // Cached array to avoid allocations on every message dispatch + MqttSession[] _subscriberSessionsCache; + public MqttClientSessionsManager(MqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServerEventContainer eventContainer, IMqttNetLogger logger) { ArgumentNullException.ThrowIfNull(logger); @@ -78,6 +81,7 @@ public async Task DeleteSessionAsync(string clientId) if (_sessionsStorage.TryRemoveSession(clientId, out session)) { _subscriberSessions.Remove(session); + _subscriberSessionsCache = null; } } finally @@ -161,11 +165,12 @@ public async Task DispatchApplicationMessage( await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false); } - List subscriberSessions; + MqttSession[] subscriberSessions; _sessionsManagementLock.EnterReadLock(); try { - subscriberSessions = _subscriberSessions.ToList(); + // Use cached array to avoid allocation on every message + subscriberSessions = _subscriberSessionsCache ??= [.. _subscriberSessions]; } finally { @@ -175,6 +180,9 @@ public async Task DispatchApplicationMessage( // Calculate application message topic hash once for subscription checks MqttTopicHash.Calculate(applicationMessage.Topic, out var topicHash, out _, out _); + // For QoS 0 messages, we can reuse the same packet for all subscribers + MqttPublishPacket sharedQos0Packet = null; + foreach (var session in subscriberSessions) { if (!session.TryCheckSubscriptions(applicationMessage.Topic, topicHash, applicationMessage.QualityOfServiceLevel, senderId, out var checkSubscriptionsResult)) @@ -194,28 +202,46 @@ public async Task DispatchApplicationMessage( continue; } - var publishPacketCopy = MqttPublishPacketFactory.Create(applicationMessage); - publishPacketCopy.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel; - publishPacketCopy.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers; - - if (publishPacketCopy.QualityOfServiceLevel > 0) - { - publishPacketCopy.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier(); - } + MqttPublishPacket publishPacket; - if (checkSubscriptionsResult.RetainAsPublished) + // For QoS 0, reuse the same packet to avoid allocations + if (checkSubscriptionsResult.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce && + checkSubscriptionsResult.SubscriptionIdentifiers == null && + !checkSubscriptionsResult.RetainAsPublished) { - // Transfer the original retain state from the publisher. This is a MQTTv5 feature. - publishPacketCopy.Retain = applicationMessage.Retain; + if (sharedQos0Packet == null) + { + sharedQos0Packet = MqttPublishPacketFactory.Create(applicationMessage); + sharedQos0Packet.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; + sharedQos0Packet.Retain = false; + } + publishPacket = sharedQos0Packet; } else { - publishPacketCopy.Retain = false; + publishPacket = MqttPublishPacketFactory.Create(applicationMessage); + publishPacket.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel; + publishPacket.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers; + + if (publishPacket.QualityOfServiceLevel > 0) + { + publishPacket.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier(); + } + + if (checkSubscriptionsResult.RetainAsPublished) + { + // Transfer the original retain state from the publisher. This is a MQTTv5 feature. + publishPacket.Retain = applicationMessage.Retain; + } + else + { + publishPacket.Retain = false; + } } matchingSubscribersCount++; - var result = session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy)); + var result = session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket)); if (_eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.HasHandlers) { @@ -437,6 +463,116 @@ public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter } } + /// + /// Dispatch multiple application messages at once for better performance. + /// + public async Task DispatchApplicationMessages( + IReadOnlyList messages, + IDictionary defaultSessionItems, + CancellationToken cancellationToken) + { + // Get subscriber sessions once for all messages + MqttSession[] subscriberSessions; + _sessionsManagementLock.EnterReadLock(); + try + { + subscriberSessions = _subscriberSessionsCache ??= [.. _subscriberSessions]; + } + finally + { + _sessionsManagementLock.ExitReadLock(); + } + + if (subscriberSessions.Length == 0) + { + return; + } + + // Process each message + foreach (var injectedMessage in messages) + { + var applicationMessage = injectedMessage.ApplicationMessage; + var senderId = injectedMessage.SenderClientId; + var sessionItems = injectedMessage.CustomSessionItems ?? defaultSessionItems; + + // Check interception + if (_eventContainer.InterceptingPublishEvent.HasHandlers) + { + var interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, senderId, injectedMessage.SenderUserName, sessionItems, cancellationToken); + await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false); + + if (!interceptingPublishEventArgs.ProcessPublish) + { + continue; + } + + applicationMessage = interceptingPublishEventArgs.ApplicationMessage; + } + + if (applicationMessage == null) + { + continue; + } + + // Handle retained messages + if (applicationMessage.Retain) + { + await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false); + } + + // Calculate topic hash once + MqttTopicHash.Calculate(applicationMessage.Topic, out var topicHash, out _, out _); + + // For QoS 0 messages, we can reuse the same packet for all subscribers + MqttPublishPacket sharedQos0Packet = null; + + // Dispatch to all subscribers + foreach (var session in subscriberSessions) + { + if (!session.TryCheckSubscriptions(applicationMessage.Topic, topicHash, applicationMessage.QualityOfServiceLevel, senderId, out var checkSubscriptionsResult)) + { + continue; + } + + if (!checkSubscriptionsResult.IsSubscribed) + { + continue; + } + + MqttPublishPacket publishPacket; + + // For QoS 0, reuse the same packet to avoid allocations + if (checkSubscriptionsResult.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce && + checkSubscriptionsResult.SubscriptionIdentifiers == null && + !checkSubscriptionsResult.RetainAsPublished) + { + if (sharedQos0Packet == null) + { + sharedQos0Packet = Formatter.MqttPublishPacketFactory.Create(applicationMessage); + sharedQos0Packet.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; + sharedQos0Packet.Retain = false; + } + publishPacket = sharedQos0Packet; + } + else + { + publishPacket = Formatter.MqttPublishPacketFactory.Create(applicationMessage); + publishPacket.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel; + publishPacket.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers; + + if (publishPacket.QualityOfServiceLevel > 0) + { + publishPacket.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier(); + } + + publishPacket.Retain = checkSubscriptionsResult.RetainAsPublished && applicationMessage.Retain; + } + + session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket)); + } + } + } + public void OnSubscriptionsAdded(MqttSession clientSession, List subscriptionsTopics) { _sessionsManagementLock.EnterWriteLock(); @@ -446,6 +582,7 @@ public void OnSubscriptionsAdded(MqttSession clientSession, List subscri { // first subscribed topic _subscriberSessions.Add(clientSession); + _subscriberSessionsCache = null; } foreach (var topic in subscriptionsTopics) @@ -473,6 +610,7 @@ public void OnSubscriptionsRemoved(MqttSession clientSession, List subsc { // last subscription removed _subscriberSessions.Remove(clientSession); + _subscriberSessionsCache = null; } } finally @@ -563,6 +701,7 @@ async Task CreateClientConnection( { _logger.Verbose("Deleting existing session of client '{0}' due to clean start", connectPacket.ClientId); _subscriberSessions.Remove(oldSession); + _subscriberSessionsCache = null; session = CreateSession(connectPacket, validatingConnectionEventArgs); } else diff --git a/Source/MQTTnet.Server/Internal/MqttConnectedClient.cs b/Source/MQTTnet.Server/Internal/MqttConnectedClient.cs index 48e6bd2c7..e8a3178e0 100644 --- a/Source/MQTTnet.Server/Internal/MqttConnectedClient.cs +++ b/Source/MQTTnet.Server/Internal/MqttConnectedClient.cs @@ -489,40 +489,73 @@ async Task ReceivePackagesLoop(CancellationToken cancellationToken) async Task SendPacketsLoop(CancellationToken cancellationToken) { MqttPacketBusItem packetBusItem = null; + var batchBuffer = new MqttPacketBusItem[64]; try { while (!cancellationToken.IsCancellationRequested && !IsTakenOver && IsRunning) { - packetBusItem = await Session.DequeuePacketAsync(cancellationToken).ConfigureAwait(false); - - // Also check the cancellation token here because the dequeue is blocking and may take some time. - if (cancellationToken.IsCancellationRequested) + var batchCount = Session.DequeuePackets(batchBuffer, batchBuffer.Length); + if (batchCount == 0) { - return; - } + // Queue is empty, wait for next packet + packetBusItem = await Session.DequeuePacketAsync(cancellationToken).ConfigureAwait(false); - if (IsTakenOver || !IsRunning) - { - return; - } + // Also check the cancellation token here because the dequeue is blocking and may take some time. + if (cancellationToken.IsCancellationRequested) + { + return; + } - try - { - await SendPacketAsync(packetBusItem.Packet, cancellationToken).ConfigureAwait(false); - packetBusItem.Complete(); - } - catch (OperationCanceledException) - { - packetBusItem.Cancel(); - } - catch (Exception exception) - { - packetBusItem.Fail(exception); + if (IsTakenOver || !IsRunning) + { + return; + } + + try + { + await SendPacketAsync(packetBusItem.Packet, cancellationToken).ConfigureAwait(false); + packetBusItem.Complete(); + } + catch (OperationCanceledException) + { + packetBusItem.Cancel(); + } + catch (Exception exception) + { + packetBusItem.Fail(exception); + } } - finally + else { - await Task.Yield(); + // Process batch + for (var i = 0; i < batchCount; i++) + { + if (cancellationToken.IsCancellationRequested || IsTakenOver || !IsRunning) + { + // Cancel remaining items + for (var j = i; j < batchCount; j++) + { + batchBuffer[j].Cancel(); + } + return; + } + + packetBusItem = batchBuffer[i]; + try + { + await SendPacketAsync(packetBusItem.Packet, cancellationToken).ConfigureAwait(false); + packetBusItem.Complete(); + } + catch (OperationCanceledException) + { + packetBusItem.Cancel(); + } + catch (Exception exception) + { + packetBusItem.Fail(exception); + } + } } } } diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs index 7e3ab1279..b6aa2abfe 100644 --- a/Source/MQTTnet.Server/Internal/MqttSession.cs +++ b/Source/MQTTnet.Server/Internal/MqttSession.cs @@ -99,6 +99,11 @@ public Task DequeuePacketAsync(CancellationToken cancellation return _packetBus.DequeueItemAsync(cancellationToken); } + public int DequeuePackets(MqttPacketBusItem[] buffer, int maxCount) + { + return _packetBus.DequeueItems(buffer, maxCount); + } + public void Dispose() { _packetBus.Dispose(); diff --git a/Source/MQTTnet.Server/MqttServer.cs b/Source/MQTTnet.Server/MqttServer.cs index 8f1807cff..319a6c0f5 100644 --- a/Source/MQTTnet.Server/MqttServer.cs +++ b/Source/MQTTnet.Server/MqttServer.cs @@ -268,6 +268,36 @@ public Task InjectApplicationMessage(InjectedMqttApplicationMessage injectedAppl cancellationToken); } + /// + /// Injects multiple application messages at once for better performance. + /// + public Task InjectApplicationMessages(IReadOnlyList injectedApplicationMessages, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(injectedApplicationMessages); + + if (injectedApplicationMessages.Count == 0) + { + return Task.CompletedTask; + } + + ThrowIfNotStarted(); + + foreach (var injectedApplicationMessage in injectedApplicationMessages) + { + ArgumentNullException.ThrowIfNull(injectedApplicationMessage); + ArgumentNullException.ThrowIfNull(injectedApplicationMessage.ApplicationMessage); + + MqttTopicValidator.ThrowIfInvalid(injectedApplicationMessage.ApplicationMessage.Topic); + + if (string.IsNullOrEmpty(injectedApplicationMessage.ApplicationMessage.Topic)) + { + throw new NotSupportedException("Injected application messages must contain a topic (topic alias is not supported)"); + } + } + + return _clientSessionsManager.DispatchApplicationMessages(injectedApplicationMessages, ServerSessionItems, cancellationToken); + } + public async Task StartAsync() { ThrowIfStarted(); diff --git a/Source/MQTTnet.Tests/Server/Load_Tests.cs b/Source/MQTTnet.Tests/Server/Load_Tests.cs index 7573e50ea..75490860f 100644 --- a/Source/MQTTnet.Tests/Server/Load_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Load_Tests.cs @@ -1,5 +1,6 @@ #if DEBUG +using System.Globalization; using MQTTnet.Internal; using MQTTnet.Packets; using MQTTnet.Protocol; @@ -52,7 +53,7 @@ await client.SendAsync( for (var j = 0; j < 1000; j++) { - publishPacket.Topic = j.ToString(); + publishPacket.Topic = j.ToString(CultureInfo.InvariantCulture); await client.SendAsync(publishPacket, CancellationToken.None); } @@ -139,7 +140,7 @@ public async Task Handle_100_000_Messages_In_Server() for (var j = 0; j < 1000; j++) { - var message = applicationMessageBuilder.WithTopic(j.ToString()).Build(); + var message = applicationMessageBuilder.WithTopic(j.ToString(CultureInfo.InvariantCulture)).Build(); await client.PublishAsync(message); } diff --git a/Source/MQTTnet.Tests/Server/Subscription_TopicHash_Tests.cs b/Source/MQTTnet.Tests/Server/Subscription_TopicHash_Tests.cs index 604b6d4b2..df0e6f100 100644 --- a/Source/MQTTnet.Tests/Server/Subscription_TopicHash_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Subscription_TopicHash_Tests.cs @@ -523,7 +523,7 @@ static byte[] GetBytes(ulong value) // Ensure that highest byte comes first for comparison left to right if (BitConverter.IsLittleEndian) { - return bytes.Reverse().ToArray(); + return ((IEnumerable)bytes).Reverse().ToArray(); } return bytes; diff --git a/Source/MQTTnet/Internal/MqttPacketBus.cs b/Source/MQTTnet/Internal/MqttPacketBus.cs index b43e6630b..7ceb25475 100644 --- a/Source/MQTTnet/Internal/MqttPacketBus.cs +++ b/Source/MQTTnet/Internal/MqttPacketBus.cs @@ -128,6 +128,39 @@ public void EnqueueItem(MqttPacketBusItem item, MqttPacketBusPartition partition } } + public int DequeueItems(MqttPacketBusItem[] buffer, int maxCount) + { + var count = 0; + lock (_syncRoot) + { + while (count < maxCount) + { + MqttPacketBusItem item = null; + + for (var i = 0; i < 3; i++) + { + MoveActivePartition(); + + var activePartition = _partitions[_activePartition]; + if (activePartition.First != null) + { + item = activePartition.First.Value; + activePartition.RemoveFirst(); + break; + } + } + + if (item == null) + { + break; + } + + buffer[count++] = item; + } + } + return count; + } + public List ExportPackets(MqttPacketBusPartition partition) { lock (_syncRoot) diff --git a/Source/MQTTnet/Internal/MqttPacketBusItem.cs b/Source/MQTTnet/Internal/MqttPacketBusItem.cs index 5f4944731..d228d70f4 100644 --- a/Source/MQTTnet/Internal/MqttPacketBusItem.cs +++ b/Source/MQTTnet/Internal/MqttPacketBusItem.cs @@ -8,7 +8,7 @@ namespace MQTTnet.Internal; public sealed class MqttPacketBusItem { - readonly AsyncTaskCompletionSource _promise = new(); + AsyncTaskCompletionSource _promise; public MqttPacketBusItem(MqttPacket packet) { @@ -21,22 +21,24 @@ public MqttPacketBusItem(MqttPacket packet) public void Cancel() { - _promise.TrySetCanceled(); + _promise?.TrySetCanceled(); } public void Complete() { - _promise.TrySetResult(Packet); + _promise?.TrySetResult(Packet); Completed?.Invoke(this, EventArgs.Empty); } public void Fail(Exception exception) { - _promise.TrySetException(exception); + _promise?.TrySetException(exception); } public Task WaitAsync() { + // Lazy initialization - only allocate when actually needed + _promise ??= new AsyncTaskCompletionSource(); return _promise.Task; } } \ No newline at end of file