Skip to content

Commit 7a4a4ea

Browse files
committed
Add per-message-type topic routing and optimize topic creation
- Add TopicResolver delegate to SQSMessageBusOptions for routing messages to different SNS topics based on message type - Implement memoization of topic ARNs using ConcurrentDictionary to avoid redundant AWS API calls - Optimize CreateTopicImplAsync to use CreateTopicAsync directly instead of FindTopicAsync + CreateTopicAsync (CreateTopicAsync is idempotent) - Remove unnecessary constructor from SQSMessageBusOptions - Modernize tests: convert message types to records, use range syntax, add TestCancellationToken to all async calls, fix empty catch block - Add comprehensive tests for topic routing, memoization, and concurrency
1 parent e182d7b commit 7a4a4ea

File tree

3 files changed

+411
-57
lines changed

3 files changed

+411
-57
lines changed

src/Foundatio.AWS/Messaging/SQSMessageBus.cs

Lines changed: 84 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.Collections.Generic;
34
using System.Text;
45
using System.Text.Json.Nodes;
@@ -64,7 +65,7 @@ public class SQSMessageBus : MessageBusBase<SQSMessageBusOptions>, IAsyncDisposa
6465
private readonly AsyncLock _lock = new();
6566
private readonly Lazy<AmazonSimpleNotificationServiceClient> _snsClient;
6667
private readonly Lazy<AmazonSQSClient> _sqsClient;
67-
private string _topicArn;
68+
private readonly ConcurrentDictionary<string, string> _topicArns = new();
6869
private string _queueUrl;
6970
private string _queueArn;
7071
private string _subscriptionArn;
@@ -147,61 +148,97 @@ public SQSMessageBus(Builder<SQSMessageBusOptionsBuilder, SQSMessageBusOptions>
147148
/// </remarks>
148149
protected override async Task EnsureTopicCreatedAsync(CancellationToken cancellationToken)
149150
{
150-
if (!String.IsNullOrEmpty(_topicArn))
151-
return;
151+
await GetOrCreateTopicArnAsync(_options.Topic, cancellationToken).AnyContext();
152+
}
153+
154+
/// <summary>
155+
/// Gets the topic name for a given message type using the configured TopicResolver.
156+
/// </summary>
157+
/// <param name="messageType">The CLR type of the message.</param>
158+
/// <returns>The resolved topic name, or the default topic if no resolver is configured or it returns null.</returns>
159+
private string GetTopicName(Type messageType)
160+
{
161+
if (_options.TopicResolver is null)
162+
return _options.Topic;
163+
164+
return _options.TopicResolver(messageType) ?? _options.Topic;
165+
}
166+
167+
/// <summary>
168+
/// Gets or creates the SNS topic ARN for the specified topic name, with memoization.
169+
/// </summary>
170+
/// <param name="topicName">The name of the SNS topic.</param>
171+
/// <param name="cancellationToken">A cancellation token.</param>
172+
/// <returns>The ARN of the SNS topic.</returns>
173+
/// <remarks>
174+
/// This method uses double-check locking to ensure thread-safe topic creation.
175+
/// Topic ARNs are cached in a ConcurrentDictionary to avoid redundant AWS API calls.
176+
/// </remarks>
177+
private async Task<string> GetOrCreateTopicArnAsync(string topicName, CancellationToken cancellationToken)
178+
{
179+
ArgumentException.ThrowIfNullOrEmpty(topicName, nameof(topicName));
180+
181+
if (_topicArns.TryGetValue(topicName, out string arn))
182+
return arn;
152183

153184
using (await _lock.LockAsync(cancellationToken).AnyContext())
154185
{
155-
await EnsureTopicCreatedImplAsync(cancellationToken).AnyContext();
186+
// Double-check after acquiring lock
187+
if (_topicArns.TryGetValue(topicName, out arn))
188+
return arn;
189+
190+
arn = await CreateTopicImplAsync(topicName, cancellationToken).AnyContext();
191+
_topicArns[topicName] = arn;
192+
return arn;
156193
}
157194
}
158195

159196
/// <summary>
160197
/// Internal implementation of topic creation. Caller must hold the lock.
161198
/// </summary>
162-
private async Task EnsureTopicCreatedImplAsync(CancellationToken cancellationToken)
199+
private async Task<string> CreateTopicImplAsync(string topicName, CancellationToken cancellationToken)
163200
{
164-
if (!String.IsNullOrEmpty(_topicArn))
165-
return;
201+
_logger.LogTrace("Ensuring SNS topic {Topic} exists", topicName);
166202

167-
_logger.LogTrace("Ensuring SNS topic {Topic} exists", _options.Topic);
168-
169-
try
203+
if (!_options.CanCreateTopic)
170204
{
171-
var findResponse = await _snsClient.Value.FindTopicAsync(_options.Topic).AnyContext();
172-
if (findResponse?.TopicArn is not null)
205+
// Only check if topic exists when we can't create it
206+
try
173207
{
174-
_topicArn = findResponse.TopicArn;
175-
_logger.LogDebug("Found existing SNS topic {Topic} with ARN {TopicArn}", _options.Topic, _topicArn);
176-
return;
208+
var findResponse = await _snsClient.Value.FindTopicAsync(topicName).AnyContext();
209+
if (findResponse?.TopicArn is not null)
210+
{
211+
_logger.LogDebug("Found existing SNS topic {Topic} with ARN {TopicArn}", topicName, findResponse.TopicArn);
212+
return findResponse.TopicArn;
213+
}
214+
}
215+
catch (SnsNotFoundException)
216+
{
217+
// Topic not found and we can't create it
218+
}
219+
catch (AmazonServiceException ex)
220+
{
221+
_logger.LogWarning(ex, "Error finding topic {Topic}", topicName);
177222
}
178-
}
179-
catch (SnsNotFoundException)
180-
{
181-
_logger.LogTrace("Topic {Topic} not found, will create", _options.Topic);
182-
}
183-
catch (AmazonServiceException ex)
184-
{
185-
// Log as warning since this is an unexpected AWS error, not just "not found"
186-
_logger.LogWarning(ex, "Error finding topic {Topic}, will attempt to create", _options.Topic);
187-
}
188223

189-
if (!_options.CanCreateTopic)
190-
throw new MessageBusException($"Topic {_options.Topic} does not exist and CanCreateTopic is false.");
224+
throw new MessageBusException($"Topic {topicName} does not exist and CanCreateTopic is false.");
225+
}
191226

227+
// CreateTopicAsync is idempotent - if topic exists, it returns the existing ARN
228+
// This is much faster than FindTopicAsync which lists all topics
192229
try
193230
{
194231
var createResponse = await _snsClient.Value.CreateTopicAsync(new CreateTopicRequest
195232
{
196-
Name = _options.Topic
233+
Name = topicName
197234
}, cancellationToken).AnyContext();
198235

199-
_topicArn = createResponse.TopicArn;
200-
_logger.LogDebug("Created SNS topic {Topic} with ARN {TopicArn}", _options.Topic, _topicArn);
236+
_logger.LogDebug("Ensured SNS topic {Topic} exists with ARN {TopicArn}", topicName, createResponse.TopicArn);
237+
return createResponse.TopicArn;
201238
}
202239
catch (AmazonServiceException ex)
203240
{
204-
throw new MessageBusException($"Failed to create SNS topic {_options.Topic}: {ex.Message}", ex);
241+
throw new MessageBusException($"Failed to create SNS topic {topicName}: {ex.Message}", ex);
205242
}
206243
}
207244

@@ -241,9 +278,8 @@ protected override async Task EnsureTopicSubscriptionAsync(CancellationToken can
241278
if (_subscriberTask is not null)
242279
return;
243280

244-
// Ensure topic exists inside the lock to avoid race conditions
245-
if (String.IsNullOrEmpty(_topicArn))
246-
await EnsureTopicCreatedImplAsync(cancellationToken).AnyContext();
281+
// Get or create the default topic for subscription
282+
string topicArn = await GetOrCreateTopicArnAsync(_options.Topic, cancellationToken).AnyContext();
247283

248284
string queueName = GetSubscriptionQueueName();
249285
_logger.LogTrace("Ensuring SQS queue {QueueName} exists for subscription", queueName);
@@ -307,12 +343,12 @@ protected override async Task EnsureTopicSubscriptionAsync(CancellationToken can
307343
_queueArn = getAttributesResponse.QueueARN;
308344

309345
// Check if policy already allows this SNS topic
310-
string expectedSid = $"AllowSNS-{_topicArn.GetHashCode():X8}";
346+
string expectedSid = $"AllowSNS-{topicArn.GetHashCode():X8}";
311347
bool policyNeedsUpdate = !PolicyContainsStatement(getAttributesResponse.Policy, expectedSid);
312348

313349
if (policyNeedsUpdate)
314350
{
315-
string policy = GetMergedQueuePolicy(getAttributesResponse.Policy, _queueArn, _topicArn);
351+
string policy = GetMergedQueuePolicy(getAttributesResponse.Policy, _queueArn, topicArn);
316352

317353
await _sqsClient.Value.SetQueueAttributesAsync(new SetQueueAttributesRequest
318354
{
@@ -323,12 +359,12 @@ await _sqsClient.Value.SetQueueAttributesAsync(new SetQueueAttributesRequest
323359
}
324360
}, cancellationToken).AnyContext();
325361

326-
_logger.LogDebug("Updated SQS queue policy to allow SNS topic {TopicArn}", _topicArn);
362+
_logger.LogDebug("Updated SQS queue policy to allow SNS topic {TopicArn}", topicArn);
327363
}
328364

329365
var subscribeResponse = await _snsClient.Value.SubscribeAsync(new SubscribeRequest
330366
{
331-
TopicArn = _topicArn,
367+
TopicArn = topicArn,
332368
Protocol = "sqs",
333369
Endpoint = _queueArn,
334370
Attributes = new Dictionary<string, string>
@@ -338,7 +374,7 @@ await _sqsClient.Value.SetQueueAttributesAsync(new SetQueueAttributesRequest
338374
}, cancellationToken).AnyContext();
339375

340376
_subscriptionArn = subscribeResponse.SubscriptionArn;
341-
_logger.LogDebug("Subscribed SQS queue {QueueArn} to SNS topic {TopicArn} with subscription {SubscriptionArn}", _queueArn, _topicArn, _subscriptionArn);
377+
_logger.LogDebug("Subscribed SQS queue {QueueArn} to SNS topic {TopicArn} with subscription {SubscriptionArn}", _queueArn, topicArn, _subscriptionArn);
342378

343379
_subscriberCts = CancellationTokenSource.CreateLinkedTokenSource(DisposedCancellationToken);
344380
_subscriberTask = Task.Run(() => SubscriberLoopAsync(_subscriberCts.Token), _subscriberCts.Token);
@@ -588,12 +624,17 @@ protected override async Task PublishImplAsync(string messageType, object messag
588624
return;
589625
}
590626

591-
_logger.LogTrace("Publishing message: {MessageType}", messageType);
627+
// Resolve the topic name based on message type
628+
Type clrMessageType = GetMappedMessageType(messageType);
629+
string topicName = clrMessageType is not null ? GetTopicName(clrMessageType) : _options.Topic;
630+
string topicArn = await GetOrCreateTopicArnAsync(topicName, cancellationToken).AnyContext();
631+
632+
_logger.LogTrace("Publishing message: {MessageType} to topic {Topic}", messageType, topicName);
592633

593634
string messageBody = _serializer.SerializeToString(message);
594635
var publishRequest = new PublishRequest
595636
{
596-
TopicArn = _topicArn,
637+
TopicArn = topicArn,
597638
Message = messageBody,
598639
MessageAttributes = new Dictionary<string, SnsMessageAttributeValue>
599640
{
@@ -643,11 +684,11 @@ await _resiliencePolicy.ExecuteAsync(async _ =>
643684
}
644685
catch (SnsNotFoundException ex)
645686
{
646-
throw new MessageBusException($"SNS topic {_options.Topic} not found: {ex.Message}", ex);
687+
throw new MessageBusException($"SNS topic {topicName} not found: {ex.Message}", ex);
647688
}
648689
catch (AmazonServiceException ex)
649690
{
650-
throw new MessageBusException($"Failed to publish message to SNS topic {_options.Topic}: {ex.Message}", ex);
691+
throw new MessageBusException($"Failed to publish message to SNS topic {topicName}: {ex.Message}", ex);
651692
}
652693
}, cancellationToken).AnyContext();
653694
}

src/Foundatio.AWS/Messaging/SQSMessageBusOptions.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ public class SQSMessageBusOptions : SharedMessageBusOptions
7676
/// The KMS data key reuse period in seconds. Default is 300 seconds (5 minutes).
7777
/// </summary>
7878
public int KmsDataKeyReusePeriodSeconds { get; set; } = 300;
79+
80+
/// <summary>
81+
/// Optional function to resolve the SNS topic name for a given message type.
82+
/// If not set or returns null, falls back to the default <see cref="SharedMessageBusOptions.Topic"/>.
83+
/// This enables routing different message types to different SNS topics.
84+
/// </summary>
85+
public Func<Type, string> TopicResolver { get; set; }
7986
}
8087

8188
public class SQSMessageBusOptionsBuilder : SharedMessageBusOptionsBuilder<SQSMessageBusOptions, SQSMessageBusOptionsBuilder>
@@ -246,6 +253,20 @@ public SQSMessageBusOptionsBuilder UseSqsManagedEncryption()
246253
return this;
247254
}
248255

256+
/// <summary>
257+
/// Sets a function to resolve the SNS topic name for a given message type.
258+
/// This enables routing different message types to different SNS topics.
259+
/// </summary>
260+
/// <param name="resolver">
261+
/// A function that takes a message type and returns the topic name.
262+
/// Return null to use the default topic.
263+
/// </param>
264+
public SQSMessageBusOptionsBuilder TopicResolver(Func<Type, string> resolver)
265+
{
266+
Target.TopicResolver = resolver;
267+
return this;
268+
}
269+
249270
/// <inheritdoc />
250271
public override SQSMessageBusOptions Build()
251272
{

0 commit comments

Comments
 (0)