From 279c896c5cd3b38edc5dde3d88d0c8b6388e52f0 Mon Sep 17 00:00:00 2001 From: Henrik Drachmann Date: Tue, 13 May 2025 10:59:04 +0200 Subject: [PATCH 1/6] first try --- .../AzureServiceBusTransport.cs | 32 ++++++++++++------- .../Events/RenewMessageFailedEvent.cs | 14 ++++++++ 2 files changed, 35 insertions(+), 11 deletions(-) create mode 100644 Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs diff --git a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs index 05b64f7..de22c7d 100644 --- a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs +++ b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs @@ -1,12 +1,7 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Azure.Core; +using Azure.Core; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; +using Rebus.AzureServiceBus.Events; using Rebus.AzureServiceBus.Messages; using Rebus.AzureServiceBus.NameFormat; using Rebus.Bus; @@ -18,6 +13,12 @@ using Rebus.Subscriptions; using Rebus.Threading; using Rebus.Transport; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; // ReSharper disable RedundantArgumentDefaultValue // ReSharper disable ArgumentsStyleNamedExpression @@ -73,7 +74,7 @@ public class AzureServiceBusTransport : ITransport, IInitializable, IDisposable, readonly string _subscriptionName; readonly ILog _log; readonly ServiceBusClient _client; - + event RenewMessageFailedHandler RenewMessageFailed; bool _prefetchingEnabled; int _prefetchCount; @@ -602,10 +603,17 @@ public async Task Receive(ITransactionContext context, Cancell var receivedMessage = await ReceiveInternal().ConfigureAwait(false); if (receivedMessage == null) return null; - + var message = receivedMessage.Message; var messageReceiver = receivedMessage.MessageReceiver; + RenewMessageFailedHandler failedEventHandler = (messageId, exception) => + { + context.SetResult(commit: false, ack: false); + _log.Warn("MessageId: {MessageId} - Failed to renew peek lock: {Exception}", messageId, exception.Message); + }; + RenewMessageFailed += failedEventHandler; + var items = context.Items; // add the message and its receiver to the context @@ -626,6 +634,8 @@ public async Task Receive(ITransactionContext context, Cancell context.OnAck(async ctx => { + RenewMessageFailed -= failedEventHandler; + _messageLockRenewers.TryRemove(messageId, out _); // only ACK the message if it's still in the context - this way, carefully crafted @@ -690,7 +700,7 @@ await messageReceiver.AbandonMessageAsync( { transportMessage.Headers[Headers.DeliveryCount] = message.DeliveryCount.ToString(); } - + return transportMessage; } @@ -944,7 +954,7 @@ await Task.WhenAll(mustBeRenewed.Select(async r => if (!_messageLockRenewers.ContainsKey(r.MessageId)) return; _log.Warn(exception, "Error when renewing peek lock for message with ID {messageId}", r.MessageId); - + RenewMessageFailed(r.MessageId, exception); // peek lock renewal will be automatically retried, because it's still due for renewal } })); diff --git a/Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs b/Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs new file mode 100644 index 0000000..b6b80fb --- /dev/null +++ b/Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Rebus.AzureServiceBus.Events +{ + public delegate void RenewMessageFailedHandler(string messageId, Exception exception); + public class RebusEvents + { + public event RenewMessageFailedHandler RenewMessageFailed; + } + + +} From 5346cc60dc608b072da4e8a1d171494a200b8166 Mon Sep 17 00:00:00 2001 From: Henrik Drachmann Date: Tue, 13 May 2025 11:00:08 +0200 Subject: [PATCH 2/6] removed ununsed event --- .../AzureServiceBus/Events/RenewMessageFailedEvent.cs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs b/Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs index b6b80fb..0917b6a 100644 --- a/Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs +++ b/Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs @@ -5,10 +5,5 @@ namespace Rebus.AzureServiceBus.Events { public delegate void RenewMessageFailedHandler(string messageId, Exception exception); - public class RebusEvents - { - public event RenewMessageFailedHandler RenewMessageFailed; - } - - + } From 9267c48b5916484c9e9aa2c497ade9a8e625b9d8 Mon Sep 17 00:00:00 2001 From: Henrik Drachmann Date: Fri, 27 Jun 2025 13:05:47 +0200 Subject: [PATCH 3/6] used dictionary of messageId with CancellationTokenSource instead of events --- .../AzureServiceBusTransport.cs | 40 +++++++++++-------- .../Events/RenewMessageFailedEvent.cs | 9 ----- 2 files changed, 23 insertions(+), 26 deletions(-) delete mode 100644 Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs diff --git a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs index 383db7c..a917960 100644 --- a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs +++ b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs @@ -1,7 +1,6 @@ using Azure.Core; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; -using Rebus.AzureServiceBus.Events; using Rebus.AzureServiceBus.Messages; using Rebus.AzureServiceBus.NameFormat; using Rebus.Bus; @@ -62,6 +61,7 @@ public class AzureServiceBusTransport : ITransport, IInitializable, IDisposable, readonly ExceptionIgnorant _subscriptionExceptionIgnorant = new ExceptionIgnorant(maxAttemps: 10).Ignore(ex => ex.IsTransient); readonly ConcurrentStack _disposables = new(); readonly ConcurrentDictionary _messageLockRenewers = new(); + readonly ConcurrentDictionary _messageRenewerTokenSources = new(); readonly ConcurrentDictionary _cachedSubscriberAddresses = new(); readonly ConcurrentDictionary> _messageSenders = new(); readonly ConcurrentDictionary>> _topicClients = new(); @@ -74,7 +74,6 @@ public class AzureServiceBusTransport : ITransport, IInitializable, IDisposable, readonly string _subscriptionName; readonly ILog _log; readonly ServiceBusClient _client; - event RenewMessageFailedHandler RenewMessageFailed; bool _prefetchingEnabled; int _prefetchCount; @@ -153,7 +152,7 @@ public async Task RegisterSubscriber(string topic, string subscriberAddress) { return; } - + VerifyIsOwnInputQueueAddress(subscriberAddress); topic = _nameFormatter.FormatTopicName(topic); @@ -192,7 +191,7 @@ public async Task UnregisterSubscriber(string topic, string subscriberAddress) { return; } - + VerifyIsOwnInputQueueAddress(subscriberAddress); topic = _nameFormatter.FormatTopicName(topic); @@ -613,22 +612,19 @@ public async Task Receive(ITransactionContext context, Cancell var receivedMessage = await ReceiveInternal().ConfigureAwait(false); if (receivedMessage == null) return null; - + var message = receivedMessage.Message; var messageReceiver = receivedMessage.MessageReceiver; - RenewMessageFailedHandler failedEventHandler = (messageId, exception) => - { - context.SetResult(commit: false, ack: false); - _log.Warn("MessageId: {MessageId} - Failed to renew peek lock: {Exception}", messageId, exception.Message); - }; - RenewMessageFailed += failedEventHandler; + var renewFailedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken); + _messageRenewerTokenSources.AddOrUpdate(message.MessageId, renewFailedTokenSource, (_, _) => renewFailedTokenSource); var items = context.Items; - // add the message and its receiver to the context items["asb-message"] = message; items["asb-message-receiver"] = messageReceiver; + //add token that cancels when renew fails or when the pipeline token cancels + items["asb-message-cancel-token"] = renewFailedTokenSource.Token; if (string.IsNullOrWhiteSpace(message.LockToken)) { @@ -644,8 +640,7 @@ public async Task Receive(ITransactionContext context, Cancell context.OnAck(async ctx => { - RenewMessageFailed -= failedEventHandler; - + _messageRenewerTokenSources.TryRemove(messageId, out var _); _messageLockRenewers.TryRemove(messageId, out _); // only ACK the message if it's still in the context - this way, carefully crafted @@ -673,6 +668,7 @@ await messageReceiver context.OnNack(async ctx => { + _messageRenewerTokenSources.TryRemove(messageId, out var _); _messageLockRenewers.TryRemove(messageId, out _); // only NACK the message if it's still in the context - this way, carefully crafted @@ -701,7 +697,11 @@ await messageReceiver.AbandonMessageAsync( } }); - context.OnDisposed(ctx => _messageLockRenewers.TryRemove(messageId, out _)); + context.OnDisposed(ctx => + { + _messageRenewerTokenSources.TryRemove(messageId, out var _); + _messageLockRenewers.TryRemove(messageId, out _); + }); var transportMessage = _messageConverter.ToTransport(message); context.Items[TransportmessageItemKey] = transportMessage; @@ -804,7 +804,7 @@ public void Initialize() /// Gets/sets whether to skip checking queues configuration /// public bool DoNotCheckQueueConfigurationEnabled { get; set; } - + /// /// Gets/sets whether to skip checking topics configuration /// @@ -969,7 +969,13 @@ await Task.WhenAll(mustBeRenewed.Select(async r => if (!_messageLockRenewers.ContainsKey(r.MessageId)) return; _log.Warn(exception, "Error when renewing peek lock for message with ID {messageId}", r.MessageId); - RenewMessageFailed(r.MessageId, exception); + + if (_messageRenewerTokenSources.TryGetValue(r.MessageId, out var renewFailedTokenSource)) + { + renewFailedTokenSource.Cancel(); + } + + // peek lock renewal will be automatically retried, because it's still due for renewal } })); diff --git a/Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs b/Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs deleted file mode 100644 index 0917b6a..0000000 --- a/Rebus.AzureServiceBus/AzureServiceBus/Events/RenewMessageFailedEvent.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Rebus.AzureServiceBus.Events -{ - public delegate void RenewMessageFailedHandler(string messageId, Exception exception); - -} From ec9f821b856e5a51ba7b3e5978f99e5a5354df03 Mon Sep 17 00:00:00 2001 From: Henrik Drachmann Date: Fri, 27 Jun 2025 13:10:20 +0200 Subject: [PATCH 4/6] removed orig --- .../AzureServiceBusTransport.cs | 13 +- .../AzureServiceBusTransport.cs.orig | 998 ------------------ 2 files changed, 7 insertions(+), 1004 deletions(-) delete mode 100644 Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs.orig diff --git a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs index 53963dd..a84f965 100644 --- a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs +++ b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs @@ -1,3 +1,9 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; using Azure.Core; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; @@ -12,12 +18,7 @@ using Rebus.Subscriptions; using Rebus.Threading; using Rebus.Transport; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; + // ReSharper disable RedundantArgumentDefaultValue // ReSharper disable ArgumentsStyleNamedExpression diff --git a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs.orig b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs.orig deleted file mode 100644 index 2c00e58..0000000 --- a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs.orig +++ /dev/null @@ -1,998 +0,0 @@ -<<<<<<< HEAD -using Azure.Core; -======= -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Azure.Core; ->>>>>>> f2ab590cf54d91340e2495893c9d462291d8e018 -using Azure.Messaging.ServiceBus; -using Azure.Messaging.ServiceBus.Administration; -using Rebus.AzureServiceBus.Messages; -using Rebus.AzureServiceBus.NameFormat; -using Rebus.Bus; -using Rebus.Exceptions; -using Rebus.Extensions; -using Rebus.Internals; -using Rebus.Logging; -using Rebus.Messages; -using Rebus.Subscriptions; -using Rebus.Threading; -using Rebus.Transport; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -// ReSharper disable RedundantArgumentDefaultValue -// ReSharper disable ArgumentsStyleNamedExpression -// ReSharper disable ArgumentsStyleOther -// ReSharper disable ArgumentsStyleLiteral -// ReSharper disable ArgumentsStyleAnonymousFunction -#pragma warning disable 1998 - -namespace Rebus.AzureServiceBus; - -/// -/// Implementation of that uses Azure Service Bus queues to send/receive messages. -/// -public class AzureServiceBusTransport : ITransport, IInitializable, IDisposable, ISubscriptionStorage -{ - /// - /// Outgoing messages are stashed in a concurrent queue under this key - /// - const string OutgoingMessagesKey = "new-azure-service-bus-transport"; - - /// - /// Subscriber "addresses" are prefixed with this bad boy so we can recognize them and publish to a topic client instead - /// - const string MagicSubscriptionPrefix = "TOPIC:"; - - /// - /// External timeout manager address set to this magic address will be routed to the destination address specified by the header - /// - public const string MagicDeferredMessagesAddress = "___deferred___"; - - const string TransportmessageItemKey = "transportMessage"; - - static readonly ServiceBusRetryOptions DefaultRetryStrategy = new() - { - Mode = ServiceBusRetryMode.Exponential, - Delay = TimeSpan.FromMilliseconds(100), - MaxDelay = TimeSpan.FromSeconds(10), - MaxRetries = 10 - }; - - readonly ExceptionIgnorant _subscriptionExceptionIgnorant = new ExceptionIgnorant(maxAttemps: 10).Ignore(ex => ex.IsTransient); - readonly ConcurrentStack _disposables = new(); - readonly ConcurrentDictionary _messageLockRenewers = new(); - readonly ConcurrentDictionary _messageRenewerTokenSources = new(); - readonly ConcurrentDictionary _cachedSubscriberAddresses = new(); - readonly ConcurrentDictionary> _messageSenders = new(); - readonly ConcurrentDictionary>> _topicClients = new(); - readonly CancellationToken _cancellationToken; - readonly IAsyncTask _messageLockRenewalTask; - readonly ServiceBusAdministrationClient _managementClient; - readonly ConnectionStringParser _connectionStringParser; - readonly INameFormatter _nameFormatter; - readonly IMessageConverter _messageConverter; - readonly string _subscriptionName; - readonly ILog _log; - readonly ServiceBusClient _client; - bool _prefetchingEnabled; - int _prefetchCount; - - ServiceBusReceiver _messageReceiver; - - /// - /// Constructs the transport, connecting to the service bus pointed to by the connection string. - /// - public AzureServiceBusTransport(string connectionString, string queueName, IRebusLoggerFactory rebusLoggerFactory, - IAsyncTaskFactory asyncTaskFactory, INameFormatter nameFormatter, IMessageConverter messageConverter, - CancellationToken cancellationToken = default, TokenCredential tokenCredential = null) - { - if (rebusLoggerFactory == null) throw new ArgumentNullException(nameof(rebusLoggerFactory)); - if (asyncTaskFactory == null) throw new ArgumentNullException(nameof(asyncTaskFactory)); - if (connectionString == null) throw new ArgumentNullException(nameof(connectionString)); - - _nameFormatter = nameFormatter; - _messageConverter = messageConverter; - - if (queueName != null) - { - // this never happens - if (queueName.StartsWith(MagicSubscriptionPrefix)) - { - throw new ArgumentException($"Sorry, but the queue name '{queueName}' cannot be used because it conflicts with Rebus' internally used 'magic subscription prefix': '{MagicSubscriptionPrefix}'. "); - } - - Address = _nameFormatter.FormatQueueName(queueName); - _subscriptionName = _nameFormatter.FormatSubscriptionName(queueName); - } - - _cancellationToken = cancellationToken; - _log = rebusLoggerFactory.GetLogger(); - - _connectionStringParser = new ConnectionStringParser(connectionString); - - var clientOptions = new ServiceBusClientOptions - { - TransportType = _connectionStringParser.Transport, - RetryOptions = DefaultRetryStrategy, - }; - - if (tokenCredential != null) - { - var connectionStringProperties = ServiceBusConnectionStringProperties.Parse(connectionString); - _managementClient = new ServiceBusAdministrationClient(connectionStringProperties.FullyQualifiedNamespace, tokenCredential); - _client = new ServiceBusClient(connectionStringProperties.FullyQualifiedNamespace, tokenCredential, clientOptions); - } - else - { - var connectionStringWithoutEntityPath = _connectionStringParser.GetConnectionStringWithoutEntityPath(); - - _client = new ServiceBusClient(connectionStringWithoutEntityPath, clientOptions); - _managementClient = new ServiceBusAdministrationClient(connectionStringWithoutEntityPath); - } - - _messageLockRenewalTask = asyncTaskFactory.Create("Peek Lock Renewal", RenewPeekLocks, prettyInsignificant: true, intervalSeconds: 10); - } - - /// - /// Gets "subscriber addresses" by getting one single magic "queue name", which is then - /// interpreted as a publish operation to a topic when the time comes to send to that "queue" - /// - public async Task> GetSubscriberAddresses(string topic) - { - return _cachedSubscriberAddresses.GetOrAdd(topic, _ => new[] { $"{MagicSubscriptionPrefix}{topic}" }); - } - - /// - /// Registers this endpoint as a subscriber by creating a subscription for the given topic, setting up - /// auto-forwarding from that subscription to this endpoint's input queue - /// - public async Task RegisterSubscriber(string topic, string subscriberAddress) - { - if (DoNotConfigureTopicEnabled) - { - _log.Info("Transport configured to not configure topic - skipping configuration for topic {topicName}", topic); - return; - } - - VerifyIsOwnInputQueueAddress(subscriberAddress); - - topic = _nameFormatter.FormatTopicName(topic); - - _log.Debug("Registering subscription for topic {topicName}", topic); - - await _subscriptionExceptionIgnorant.Execute(async () => - { - var topicProperties = await EnsureTopicExists(topic).ConfigureAwait(false); - var messageSender = GetMessageSender(Address); - - var inputQueuePath = messageSender.GetQueuePath(); - var topicName = topicProperties.Name; - - var subscription = await GetOrCreateSubscription(topicName, _subscriptionName).ConfigureAwait(false); - - bool ForwardToMatches() => subscription.ForwardTo == inputQueuePath; - - // if it looks fine, just skip it - if (ForwardToMatches()) return; - - subscription.ForwardTo = inputQueuePath; - - await _managementClient.UpdateSubscriptionAsync(subscription, _cancellationToken).ConfigureAwait(false); - - _log.Info("Subscription {subscriptionName} for topic {topicName} successfully registered", _subscriptionName, topic); - }, _cancellationToken); - } - - /// - /// Unregisters this endpoint as a subscriber by deleting the subscription for the given topic - /// - public async Task UnregisterSubscriber(string topic, string subscriberAddress) - { - if (DoNotConfigureTopicEnabled) - { - _log.Info("Transport configured to not configure topic - skipping configuration for topic {topicName}", topic); - return; - } - - VerifyIsOwnInputQueueAddress(subscriberAddress); - - topic = _nameFormatter.FormatTopicName(topic); - - _log.Debug("Unregistering subscription for topic {topicName}", topic); - - await _subscriptionExceptionIgnorant.Execute(async () => - { - var topicProperties = await EnsureTopicExists(topic).ConfigureAwait(false); - var topicName = topicProperties.Name; - - try - { - await _managementClient.DeleteSubscriptionAsync(topicName, _subscriptionName, _cancellationToken).ConfigureAwait(false); - - _log.Info("Subscription {subscriptionName} for topic {topicName} successfully unregistered", - _subscriptionName, topic); - } - catch (ServiceBusException) - { - // it's alright man - } - }, _cancellationToken); - } - - async Task GetOrCreateSubscription(string topicPath, string subscriptionName) - { - if (await _managementClient.SubscriptionExistsAsync(topicPath, subscriptionName, _cancellationToken).ConfigureAwait(false)) - { - return await _managementClient.GetSubscriptionAsync(topicPath, subscriptionName, _cancellationToken).ConfigureAwait(false); - } - - try - { - var options = new CreateSubscriptionOptions(topicPath, subscriptionName) - { - ForwardTo = GetMessageSender(Address).GetQueuePath() - }; - - return await _managementClient.CreateSubscriptionAsync(options, _cancellationToken).ConfigureAwait(false); - } - catch (ServiceBusException) - { - // most likely a race between two competing consumers - we should be able to get it now - return await _managementClient.GetSubscriptionAsync(topicPath, subscriptionName, _cancellationToken).ConfigureAwait(false); - } - } - - void VerifyIsOwnInputQueueAddress(string subscriberAddress) - { - if (subscriberAddress == Address) return; - - var message = $"Cannot register subscriptions endpoint with input queue '{subscriberAddress}' in endpoint with input" + - $" queue '{Address}'! The Azure Service Bus transport functions as a centralized subscription" + - " storage, which means that all subscribers are capable of managing their own subscriptions"; - - throw new ArgumentException(message); - } - - async Task EnsureTopicExists(string normalizedTopic) - { - if (await _managementClient.TopicExistsAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false)) - { - return await _managementClient.GetTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false); - } - - try - { - return await _managementClient.CreateTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false); - } - catch (ServiceBusException) - { - // most likely a race between two clients trying to create the same topic - we should be able to get it now - return await _managementClient.GetTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false); - } - catch (Exception exception) - { - throw new ArgumentException($"Could not create topic '{normalizedTopic}'", exception); - } - } - - /// - /// Creates a queue with the given address - /// - public void CreateQueue(string address) - { - address = _nameFormatter.FormatQueueName(address); - - InnerCreateQueue(address); - } - - void InnerCreateQueue(string normalizedAddress) - { - CreateQueueOptions GetInputQueueDescription() - { - var queueOptions = new CreateQueueOptions(normalizedAddress); - - // if it's the input queue, do this: - if (normalizedAddress == Address) - { - // must be set when the queue is first created - queueOptions.EnablePartitioning = PartitioningEnabled; - - if (LockDuration.HasValue) - { - queueOptions.LockDuration = LockDuration.Value; - } - - if (DefaultMessageTimeToLive.HasValue) - { - queueOptions.DefaultMessageTimeToLive = DefaultMessageTimeToLive.Value; - } - - if (DuplicateDetectionHistoryTimeWindow.HasValue) - { - queueOptions.RequiresDuplicateDetection = true; - queueOptions.DuplicateDetectionHistoryTimeWindow = DuplicateDetectionHistoryTimeWindow.Value; - } - - if (AutoDeleteOnIdle.HasValue) - { - queueOptions.AutoDeleteOnIdle = AutoDeleteOnIdle.Value; - } - - queueOptions.MaxDeliveryCount = 100; - } - - return queueOptions; - } - - // one-way client does not create any queues - if (Address == null) - { - return; - } - - if (DoNotCreateQueuesEnabled) - { - _log.Info("Transport configured to not create queue - skipping existence check and potential creation for {queueName}", normalizedAddress); - return; - } - - AsyncHelpers.RunSync(async () => - { - if (await _managementClient.QueueExistsAsync(normalizedAddress, _cancellationToken).ConfigureAwait(false)) return; - - try - { - _log.Info("Creating ASB queue {queueName}", normalizedAddress); - - var queueDescription = GetInputQueueDescription(); - - await _managementClient.CreateQueueAsync(queueDescription, _cancellationToken).ConfigureAwait(false); - } - catch (ServiceBusException) - { - // it's alright man - } - catch (Exception exception) - { - throw new ArgumentException($"Could not create Azure Service Bus queue '{normalizedAddress}'", exception); - } - }); - } - - void CheckInputQueueConfiguration(string address) - { - if (DoNotCheckQueueConfigurationEnabled) - { - _log.Info("Transport configured to not check queue configuration - skipping existence check for {queueName}", address); - return; - } - - AsyncHelpers.RunSync(async () => - { - var queueDescription = await GetQueueDescription(address).ConfigureAwait(false); - - if (queueDescription.EnablePartitioning != PartitioningEnabled) - { - _log.Warn("The queue {queueName} has EnablePartitioning={enablePartitioning}, but the transport has PartitioningEnabled={partitioningEnabled}. As this setting cannot be changed after the queue is created, please either make sure the Rebus transport settings are consistent with the queue settings, or delete the queue and let Rebus create it again with the new settings.", - address, queueDescription.EnablePartitioning, PartitioningEnabled); - } - - if (DuplicateDetectionHistoryTimeWindow.HasValue) - { - var duplicateDetectionHistoryTimeWindow = DuplicateDetectionHistoryTimeWindow.Value; - - if (!queueDescription.RequiresDuplicateDetection || - queueDescription.DuplicateDetectionHistoryTimeWindow != duplicateDetectionHistoryTimeWindow) - { - _log.Warn("The queue {queueName} has RequiresDuplicateDetection={requiresDuplicateDetection}, but the transport has DuplicateDetectionHistoryTimeWindow={duplicateDetectionHistoryTimeWindow}. As this setting cannot be changed after the queue is created, please either make sure the Rebus transport settings are consistent with the queue settings, or delete the queue and let Rebus create it again with the new settings.", - address, queueDescription.RequiresDuplicateDetection, duplicateDetectionHistoryTimeWindow); - } - } - else - { - if (queueDescription.RequiresDuplicateDetection) - { - _log.Warn("The queue {queueName} has RequiresDuplicateDetection={requiresDuplicateDetection}, but the transport has DuplicateDetectionHistoryTimeWindow=null. As this setting cannot be changed after the queue is created, please either make sure the Rebus transport settings are consistent with the queue settings, or delete the queue and let Rebus create it again with the new settings.", - address, queueDescription.RequiresDuplicateDetection); - } - } - - var updates = new List(); - - if (DefaultMessageTimeToLive.HasValue) - { - var defaultMessageTimeToLive = DefaultMessageTimeToLive.Value; - if (queueDescription.DefaultMessageTimeToLive != defaultMessageTimeToLive) - { - queueDescription.DefaultMessageTimeToLive = defaultMessageTimeToLive; - updates.Add($"DefaultMessageTimeToLive = {defaultMessageTimeToLive}"); - } - } - - if (LockDuration.HasValue) - { - var lockDuration = LockDuration.Value; - if (queueDescription.LockDuration != lockDuration) - { - queueDescription.LockDuration = lockDuration; - updates.Add($"LockDuration = {lockDuration}"); - } - } - - if (AutoDeleteOnIdle.HasValue) - { - var autoDeleteOnIdle = AutoDeleteOnIdle.Value; - if (queueDescription.AutoDeleteOnIdle != autoDeleteOnIdle) - { - queueDescription.AutoDeleteOnIdle = autoDeleteOnIdle; - updates.Add($"AutoDeleteOnIdle = {autoDeleteOnIdle}"); - } - } - - if (!updates.Any()) return; - - if (DoNotCreateQueuesEnabled) - { - _log.Warn("Detected changes in the settings for the queue {queueName}: {updates} - but the transport is configured to NOT create queues, so no settings will be changed", address, updates); - return; - } - - _log.Info("Updating ASB queue {queueName}: {updates}", address, updates); - await _managementClient.UpdateQueueAsync(queueDescription, _cancellationToken); - }); - } - - async Task GetQueueDescription(string address) - { - try - { - return await _managementClient.GetQueueAsync(address, _cancellationToken).ConfigureAwait(false); - } - catch (Exception exception) - { - throw new RebusApplicationException(exception, $"Could not get queue description for queue {address}"); - } - } - - /// - /// - /// Sends the given message to the queue with the given - /// - public async Task Send(string destinationAddress, TransportMessage message, ITransactionContext context) - { - var actualDestinationAddress = GetActualDestinationAddress(destinationAddress, message); - var outgoingMessages = GetOutgoingMessages(context); - - outgoingMessages.Enqueue(new OutgoingMessage(actualDestinationAddress, message)); - } - - string GetActualDestinationAddress(string destinationAddress, TransportMessage message) - { - if (message.Headers.ContainsKey(Headers.DeferredUntil)) - { - if (destinationAddress == MagicDeferredMessagesAddress) - { - try - { - return message.Headers.GetValue(Headers.DeferredRecipient); - } - catch (Exception exception) - { - throw new ArgumentException($"The destination address was set to '{MagicDeferredMessagesAddress}', but no '{Headers.DeferredRecipient}' header could be found on the message", exception); - } - } - - if (message.Headers.TryGetValue(Headers.DeferredRecipient, out var deferredRecipient)) - { - return _nameFormatter.FormatQueueName(deferredRecipient); - } - } - - if (!destinationAddress.StartsWith(MagicSubscriptionPrefix)) - { - return _nameFormatter.FormatQueueName(destinationAddress); - } - - return destinationAddress; - } - - ConcurrentQueue GetOutgoingMessages(ITransactionContext context) - { - ConcurrentQueue CreateNewOutgoingMessagesQueue() - { - var messagesToSend = new ConcurrentQueue(); - - async Task SendOutgoingMessages(ITransactionContext ctx) - { - var messagesByDestinationQueue = messagesToSend.GroupBy(m => m.DestinationAddress); - - async Task SendOutgoingMessagesToDestination(IGrouping group) - { - var destinationQueue = group.Key; - var messages = group; - - if (destinationQueue.StartsWith(MagicSubscriptionPrefix)) - { - var topicName = _nameFormatter.FormatTopicName(destinationQueue.Substring(MagicSubscriptionPrefix.Length)); - var topicClient = await GetTopicClient(topicName); - var serviceBusMessageBatches = await GetBatches(messages.Select(m => _messageConverter.ToServiceBus(m.TransportMessage)), topicClient); - - using (serviceBusMessageBatches.AsDisposable(b => b.DisposeCollection())) - { - foreach (var batch in serviceBusMessageBatches) - { - try - { - await topicClient - .SendMessagesAsync(batch, CancellationToken.None) //< don't cancel when sending outgoing messages - .ConfigureAwait(false); - } - catch (Exception exception) - { - throw new RebusApplicationException(exception, $"Could not publish to topic '{topicName}'"); - } - } - } - } - else - { - var messageSender = GetMessageSender(destinationQueue); - var serviceBusMessageBatches = await GetBatches(messages.Select(m => _messageConverter.ToServiceBus(m.TransportMessage)), messageSender); - - using (serviceBusMessageBatches.AsDisposable(b => b.DisposeCollection())) - { - foreach (var batch in serviceBusMessageBatches) - { - try - { - await messageSender - .SendMessagesAsync(batch, CancellationToken.None) //< don't cancel when sending outgoing messages - .ConfigureAwait(false); - } - catch (Exception exception) - { - throw new RebusApplicationException(exception, $"Could not send to queue '{destinationQueue}'"); - } - } - } - } - } - - await Task.WhenAll(messagesByDestinationQueue.Select(SendOutgoingMessagesToDestination)).ConfigureAwait(false); - } - - context.OnCommit(SendOutgoingMessages); - - return messagesToSend; - } - - return context.GetOrAdd(OutgoingMessagesKey, CreateNewOutgoingMessagesQueue); - } - - async Task> GetBatches(IEnumerable messages, ServiceBusSender sender) - { - async ValueTask CreateMessageBatchAsync() - { - try - { - return await sender.CreateMessageBatchAsync(CancellationToken.None); //< don't cancel when sending outgoing messages - } - catch (ServiceBusException exception) when (exception.Reason == ServiceBusFailureReason.MessagingEntityNotFound) - { - throw new RebusApplicationException(exception, $"Message batch creation failed, because the messaging entity with path '{sender.EntityPath}' does not exist"); - } - } - - var batches = new List(); - var currentBatch = await CreateMessageBatchAsync(); - - foreach (var message in messages) - { - if (currentBatch.TryAddMessage(message)) continue; - - batches.Add(currentBatch); - currentBatch = await CreateMessageBatchAsync(); - - if (currentBatch.TryAddMessage(message)) continue; - - throw new ArgumentException($"The message {message} could not be added to a brand new message batch (batch max size is {currentBatch.MaxSizeInBytes} bytes)"); - } - - if (currentBatch.Count > 0) - { - batches.Add(currentBatch); - } - - return batches; - } - - /// - /// Receives the next message from the input queue. Returns null if no message was available - /// - public async Task Receive(ITransactionContext context, CancellationToken cancellationToken) - { - var receivedMessage = await ReceiveInternal().ConfigureAwait(false); - - if (receivedMessage == null) return null; - - var message = receivedMessage.Message; - var messageReceiver = receivedMessage.MessageReceiver; - - var renewFailedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken); - _messageRenewerTokenSources.AddOrUpdate(message.MessageId, renewFailedTokenSource, (_, _) => renewFailedTokenSource); - - var items = context.Items; - // add the message and its receiver to the context - items["asb-message"] = message; - items["asb-message-receiver"] = messageReceiver; - //add token that cancels when renew fails or when the pipeline token cancels - items["asb-message-cancel-token"] = renewFailedTokenSource.Token; - - if (string.IsNullOrWhiteSpace(message.LockToken)) - { - throw new RebusApplicationException($"OMG that's weird - message with ID {message.MessageId} does not have a lock token!"); - } - - var messageId = message.MessageId; - - if (AutomaticallyRenewPeekLock && !_prefetchingEnabled) - { - _messageLockRenewers.TryAdd(message.MessageId, new MessageLockRenewer(message, messageReceiver)); - } - - context.OnAck(async ctx => - { - _messageRenewerTokenSources.TryRemove(messageId, out var _); - _messageLockRenewers.TryRemove(messageId, out _); - - // only ACK the message if it's still in the context - this way, carefully crafted - // user code can take over responsibility for the message by removing it from the transaction context - if (ctx.Items.TryGetValue("asb-message", out var messageObject) && messageObject is ServiceBusReceivedMessage asbMessage) - { - var lockToken = asbMessage.LockToken; - - try - { - await messageReceiver - .CompleteMessageAsync( - message: asbMessage, - cancellationToken: CancellationToken.None //< pass none here to avoid canceling the call immediately when we're shutting down - ) - .ConfigureAwait(false); - } - catch (Exception exception) - { - throw new RebusApplicationException(exception, - $"Could not complete message with ID {messageId} and lock token {lockToken}"); - } - } - }); - - context.OnNack(async ctx => - { - _messageRenewerTokenSources.TryRemove(messageId, out var _); - _messageLockRenewers.TryRemove(messageId, out _); - - // only NACK the message if it's still in the context - this way, carefully crafted - // user code can take over responsibility for the message by removing it from the transaction context - if (!ctx.Items.TryGetValue("asb-message", out var messageObject) || messageObject is not ServiceBusReceivedMessage asbMessage) return; - - var lockToken = asbMessage.LockToken; - - try - { - var transportMessage = (TransportMessage)ctx.Items[TransportmessageItemKey]; - var propertiesToModify = transportMessage.Headers.ToDictionary(k => k.Key, v => (object)v.Value); - - await messageReceiver.AbandonMessageAsync( - message: message, - propertiesToModify: propertiesToModify, - cancellationToken: CancellationToken - .None //< pass none here to avoid canceling the call immediately when we're shutting down - ) - .ConfigureAwait(false); - } - catch (Exception exception) - { - throw new RebusApplicationException(exception, - $"Could not abandon message with ID {messageId} and lock token {lockToken}"); - } - }); - - context.OnDisposed(ctx => - { - _messageRenewerTokenSources.TryRemove(messageId, out var _); - _messageLockRenewers.TryRemove(messageId, out _); - }); - - var transportMessage = _messageConverter.ToTransport(message); - context.Items[TransportmessageItemKey] = transportMessage; - - if (NativeMessageDeliveryCountEnabled) - { - transportMessage.Headers[Headers.DeliveryCount] = message.DeliveryCount.ToString(); - } - - return transportMessage; - } - - async Task ReceiveInternal() - { - try - { - var message = await _messageReceiver.ReceiveMessageAsync(ReceiveOperationTimeout, _cancellationToken); - - return message == null ? null : new ReceivedMessage(message, _messageReceiver); - } - catch (ServiceBusException exception) - { - throw new RebusApplicationException(exception, $"Could not receive next message from Azure Service Bus queue '{Address}'"); - } - } - - /// - /// Gets the input queue name for this transport - /// - public string Address { get; } - - /// - /// Initializes the transport by ensuring that the input queue has been created - /// - /// - public void Initialize() - { - _disposables.Push(_messageLockRenewalTask); - - if (Address == null) - { - _log.Info("Initializing one-way Azure Service Bus transport"); - return; - } - - _log.Info("Initializing Azure Service Bus transport with queue {queueName}", Address); - - InnerCreateQueue(Address); - - CheckInputQueueConfiguration(Address); - - var receiverOptions = new ServiceBusReceiverOptions - { - PrefetchCount = _prefetchCount, - ReceiveMode = ServiceBusReceiveMode.PeekLock - }; - - _messageReceiver = _client.CreateReceiver(Address, receiverOptions); - - _disposables.Push(_messageReceiver.AsDisposable(m => AsyncHelpers.RunSync(async () => - { - try - { - await m.CloseAsync(CancellationToken.None).ConfigureAwait(false); - } - catch (Exception) - { - // don't care - } - }))); - - if (AutomaticallyRenewPeekLock) - { - _messageLockRenewalTask.Start(); - } - } - - /// - /// Always returns true because Azure Service Bus topics and subscriptions are global - /// - public bool IsCentralized => true; - - /// - /// Enables automatic peek lock renewal - only recommended if you truly need to handle messages for a very long time - /// - public bool AutomaticallyRenewPeekLock { get; set; } - - /// - /// Gets/sets whether partitioning should be enabled on new queues. Only takes effect for queues created - /// after the property has been enabled - /// - public bool PartitioningEnabled { get; set; } - - /// - /// Gets/sets whether to skip creating queues - /// - public bool DoNotCreateQueuesEnabled { get; set; } - - /// - /// Gets/sets whether to skip checking queues configuration - /// - public bool DoNotCheckQueueConfigurationEnabled { get; set; } - - /// - /// Gets/sets whether to skip checking topics configuration - /// - public bool DoNotConfigureTopicEnabled { get; set; } - - /// - /// Gets/sets the default message TTL. Must be set before calling , because that is the time when the queue is (re)configured - /// - public TimeSpan? DefaultMessageTimeToLive { get; set; } - - /// - /// Gets/sets message peek lock duration - /// - public TimeSpan? LockDuration { get; set; } - - /// - /// Gets/sets auto-delete-on-idle duration - /// - public TimeSpan? AutoDeleteOnIdle { get; set; } - - /// - /// Gets/sets the duplicate detection window - /// - public TimeSpan? DuplicateDetectionHistoryTimeWindow { get; set; } - - /// - /// Gets/sets the receive timeout. - /// - public TimeSpan ReceiveOperationTimeout { get; set; } = TimeSpan.FromMinutes(1); - - /// - /// Gets/sets whether native message delivery count is used - /// - public bool NativeMessageDeliveryCountEnabled { get; set; } - - /// - /// Purges the input queue by receiving all messages as quickly as possible - /// - public void PurgeInputQueue() - { - var queueName = Address; - - if (string.IsNullOrWhiteSpace(queueName)) - { - throw new InvalidOperationException("Cannot 'purge input queue' because there's no input queue name – it's most likely because this is a one-way client, and hence there is no input queue"); - } - - PurgeQueue(queueName); - } - - /// - /// Configures the transport to prefetch the specified number of messages into an in-mem queue for processing, disabling automatic peek lock renewal - /// - public void PrefetchMessages(int prefetchCount) - { - if (prefetchCount < 0) - { - throw new ArgumentOutOfRangeException(nameof(prefetchCount), prefetchCount, "Must prefetch zero or more messages"); - } - - _prefetchingEnabled = prefetchCount > 0; - _prefetchCount = prefetchCount; - } - - /// - /// Disposes all resources associated with this particular transport instance - /// - public void Dispose() - { - var disposables = new List(); - - while (_disposables.TryPop(out var disposable)) - { - disposables.Add(disposable); - } - - Parallel.ForEach(disposables, d => d.Dispose()); - } - - void PurgeQueue(string queueName) - { - try - { - AsyncHelpers.RunSync(() => ManagementExtensions.PurgeQueue(_connectionStringParser.GetConnectionString(), queueName, _cancellationToken)); - } - catch (OperationCanceledException) when (_cancellationToken.IsCancellationRequested) - { - // we're on our way out - let cancellation bubble out - throw; - } - catch (Exception exception) - { - throw new ArgumentException($"Could not purge queue '{queueName}'", exception); - } - } - - ServiceBusSender GetMessageSender(string queue) => _messageSenders.GetOrAdd(queue, _ => new(() => - { - var messageSender = _client.CreateSender(queue); - - _disposables.Push(messageSender.AsDisposable(t => AsyncHelpers.RunSync(async () => - { - try - { - await t.CloseAsync(CancellationToken.None).ConfigureAwait(false); - } - catch (Exception) - { - // don't care - } - }))); - - return messageSender; - - })).Value; - - async Task GetTopicClient(string topic) => await _topicClients.GetOrAdd(topic, _ => new(async () => - { - if(!DoNotConfigureTopicEnabled) - { - await EnsureTopicExists(topic); - } - - var topicClient = _client.CreateSender(topic); - - _disposables.Push(topicClient.AsDisposable(t => AsyncHelpers.RunSync(async () => - { - try - { - await t.CloseAsync(CancellationToken.None).ConfigureAwait(false); - } - catch (Exception) - { - // don't care - } - }))); - - return topicClient; - })).Value; - - async Task RenewPeekLocks() - { - var mustBeRenewed = _messageLockRenewers - .Where(r => r.Value.IsDue) - .Select(kvp => kvp.Value) - .ToList(); - - if (!mustBeRenewed.Any()) return; - - _log.Debug("Found {count} peek locks to be renewed", mustBeRenewed.Count); - - await Task.WhenAll(mustBeRenewed.Select(async r => - { - try - { - await r.Renew().ConfigureAwait(false); - - _log.Debug("Successfully renewed peek lock for message with ID {messageId}", r.MessageId); - } - catch (Exception exception) - { - // if an exception occurs, check if the peek lock renewer is still in the dictionary of active - // peek lock renewers - if it isn't, then the message must have been completed/abandoned in the meantime, - // and then it's not an error that the peek lock could not be renewed - if (!_messageLockRenewers.ContainsKey(r.MessageId)) return; - - _log.Warn(exception, "Error when renewing peek lock for message with ID {messageId}", r.MessageId); - - if (_messageRenewerTokenSources.TryGetValue(r.MessageId, out var renewFailedTokenSource)) - { - renewFailedTokenSource.Cancel(); - } - - - // peek lock renewal will be automatically retried, because it's still due for renewal - } - })); - } -} From e34ecddb319f3b98ed451a856e74a01208572a17 Mon Sep 17 00:00:00 2001 From: Henrik Drachmann Date: Fri, 27 Jun 2025 13:11:45 +0200 Subject: [PATCH 5/6] formatting --- .../AzureServiceBus/AzureServiceBusTransport.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs index a84f965..a5de12c 100644 --- a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs +++ b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs @@ -19,7 +19,6 @@ using Rebus.Threading; using Rebus.Transport; - // ReSharper disable RedundantArgumentDefaultValue // ReSharper disable ArgumentsStyleNamedExpression // ReSharper disable ArgumentsStyleOther @@ -154,7 +153,6 @@ public async Task RegisterSubscriber(string topic, string subscriberAddress) _log.Info("Transport configured to not configure topic - skipping configuration for topic {topicName}", topic); return; } - VerifyIsOwnInputQueueAddress(subscriberAddress); topic = _nameFormatter.FormatTopicName(topic); From 4cf402fad419452d6b13da762ab386b91374561b Mon Sep 17 00:00:00 2001 From: Henrik Drachmann Date: Fri, 27 Jun 2025 13:19:46 +0200 Subject: [PATCH 6/6] updated to dispose the tokensource --- .../AzureServiceBus/AzureServiceBusTransport.cs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs index a5de12c..1f4cfec 100644 --- a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs +++ b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs @@ -641,7 +641,8 @@ public async Task Receive(ITransactionContext context, Cancell context.OnAck(async ctx => { - _messageRenewerTokenSources.TryRemove(messageId, out var _); + _messageRenewerTokenSources.TryRemove(messageId, out var tokenSource); + tokenSource?.Dispose(); _messageLockRenewers.TryRemove(messageId, out _); // only ACK the message if it's still in the context - this way, carefully crafted @@ -669,7 +670,8 @@ await messageReceiver context.OnNack(async ctx => { - _messageRenewerTokenSources.TryRemove(messageId, out var _); + _messageRenewerTokenSources.TryRemove(messageId, out var tokenSource); + tokenSource?.Dispose(); _messageLockRenewers.TryRemove(messageId, out _); // only NACK the message if it's still in the context - this way, carefully crafted @@ -700,7 +702,8 @@ await messageReceiver.AbandonMessageAsync( context.OnDisposed(ctx => { - _messageRenewerTokenSources.TryRemove(messageId, out var _); + _messageRenewerTokenSources.TryRemove(messageId, out var tokenSource); + tokenSource?.Dispose(); _messageLockRenewers.TryRemove(messageId, out _); }); @@ -924,7 +927,7 @@ void PurgeQueue(string queueName) async Task GetTopicClient(string topic) => await _topicClients.GetOrAdd(topic, _ => new(async () => { - if(!DoNotConfigureTopicEnabled) + if (!DoNotConfigureTopicEnabled) { await EnsureTopicExists(topic); } @@ -979,8 +982,7 @@ await Task.WhenAll(mustBeRenewed.Select(async r => renewFailedTokenSource.Cancel(); } - - // peek lock renewal will be automatically retried, because it's still due for renewal + // peek lock renewal will be automatically retried if no-one is looking at the cancellationToken , because it's still due for renewal } })); }