diff --git a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs index 891d516..1f4cfec 100644 --- a/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs +++ b/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs @@ -61,6 +61,7 @@ public class AzureServiceBusTransport : ITransport, IInitializable, IDisposable, readonly ExceptionIgnorant _subscriptionExceptionIgnorant = new ExceptionIgnorant(maxAttemps: 10).Ignore(ex => ex.IsTransient); readonly ConcurrentStack _disposables = new(); readonly ConcurrentDictionary _messageLockRenewers = new(); + readonly ConcurrentDictionary _messageRenewerTokenSources = new(); readonly ConcurrentDictionary _cachedSubscriberAddresses = new(); readonly ConcurrentDictionary> _messageSenders = new(); readonly ConcurrentDictionary>> _topicClients = new(); @@ -73,7 +74,6 @@ public class AzureServiceBusTransport : ITransport, IInitializable, IDisposable, readonly string _subscriptionName; readonly ILog _log; readonly ServiceBusClient _client; - bool _prefetchingEnabled; int _prefetchCount; @@ -153,7 +153,6 @@ public async Task RegisterSubscriber(string topic, string subscriberAddress) _log.Info("Transport configured to not configure topic - skipping configuration for topic {topicName}", topic); return; } - VerifyIsOwnInputQueueAddress(subscriberAddress); topic = _nameFormatter.FormatTopicName(topic); @@ -193,7 +192,7 @@ public async Task UnregisterSubscriber(string topic, string subscriberAddress) _log.Info("Transport configured to not configure topic - skipping configuration for topic {topicName}", topic); return; } - + VerifyIsOwnInputQueueAddress(subscriberAddress); topic = _nameFormatter.FormatTopicName(topic); @@ -618,11 +617,15 @@ public async Task Receive(ITransactionContext context, Cancell var message = receivedMessage.Message; var messageReceiver = receivedMessage.MessageReceiver; - var items = context.Items; + var renewFailedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken); + _messageRenewerTokenSources.AddOrUpdate(message.MessageId, renewFailedTokenSource, (_, _) => renewFailedTokenSource); + var items = context.Items; // add the message and its receiver to the context items["asb-message"] = message; items["asb-message-receiver"] = messageReceiver; + //add token that cancels when renew fails or when the pipeline token cancels + items["asb-message-cancel-token"] = renewFailedTokenSource.Token; if (string.IsNullOrWhiteSpace(message.LockToken)) { @@ -638,6 +641,8 @@ public async Task Receive(ITransactionContext context, Cancell context.OnAck(async ctx => { + _messageRenewerTokenSources.TryRemove(messageId, out var tokenSource); + tokenSource?.Dispose(); _messageLockRenewers.TryRemove(messageId, out _); // only ACK the message if it's still in the context - this way, carefully crafted @@ -665,6 +670,8 @@ await messageReceiver context.OnNack(async ctx => { + _messageRenewerTokenSources.TryRemove(messageId, out var tokenSource); + tokenSource?.Dispose(); _messageLockRenewers.TryRemove(messageId, out _); // only NACK the message if it's still in the context - this way, carefully crafted @@ -693,7 +700,12 @@ await messageReceiver.AbandonMessageAsync( } }); - context.OnDisposed(ctx => _messageLockRenewers.TryRemove(messageId, out _)); + context.OnDisposed(ctx => + { + _messageRenewerTokenSources.TryRemove(messageId, out var tokenSource); + tokenSource?.Dispose(); + _messageLockRenewers.TryRemove(messageId, out _); + }); var transportMessage = _messageConverter.ToTransport(message); context.Items[TransportmessageItemKey] = transportMessage; @@ -702,7 +714,7 @@ await messageReceiver.AbandonMessageAsync( { transportMessage.Headers[Headers.DeliveryCount] = message.DeliveryCount.ToString(); } - + return transportMessage; } @@ -796,7 +808,7 @@ public void Initialize() /// Gets/sets whether to skip checking queues configuration /// public bool DoNotCheckQueueConfigurationEnabled { get; set; } - + /// /// Gets/sets whether to skip checking topics configuration /// @@ -915,7 +927,7 @@ void PurgeQueue(string queueName) async Task GetTopicClient(string topic) => await _topicClients.GetOrAdd(topic, _ => new(async () => { - if(!DoNotConfigureTopicEnabled) + if (!DoNotConfigureTopicEnabled) { await EnsureTopicExists(topic); } @@ -965,7 +977,12 @@ await Task.WhenAll(mustBeRenewed.Select(async r => _log.Warn(exception, "Error when renewing peek lock for message with ID {messageId}", r.MessageId); - // peek lock renewal will be automatically retried, because it's still due for renewal + if (_messageRenewerTokenSources.TryGetValue(r.MessageId, out var renewFailedTokenSource)) + { + renewFailedTokenSource.Cancel(); + } + + // peek lock renewal will be automatically retried if no-one is looking at the cancellationToken , because it's still due for renewal } })); }