Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 154 additions & 15 deletions Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification
readonly MqttSessionsStorage _sessionsStorage = new();
readonly HashSet<MqttSession> _subscriberSessions = [];

// Cached array to avoid allocations on every message dispatch
MqttSession[] _subscriberSessionsCache;

public MqttClientSessionsManager(MqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServerEventContainer eventContainer, IMqttNetLogger logger)
{
ArgumentNullException.ThrowIfNull(logger);
Expand Down Expand Up @@ -78,6 +81,7 @@ public async Task DeleteSessionAsync(string clientId)
if (_sessionsStorage.TryRemoveSession(clientId, out session))
{
_subscriberSessions.Remove(session);
_subscriberSessionsCache = null;
}
}
finally
Expand Down Expand Up @@ -161,11 +165,12 @@ public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(
await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false);
}

List<MqttSession> subscriberSessions;
MqttSession[] subscriberSessions;
_sessionsManagementLock.EnterReadLock();
try
{
subscriberSessions = _subscriberSessions.ToList();
// Use cached array to avoid allocation on every message
subscriberSessions = _subscriberSessionsCache ??= [.. _subscriberSessions];
}
finally
{
Expand All @@ -175,6 +180,9 @@ public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(
// Calculate application message topic hash once for subscription checks
MqttTopicHash.Calculate(applicationMessage.Topic, out var topicHash, out _, out _);

// For QoS 0 messages, we can reuse the same packet for all subscribers
MqttPublishPacket sharedQos0Packet = null;

foreach (var session in subscriberSessions)
{
if (!session.TryCheckSubscriptions(applicationMessage.Topic, topicHash, applicationMessage.QualityOfServiceLevel, senderId, out var checkSubscriptionsResult))
Expand All @@ -194,28 +202,46 @@ public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(
continue;
}

var publishPacketCopy = MqttPublishPacketFactory.Create(applicationMessage);
publishPacketCopy.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel;
publishPacketCopy.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers;

if (publishPacketCopy.QualityOfServiceLevel > 0)
{
publishPacketCopy.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier();
}
MqttPublishPacket publishPacket;

if (checkSubscriptionsResult.RetainAsPublished)
// For QoS 0, reuse the same packet to avoid allocations
if (checkSubscriptionsResult.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce &&
checkSubscriptionsResult.SubscriptionIdentifiers == null &&
!checkSubscriptionsResult.RetainAsPublished)
{
// Transfer the original retain state from the publisher. This is a MQTTv5 feature.
publishPacketCopy.Retain = applicationMessage.Retain;
if (sharedQos0Packet == null)
{
sharedQos0Packet = MqttPublishPacketFactory.Create(applicationMessage);
sharedQos0Packet.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
sharedQos0Packet.Retain = false;
}
publishPacket = sharedQos0Packet;
}
else
{
publishPacketCopy.Retain = false;
publishPacket = MqttPublishPacketFactory.Create(applicationMessage);
publishPacket.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel;
publishPacket.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers;

if (publishPacket.QualityOfServiceLevel > 0)
{
publishPacket.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier();
}

if (checkSubscriptionsResult.RetainAsPublished)
{
// Transfer the original retain state from the publisher. This is a MQTTv5 feature.
publishPacket.Retain = applicationMessage.Retain;
}
else
{
publishPacket.Retain = false;
}
}

matchingSubscribersCount++;

var result = session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy));
var result = session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket));

if (_eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.HasHandlers)
{
Expand Down Expand Up @@ -437,6 +463,116 @@ public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter
}
}

/// <summary>
/// Dispatch multiple application messages at once for better performance.
/// </summary>
public async Task DispatchApplicationMessages(
IReadOnlyList<InjectedMqttApplicationMessage> messages,
IDictionary defaultSessionItems,
CancellationToken cancellationToken)
{
// Get subscriber sessions once for all messages
MqttSession[] subscriberSessions;
_sessionsManagementLock.EnterReadLock();
try
{
subscriberSessions = _subscriberSessionsCache ??= [.. _subscriberSessions];
}
finally
{
_sessionsManagementLock.ExitReadLock();
}

if (subscriberSessions.Length == 0)
{
return;
}

// Process each message
foreach (var injectedMessage in messages)
{
var applicationMessage = injectedMessage.ApplicationMessage;
var senderId = injectedMessage.SenderClientId;
var sessionItems = injectedMessage.CustomSessionItems ?? defaultSessionItems;

// Check interception
if (_eventContainer.InterceptingPublishEvent.HasHandlers)
{
var interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, senderId, injectedMessage.SenderUserName, sessionItems, cancellationToken);
await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(false);

if (!interceptingPublishEventArgs.ProcessPublish)
{
continue;
}

applicationMessage = interceptingPublishEventArgs.ApplicationMessage;
}

if (applicationMessage == null)
{
continue;
}

// Handle retained messages
if (applicationMessage.Retain)
{
await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false);
}

// Calculate topic hash once
MqttTopicHash.Calculate(applicationMessage.Topic, out var topicHash, out _, out _);

// For QoS 0 messages, we can reuse the same packet for all subscribers
MqttPublishPacket sharedQos0Packet = null;

// Dispatch to all subscribers
foreach (var session in subscriberSessions)
{
if (!session.TryCheckSubscriptions(applicationMessage.Topic, topicHash, applicationMessage.QualityOfServiceLevel, senderId, out var checkSubscriptionsResult))
{
continue;
}

if (!checkSubscriptionsResult.IsSubscribed)
{
continue;
}

MqttPublishPacket publishPacket;

// For QoS 0, reuse the same packet to avoid allocations
if (checkSubscriptionsResult.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce &&
checkSubscriptionsResult.SubscriptionIdentifiers == null &&
!checkSubscriptionsResult.RetainAsPublished)
{
if (sharedQos0Packet == null)
{
sharedQos0Packet = Formatter.MqttPublishPacketFactory.Create(applicationMessage);
sharedQos0Packet.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
sharedQos0Packet.Retain = false;
}
publishPacket = sharedQos0Packet;
}
else
{
publishPacket = Formatter.MqttPublishPacketFactory.Create(applicationMessage);
publishPacket.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel;
publishPacket.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers;

if (publishPacket.QualityOfServiceLevel > 0)
{
publishPacket.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier();
}

publishPacket.Retain = checkSubscriptionsResult.RetainAsPublished && applicationMessage.Retain;
}

session.EnqueueDataPacket(new MqttPacketBusItem(publishPacket));
}
}
}

public void OnSubscriptionsAdded(MqttSession clientSession, List<string> subscriptionsTopics)
{
_sessionsManagementLock.EnterWriteLock();
Expand All @@ -446,6 +582,7 @@ public void OnSubscriptionsAdded(MqttSession clientSession, List<string> subscri
{
// first subscribed topic
_subscriberSessions.Add(clientSession);
_subscriberSessionsCache = null;
}

foreach (var topic in subscriptionsTopics)
Expand Down Expand Up @@ -473,6 +610,7 @@ public void OnSubscriptionsRemoved(MqttSession clientSession, List<string> subsc
{
// last subscription removed
_subscriberSessions.Remove(clientSession);
_subscriberSessionsCache = null;
}
}
finally
Expand Down Expand Up @@ -563,6 +701,7 @@ async Task<MqttConnectedClient> CreateClientConnection(
{
_logger.Verbose("Deleting existing session of client '{0}' due to clean start", connectPacket.ClientId);
_subscriberSessions.Remove(oldSession);
_subscriberSessionsCache = null;
session = CreateSession(connectPacket, validatingConnectionEventArgs);
}
else
Expand Down
81 changes: 57 additions & 24 deletions Source/MQTTnet.Server/Internal/MqttConnectedClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -489,40 +489,73 @@ async Task ReceivePackagesLoop(CancellationToken cancellationToken)
async Task SendPacketsLoop(CancellationToken cancellationToken)
{
MqttPacketBusItem packetBusItem = null;
var batchBuffer = new MqttPacketBusItem[64];

try
{
while (!cancellationToken.IsCancellationRequested && !IsTakenOver && IsRunning)
{
packetBusItem = await Session.DequeuePacketAsync(cancellationToken).ConfigureAwait(false);

// Also check the cancellation token here because the dequeue is blocking and may take some time.
if (cancellationToken.IsCancellationRequested)
var batchCount = Session.DequeuePackets(batchBuffer, batchBuffer.Length);
if (batchCount == 0)
{
return;
}
// Queue is empty, wait for next packet
packetBusItem = await Session.DequeuePacketAsync(cancellationToken).ConfigureAwait(false);

if (IsTakenOver || !IsRunning)
{
return;
}
// Also check the cancellation token here because the dequeue is blocking and may take some time.
if (cancellationToken.IsCancellationRequested)
{
return;
}

try
{
await SendPacketAsync(packetBusItem.Packet, cancellationToken).ConfigureAwait(false);
packetBusItem.Complete();
}
catch (OperationCanceledException)
{
packetBusItem.Cancel();
}
catch (Exception exception)
{
packetBusItem.Fail(exception);
if (IsTakenOver || !IsRunning)
{
return;
}

try
{
await SendPacketAsync(packetBusItem.Packet, cancellationToken).ConfigureAwait(false);
packetBusItem.Complete();
}
catch (OperationCanceledException)
{
packetBusItem.Cancel();
}
catch (Exception exception)
{
packetBusItem.Fail(exception);
}
}
finally
else
{
await Task.Yield();
// Process batch
for (var i = 0; i < batchCount; i++)
{
if (cancellationToken.IsCancellationRequested || IsTakenOver || !IsRunning)
{
// Cancel remaining items
for (var j = i; j < batchCount; j++)
{
batchBuffer[j].Cancel();
}
return;
}

packetBusItem = batchBuffer[i];
try
{
await SendPacketAsync(packetBusItem.Packet, cancellationToken).ConfigureAwait(false);
packetBusItem.Complete();
}
catch (OperationCanceledException)
{
packetBusItem.Cancel();
}
catch (Exception exception)
{
packetBusItem.Fail(exception);
}
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions Source/MQTTnet.Server/Internal/MqttSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public Task<MqttPacketBusItem> DequeuePacketAsync(CancellationToken cancellation
return _packetBus.DequeueItemAsync(cancellationToken);
}

public int DequeuePackets(MqttPacketBusItem[] buffer, int maxCount)
{
return _packetBus.DequeueItems(buffer, maxCount);
}

public void Dispose()
{
_packetBus.Dispose();
Expand Down
30 changes: 30 additions & 0 deletions Source/MQTTnet.Server/MqttServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,36 @@ public Task InjectApplicationMessage(InjectedMqttApplicationMessage injectedAppl
cancellationToken);
}

/// <summary>
/// Injects multiple application messages at once for better performance.
/// </summary>
public Task InjectApplicationMessages(IReadOnlyList<InjectedMqttApplicationMessage> injectedApplicationMessages, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(injectedApplicationMessages);

if (injectedApplicationMessages.Count == 0)
{
return Task.CompletedTask;
}

ThrowIfNotStarted();

foreach (var injectedApplicationMessage in injectedApplicationMessages)
{
ArgumentNullException.ThrowIfNull(injectedApplicationMessage);
ArgumentNullException.ThrowIfNull(injectedApplicationMessage.ApplicationMessage);

MqttTopicValidator.ThrowIfInvalid(injectedApplicationMessage.ApplicationMessage.Topic);

if (string.IsNullOrEmpty(injectedApplicationMessage.ApplicationMessage.Topic))
{
throw new NotSupportedException("Injected application messages must contain a topic (topic alias is not supported)");
}
}

return _clientSessionsManager.DispatchApplicationMessages(injectedApplicationMessages, ServerSessionItems, cancellationToken);
}

public async Task StartAsync()
{
ThrowIfStarted();
Expand Down
Loading