Skip to content

Commit 7b581c2

Browse files
Use DisposedCancellationToken for message bus topic/subscription setup
EnsureTopicCreatedAsync and EnsureTopicSubscriptionAsync are one-time infrastructure setup methods. Previously they received the caller's cancellation token (via the linked token in PublishAsync, or directly in SubscribeAsync). This meant that if an individual caller cancelled their publish or subscribe operation, topic creation or subscription setup could be aborted mid-way, leaving infrastructure in a half-created state for subsequent callers — the same class of bug fixed in queue creation (99e3353). Now both setup methods receive DisposedCancellationToken, ensuring they only abort when the message bus itself is being disposed. The actual publish and subscribe operations continue to use the linked/caller token so they remain individually cancellable. Affected concrete implementations (Azure Service Bus, RabbitMQ, SQS, Kafka, Redis) all receive the token through the overridden method parameter, so their internal lock acquisitions and network calls automatically benefit from this fix. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 65e3116 commit 7b581c2

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

src/Foundatio/Messaging/MessageBusBase.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ public async Task PublishAsync(Type messageType, object message, MessageOptions
8787
using var linkedCancellationTokenSource = GetLinkedDisposableCancellationTokenSource(cancellationToken);
8888
try
8989
{
90-
await EnsureTopicCreatedAsync(linkedCancellationTokenSource.Token).AnyContext();
90+
// Use DisposedCancellationToken for setup: topic creation should only abort on disposal,
91+
// not due to an individual caller's cancellation token.
92+
await EnsureTopicCreatedAsync(DisposedCancellationToken).AnyContext();
9193
await PublishImplAsync(GetMappedMessageType(messageType), message, options, linkedCancellationTokenSource.Token).AnyContext();
9294
}
9395
catch (Exception ex) when (ex is not OperationCanceledException and not MessageBusException)
@@ -194,7 +196,9 @@ public async Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler, Ca
194196
_logger.LogTrace("Adding subscriber for {MessageType}", typeof(T).FullName);
195197

196198
await SubscribeImplAsync(handler, cancellationToken).AnyContext();
197-
await EnsureTopicSubscriptionAsync(cancellationToken).AnyContext();
199+
// Use DisposedCancellationToken for setup: subscription infrastructure should only abort on disposal,
200+
// not due to the caller's cancellation token.
201+
await EnsureTopicSubscriptionAsync(DisposedCancellationToken).AnyContext();
198202
}
199203

200204
protected List<Subscriber> GetMessageSubscribers(IMessage message)

0 commit comments

Comments
 (0)