Skip to content

Adding CancellationToken to context.Items to subscribe #105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class AzureServiceBusTransport : ITransport, IInitializable, IDisposable,
readonly ExceptionIgnorant _subscriptionExceptionIgnorant = new ExceptionIgnorant(maxAttemps: 10).Ignore<ServiceBusException>(ex => ex.IsTransient);
readonly ConcurrentStack<IDisposable> _disposables = new();
readonly ConcurrentDictionary<string, MessageLockRenewer> _messageLockRenewers = new();
readonly ConcurrentDictionary<string, CancellationTokenSource> _messageRenewerTokenSources = new();
readonly ConcurrentDictionary<string, string[]> _cachedSubscriberAddresses = new();
readonly ConcurrentDictionary<string, Lazy<ServiceBusSender>> _messageSenders = new();
readonly ConcurrentDictionary<string, Lazy<Task<ServiceBusSender>>> _topicClients = new();
Expand All @@ -73,7 +74,6 @@ public class AzureServiceBusTransport : ITransport, IInitializable, IDisposable,
readonly string _subscriptionName;
readonly ILog _log;
readonly ServiceBusClient _client;

bool _prefetchingEnabled;
int _prefetchCount;

Expand Down Expand Up @@ -153,7 +153,6 @@ public async Task RegisterSubscriber(string topic, string subscriberAddress)
_log.Info("Transport configured to not configure topic - skipping configuration for topic {topicName}", topic);
return;
}

VerifyIsOwnInputQueueAddress(subscriberAddress);

topic = _nameFormatter.FormatTopicName(topic);
Expand Down Expand Up @@ -193,7 +192,7 @@ public async Task UnregisterSubscriber(string topic, string subscriberAddress)
_log.Info("Transport configured to not configure topic - skipping configuration for topic {topicName}", topic);
return;
}

VerifyIsOwnInputQueueAddress(subscriberAddress);

topic = _nameFormatter.FormatTopicName(topic);
Expand Down Expand Up @@ -618,11 +617,15 @@ public async Task<TransportMessage> Receive(ITransactionContext context, Cancell
var message = receivedMessage.Message;
var messageReceiver = receivedMessage.MessageReceiver;

var items = context.Items;
var renewFailedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken);
_messageRenewerTokenSources.AddOrUpdate(message.MessageId, renewFailedTokenSource, (_, _) => renewFailedTokenSource);

var items = context.Items;
// add the message and its receiver to the context
items["asb-message"] = message;
items["asb-message-receiver"] = messageReceiver;
//add token that cancels when renew fails or when the pipeline token cancels
items["asb-message-cancel-token"] = renewFailedTokenSource.Token;

if (string.IsNullOrWhiteSpace(message.LockToken))
{
Expand All @@ -638,6 +641,8 @@ public async Task<TransportMessage> Receive(ITransactionContext context, Cancell

context.OnAck(async ctx =>
{
_messageRenewerTokenSources.TryRemove(messageId, out var tokenSource);
tokenSource?.Dispose();
_messageLockRenewers.TryRemove(messageId, out _);

// only ACK the message if it's still in the context - this way, carefully crafted
Expand Down Expand Up @@ -665,6 +670,8 @@ await messageReceiver

context.OnNack(async ctx =>
{
_messageRenewerTokenSources.TryRemove(messageId, out var tokenSource);
tokenSource?.Dispose();
_messageLockRenewers.TryRemove(messageId, out _);

// only NACK the message if it's still in the context - this way, carefully crafted
Expand Down Expand Up @@ -693,7 +700,12 @@ await messageReceiver.AbandonMessageAsync(
}
});

context.OnDisposed(ctx => _messageLockRenewers.TryRemove(messageId, out _));
context.OnDisposed(ctx =>
{
_messageRenewerTokenSources.TryRemove(messageId, out var tokenSource);
tokenSource?.Dispose();
_messageLockRenewers.TryRemove(messageId, out _);
});

var transportMessage = _messageConverter.ToTransport(message);
context.Items[TransportmessageItemKey] = transportMessage;
Expand All @@ -702,7 +714,7 @@ await messageReceiver.AbandonMessageAsync(
{
transportMessage.Headers[Headers.DeliveryCount] = message.DeliveryCount.ToString();
}

return transportMessage;
}

Expand Down Expand Up @@ -796,7 +808,7 @@ public void Initialize()
/// Gets/sets whether to skip checking queues configuration
/// </summary>
public bool DoNotCheckQueueConfigurationEnabled { get; set; }

/// <summary>
/// Gets/sets whether to skip checking topics configuration
/// </summary>
Expand Down Expand Up @@ -915,7 +927,7 @@ void PurgeQueue(string queueName)

async Task<ServiceBusSender> GetTopicClient(string topic) => await _topicClients.GetOrAdd(topic, _ => new(async () =>
{
if(!DoNotConfigureTopicEnabled)
if (!DoNotConfigureTopicEnabled)
{
await EnsureTopicExists(topic);
}
Expand Down Expand Up @@ -965,7 +977,12 @@ await Task.WhenAll(mustBeRenewed.Select(async r =>

_log.Warn(exception, "Error when renewing peek lock for message with ID {messageId}", r.MessageId);

// peek lock renewal will be automatically retried, because it's still due for renewal
if (_messageRenewerTokenSources.TryGetValue(r.MessageId, out var renewFailedTokenSource))
{
renewFailedTokenSource.Cancel();
}

// peek lock renewal will be automatically retried if no-one is looking at the cancellationToken , because it's still due for renewal
}
}));
}
Expand Down