Skip to content

Conversation

@niemyjski
Copy link
Member

@niemyjski niemyjski commented Jan 30, 2026

Summary

  • Implement \IMessageBus\ using AWS SNS for publishing and SQS for subscribing
  • Enable true pub/sub semantics with SNS fan-out pattern where all subscribers receive all messages
  • Support durable subscriptions via configurable queue names that persist across restarts
  • Merge queue policies to preserve existing permissions (important for multi-process scenarios)
  • Add SQS managed (SSE-SQS) and KMS encryption support
  • Provide connection string configuration and DI extensions for easy setup
  • Fix subscriber CancellationTokenSource disposal order in \RemoveTopicSubscriptionAsync\
  • Add \ValueTask\ overloads for \AnyContext()\ extension methods
  • Add per-message-type topic routing via \TopicResolver\ delegate - allows routing different message types to different SNS topics
  • Optimize topic creation - use \CreateTopicAsync\ directly (idempotent) instead of \FindTopicAsync\ + \CreateTopicAsync\ for better performance
  • Memoize topic ARNs - cache topic ARNs in \ConcurrentDictionary\ to avoid redundant AWS API calls

Test plan

  • All existing tests pass
  • Run \SQSMessageBusTests\ against LocalStack (\docker-compose up)
  • Test with real AWS credentials in a dev environment

Implement IMessageBus using AWS SNS for publishing and SQS for subscribing,
enabling true pub/sub semantics where all subscribers receive all messages.

Key features:
- SNS fan-out pattern with per-subscriber SQS queues
- Durable subscriptions via configurable queue names
- Policy merging to preserve existing queue permissions
- SQS managed or KMS encryption support
- Connection string configuration for easy setup
- DI extensions for IServiceCollection

Also includes:
- Fix subscriber CTS disposal order in RemoveTopicSubscriptionAsync
- ValueTask AnyContext() extension methods
- Comprehensive test coverage
- Add TopicResolver delegate to SQSMessageBusOptions for routing messages
  to different SNS topics based on message type
- Implement memoization of topic ARNs using ConcurrentDictionary to avoid
  redundant AWS API calls
- Optimize CreateTopicImplAsync to use CreateTopicAsync directly instead
  of FindTopicAsync + CreateTopicAsync (CreateTopicAsync is idempotent)
- Remove unnecessary constructor from SQSMessageBusOptions
- Modernize tests: convert message types to records, use range syntax,
  add TestCancellationToken to all async calls, fix empty catch block
- Add comprehensive tests for topic routing, memoization, and concurrency
@niemyjski
Copy link
Member Author

Latest Changes

This commit adds per-message-type topic routing and optimizes topic creation:

New Features

  • TopicResolver delegate - Configure routing of different message types to different SNS topics

Performance Optimizations

  • Optimized topic creation - Changed from FindTopicAsync (lists ALL topics with pagination) to CreateTopicAsync directly. CreateTopicAsync is idempotent - if the topic exists, it returns the existing ARN. This is significantly faster when there are many topics.
  • Topic ARN memoization - Cache topic ARNs in ConcurrentDictionary with double-check locking to avoid redundant AWS API calls during concurrent publishes.

Code Quality

  • Removed unnecessary constructor from SQSMessageBusOptions (base class already initializes MessageTypeMappings)
  • Modernized tests: converted message types to records, used range syntax instead of Substring
  • Added TestCancellationToken to all async test calls (fixes xUnit1051 warnings)
  • Fixed empty catch block in test cleanup to log warnings instead of silently suppressing errors

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements a comprehensive AWS SNS/SQS message bus that enables pub/sub messaging using SNS for publishing and SQS for subscribing. The implementation follows the fan-out pattern where each subscriber gets its own queue to receive all published messages.

Changes:

  • Adds full SQS/SNS message bus implementation with support for durable subscriptions, encryption, and per-message-type topic routing via TopicResolver
  • Optimizes topic creation by using idempotent CreateTopicAsync directly and memoizes topic ARNs to reduce AWS API calls
  • Adds ValueTask extension methods for AnyContext() to support async disposal patterns
  • Includes comprehensive test coverage with tests for durable subscriptions, topic routing, concurrent operations, and memoization

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
src/Foundatio.AWS/Messaging/SQSMessageBus.cs Core implementation with topic management, subscription handling, message polling, and disposal
src/Foundatio.AWS/Messaging/SQSMessageBusOptions.cs Configuration options and builder pattern for message bus setup
src/Foundatio.AWS/Messaging/SQSMessageBusConnectionStringBuilder.cs Connection string parser inheriting from base Amazon builder
src/Foundatio.AWS/Messaging/SQSMessageBusServiceCollectionExtensions.cs Dependency injection extensions for registering message bus
src/Foundatio.AWS/Extensions/TaskExtensions.cs Adds ValueTask overloads for AnyContext extension method
tests/Foundatio.AWS.Tests/Messaging/SQSMessageBusTests.cs Comprehensive test suite covering all features including topic routing and memoization
tests/Foundatio.AWS.Tests/Messaging/SQSMessageBusConnectionStringBuilderTests.cs Tests for connection string parsing
tests/Foundatio.AWS.Tests/appsettings.json Updated configuration for LocalStack testing
src/Foundatio.AWS/Foundatio.AWS.csproj Added SNS and SQS package dependencies
README.md Updated to document SQS message bus support

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.


string queueName = $"{_options.Topic}-{Guid.NewGuid():N}";
if (queueName.Length > 80)
queueName = queueName.Substring(0, 80);
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using string.Substring is considered legacy in modern C#. Consider using range operators instead for better consistency with modern C# patterns. Replace 'queueName.Substring(0, 80)' with 'queueName[..80]'.

Suggested change
queueName = queueName.Substring(0, 80);
queueName = queueName[..80];

Copilot uses AI. Check for mistakes.
if (_sqsClient.IsValueCreated)
_sqsClient.Value.Dispose();

base.Dispose();
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DisposeAsync implementation calls base.Dispose() instead of base.DisposeAsync(). While this may work if the base class doesn't have async dispose operations, it's inconsistent with the async disposal pattern. Consider checking if the base class implements IAsyncDisposable and calling the appropriate method. If the base class only has synchronous disposal, this is acceptable but should be verified.

Suggested change
base.Dispose();
await base.DisposeAsync().AnyContext();

Copilot uses AI. Check for mistakes.
Comment on lines 278 to 279
await messageBus2.SqsClient.DeleteQueueAsync(
(await messageBus2.SqsClient.GetQueueUrlAsync(durableQueueName, TestCancellationToken)).QueueUrl, TestCancellationToken);
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accessing the SqsClient property after DisposeAsync has been called can cause issues since the client may have already been disposed. This test accesses messageBus2.SqsClient after calling DisposeAsync on line 273. Consider moving the cleanup code before the DisposeAsync call, or create a separate SQS client instance for cleanup operations.

Copilot uses AI. Check for mistakes.
Comment on lines 774 to 778
foreach (char c in name)
{
if (!Char.IsLetterOrDigit(c) && c != '-' && c != '_')
throw new ArgumentException($"Queue name contains invalid character '{c}'. Only alphanumeric characters, hyphens, and underscores are allowed.", parameterName);
}
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This foreach loop implicitly filters its target sequence - consider filtering the sequence explicitly using '.Where(...)'.

Copilot uses AI. Check for mistakes.
Comment on lines 828 to 833
foreach (var stmt in statements)
{
string stmtSid = stmt?.AsObject()?["Sid"]?.GetValue<string>();
if (stmtSid == sid)
return true;
}
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This foreach loop immediately maps its iteration variable to another variable - consider mapping the sequence explicitly using '.Select(...)'.

Copilot uses AI. Check for mistakes.
{
await _subscriberTask.AnyContext();
}
catch (OperationCanceledException) { }
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Poor error handling: empty catch block.

Suggested change
catch (OperationCanceledException) { }
catch (OperationCanceledException)
{
// Expected during shutdown after requesting cancellation of the subscriber task.
_logger.LogDebug("Subscriber task was canceled while removing topic subscription.");
}

Copilot uses AI. Check for mistakes.

using (await _lock.LockAsync(cancellationToken).AnyContext())
{
if (_subscriberTask is not null)
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Condition is always false because of ... is ....

Copilot uses AI. Check for mistakes.

// Assert - TopicResolver was called multiple times but topic should only be created once
// The memoization in GetOrCreateTopicArnAsync ensures the actual AWS CreateTopic is called once
Assert.True(topicCreationCount.ContainsKey("routed-TopicAMessage"));
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inefficient use of 'ContainsKey' and indexer.

Copilot uses AI. Check for mistakes.
Comment on lines 500 to 501
Assert.True(resolverCallCount.ContainsKey(typeof(TopicAMessage)));
Assert.Equal(5, resolverCallCount[typeof(TopicAMessage)]);
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inefficient use of 'ContainsKey' and indexer.

Suggested change
Assert.True(resolverCallCount.ContainsKey(typeof(TopicAMessage)));
Assert.Equal(5, resolverCallCount[typeof(TopicAMessage)]);
var topicAMessageType = typeof(TopicAMessage);
Assert.True(resolverCallCount.TryGetValue(topicAMessageType, out var count));
Assert.Equal(5, count);

Copilot uses AI. Check for mistakes.
- Use range syntax queueName[..80] instead of Substring(0, 80)
- Refactor durable queue test to use await using pattern
- Second message bus enables auto-delete for automatic cleanup
@niemyjski niemyjski force-pushed the feature/sqsmessagebus branch from 24366f4 to 43e6a5f Compare January 30, 2026 21:25
RCA: EnsureTopicSubscriptionAsync held _lock and called GetOrCreateTopicArnAsync
which tried to acquire the same _lock. AsyncLock is not reentrant, causing deadlock.

Fix: Remove lock from GetOrCreateTopicArnAsync since caller already holds it.
Use ConcurrentDictionary.TryAdd for thread-safe caching of topic ARNs.
@niemyjski niemyjski force-pushed the feature/sqsmessagebus branch from 43e6a5f to 60eec82 Compare January 30, 2026 21:26
The range syntax [..40] failed when the generated topic name was
shorter than 40 characters. Removed unnecessary truncation since
these topic names are already under the limit.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 12 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Foundatio.Extensions.Hosting.Messaging;
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent namespace: This file uses the namespace 'Foundatio.Extensions.Hosting.Messaging' but other files in this codebase use namespaces like 'Foundatio.Storage', 'Foundatio.Queues', and 'Foundatio.Messaging' directly. For consistency with the codebase conventions, consider using 'Foundatio.Messaging' or creating a dedicated extensions namespace that follows the established pattern (e.g., 'Foundatio.AWS.Extensions' like the existing TaskExtensions.cs uses).

Copilot uses AI. Check for mistakes.
Comment on lines 694 to 737
_subscriberCts?.Cancel();
if (_subscriberTask is not null)
{
try
{
await _subscriberTask.AnyContext();
}
catch (OperationCanceledException)
{
// Ignored
}
}

_subscriberCts?.Dispose();

if (!String.IsNullOrEmpty(_subscriptionArn) && _snsClient.IsValueCreated)
{
try
{
await _snsClient.Value.UnsubscribeAsync(_subscriptionArn).AnyContext();
}
catch
{
// Ignored
}
}

if (!String.IsNullOrEmpty(_queueUrl) && _sqsClient.IsValueCreated && _options.SubscriptionQueueAutoDelete)
{
try
{
await _sqsClient.Value.DeleteQueueAsync(_queueUrl).AnyContext();
}
catch
{
// Ignored
}
}

if (_snsClient.IsValueCreated)
_snsClient.Value.Dispose();

if (_sqsClient.IsValueCreated)
_sqsClient.Value.Dispose();
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential race condition in DisposeAsync: The method doesn't acquire the _lock before accessing and modifying shared state (_subscriberTask, _subscriberCts, _subscriptionArn, _queueUrl). If DisposeAsync is called concurrently with EnsureTopicSubscriptionAsync or RemoveTopicSubscriptionAsync, this could lead to race conditions or partially disposed state.

Consider wrapping the disposal logic (lines 694-737) in a lock acquisition similar to RemoveTopicSubscriptionAsync (line 388), or call RemoveTopicSubscriptionAsync directly and then dispose the clients.

Suggested change
_subscriberCts?.Cancel();
if (_subscriberTask is not null)
{
try
{
await _subscriberTask.AnyContext();
}
catch (OperationCanceledException)
{
// Ignored
}
}
_subscriberCts?.Dispose();
if (!String.IsNullOrEmpty(_subscriptionArn) && _snsClient.IsValueCreated)
{
try
{
await _snsClient.Value.UnsubscribeAsync(_subscriptionArn).AnyContext();
}
catch
{
// Ignored
}
}
if (!String.IsNullOrEmpty(_queueUrl) && _sqsClient.IsValueCreated && _options.SubscriptionQueueAutoDelete)
{
try
{
await _sqsClient.Value.DeleteQueueAsync(_queueUrl).AnyContext();
}
catch
{
// Ignored
}
}
if (_snsClient.IsValueCreated)
_snsClient.Value.Dispose();
if (_sqsClient.IsValueCreated)
_sqsClient.Value.Dispose();
using (await _lock.LockAsync())
{
_subscriberCts?.Cancel();
if (_subscriberTask is not null)
{
try
{
await _subscriberTask.AnyContext();
}
catch (OperationCanceledException)
{
// Ignored
}
}
_subscriberCts?.Dispose();
if (!String.IsNullOrEmpty(_subscriptionArn) && _snsClient.IsValueCreated)
{
try
{
await _snsClient.Value.UnsubscribeAsync(_subscriptionArn).AnyContext();
}
catch
{
// Ignored
}
}
if (!String.IsNullOrEmpty(_queueUrl) && _sqsClient.IsValueCreated && _options.SubscriptionQueueAutoDelete)
{
try
{
await _sqsClient.Value.DeleteQueueAsync(_queueUrl).AnyContext();
}
catch
{
// Ignored
}
}
if (_snsClient.IsValueCreated)
_snsClient.Value.Dispose();
if (_sqsClient.IsValueCreated)
_sqsClient.Value.Dispose();
}

Copilot uses AI. Check for mistakes.
Comment on lines 455 to 458
// Assert - TopicResolver was called multiple times but topic should only be created once
// The memoization in GetOrCreateTopicArnAsync ensures the actual AWS CreateTopic is called once
Assert.True(topicCreationCount.ContainsKey("routed-TopicAMessage"));
// The resolver is called for each publish, but the underlying topic creation is memoized
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misleading test comment: The comment states "The memoization in GetOrCreateTopicArnAsync ensures the actual AWS CreateTopic is called once" but the current implementation in SQSMessageBus.cs (lines 177-187) does not guarantee single-call creation under concurrent load due to a race condition in the check-then-act pattern. The test may pass intermittently but doesn't actually verify that CreateTopic is called only once.

Suggested change
// Assert - TopicResolver was called multiple times but topic should only be created once
// The memoization in GetOrCreateTopicArnAsync ensures the actual AWS CreateTopic is called once
Assert.True(topicCreationCount.ContainsKey("routed-TopicAMessage"));
// The resolver is called for each publish, but the underlying topic creation is memoized
// Assert - TopicResolver is called for concurrent publishes and resolves a routed topic name
// This test only verifies that the resolver is invoked and that publishing succeeds under concurrency;
// it does not assert how many times the underlying AWS CreateTopic API is called.
Assert.True(topicCreationCount.ContainsKey("routed-TopicAMessage"));

Copilot uses AI. Check for mistakes.
Comment on lines +175 to +186
/// Caller must hold the lock when topic creation may occur.
/// </remarks>
private async Task<string> GetOrCreateTopicArnAsync(string topicName, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(topicName, nameof(topicName));

if (_topicArns.TryGetValue(topicName, out string arn))
return arn;

arn = await CreateTopicImplAsync(topicName, cancellationToken).AnyContext();
_topicArns.TryAdd(topicName, arn);
return arn;
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition in GetOrCreateTopicArnAsync: Multiple concurrent calls for the same topic name can result in duplicate CreateTopicImplAsync calls because the check-then-act pattern (TryGetValue -> CreateTopicImplAsync -> TryAdd) is not atomic. While CreateTopicAsync is idempotent and will return the same ARN, this leads to unnecessary AWS API calls, increased costs, and potential rate limit violations under concurrent load.

The test on line 429 of SQSMessageBusTests.cs ("PublishAsync_WithConcurrentPublishes_CreatesTopicsOnce") documents an expectation that "The memoization in GetOrCreateTopicArnAsync ensures the actual AWS CreateTopic is called once", but the current implementation does not guarantee this.

Consider using GetOrAdd with an async factory function (e.g., using AsyncLazy or SemaphoreSlim-based double-checked locking) to ensure thread-safe, single-call topic creation.

Suggested change
/// Caller must hold the lock when topic creation may occur.
/// </remarks>
private async Task<string> GetOrCreateTopicArnAsync(string topicName, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(topicName, nameof(topicName));
if (_topicArns.TryGetValue(topicName, out string arn))
return arn;
arn = await CreateTopicImplAsync(topicName, cancellationToken).AnyContext();
_topicArns.TryAdd(topicName, arn);
return arn;
/// Concurrent callers for the same topic name are synchronized so that the underlying
/// SNS topic is created or resolved at most once.
/// </remarks>
private async Task<string> GetOrCreateTopicArnAsync(string topicName, CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrEmpty(topicName, nameof(topicName));
// Fast path: topic ARN already cached.
if (_topicArns.TryGetValue(topicName, out string arn))
return arn;
// Ensure that only one concurrent caller per topic name performs the creation.
// Local static dictionary is shared across all invocations of this method within the process.
static readonly ConcurrentDictionary<string, SemaphoreSlim> s_topicLocks = new();
var topicLock = s_topicLocks.GetOrAdd(topicName, _ => new SemaphoreSlim(1, 1));
await topicLock.WaitAsync(cancellationToken).AnyContext();
try
{
// Double-check after acquiring the lock in case another caller created the topic.
if (_topicArns.TryGetValue(topicName, out arn))
return arn;
arn = await CreateTopicImplAsync(topicName, cancellationToken).AnyContext();
_topicArns[topicName] = arn;
return arn;
}
finally
{
topicLock.Release();
}

Copilot uses AI. Check for mistakes.
Comment on lines +173 to +176
/// <remarks>
/// Topic ARNs are cached in a ConcurrentDictionary to avoid redundant AWS API calls.
/// Caller must hold the lock when topic creation may occur.
/// </remarks>
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misleading documentation comment: The comment states "Caller must hold the lock when topic creation may occur" but this method is called from multiple places (lines 151, 275, 603) without holding any lock. The documentation should either be corrected to match the actual thread-safety guarantees, or the implementation should be changed to require a lock.

Copilot uses AI. Check for mistakes.
Comment on lines 821 to 826
foreach (var stmt in statements)
{
string stmtSid = stmt?.AsObject()?["Sid"]?.GetValue<string>();
if (stmtSid == sid)
return true;
}
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This foreach loop immediately maps its iteration variable to another variable - consider mapping the sequence explicitly using '.Select(...)'.

Copilot uses AI. Check for mistakes.
{
await _subscriberTask.AnyContext();
}
catch (OperationCanceledException) { }
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Poor error handling: empty catch block.

Suggested change
catch (OperationCanceledException) { }
catch (OperationCanceledException)
{
_logger.LogDebug("Subscriber task was canceled during topic unsubscription.");
}

Copilot uses AI. Check for mistakes.

using (await _lock.LockAsync(cancellationToken).AnyContext())
{
if (_subscriberTask is not null)
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Condition is always false because of ... is ....

Copilot uses AI. Check for mistakes.

// Assert - TopicResolver was called multiple times but topic should only be created once
// The memoization in GetOrCreateTopicArnAsync ensures the actual AWS CreateTopic is called once
Assert.True(topicCreationCount.ContainsKey("routed-TopicAMessage"));
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inefficient use of 'ContainsKey' and indexer.

Copilot uses AI. Check for mistakes.
Comment on lines 486 to 487
Assert.True(resolverCallCount.ContainsKey(typeof(TopicAMessage)));
Assert.Equal(5, resolverCallCount[typeof(TopicAMessage)]);
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inefficient use of 'ContainsKey' and indexer.

Suggested change
Assert.True(resolverCallCount.ContainsKey(typeof(TopicAMessage)));
Assert.Equal(5, resolverCallCount[typeof(TopicAMessage)]);
Assert.True(resolverCallCount.TryGetValue(typeof(TopicAMessage), out int count));
Assert.Equal(5, count);

Copilot uses AI. Check for mistakes.
- Add System.Linq import for LINQ extension methods
- Replace empty catch block with comment explaining expected cancellation
- Add exception filters to generic catch clauses to allow fatal exceptions
  (OutOfMemoryException, StackOverflowException) to propagate
- Replace generic catch blocks in DisposeAsync with specific exception types
  (SnsNotFoundException, QueueDoesNotExistException, AmazonServiceException)
- Refactor ValidateQueueName to use LINQ FirstOrDefault instead of foreach
- Refactor PolicyContainsStatement to use LINQ Any instead of foreach
- Replace inefficient ContainsKey + indexer with TryGetValue in tests

using (await _lock.LockAsync(cancellationToken).AnyContext())
{
if (_subscriberTask is not null)
Comment on lines +528 to +542
catch (Exception ex)
{
_logger.LogError(ex, "Error in subscriber loop: {Message}", ex.Message);
if (cancellationToken.IsCancellationRequested)
break;

try
{
await _timeProvider.Delay(TimeSpan.FromSeconds(1), cancellationToken).AnyContext();
}
catch (OperationCanceledException)
{
break;
}
}
Comment on lines +513 to +516
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message {MessageId}: {Message}", sqsMessage.MessageId, ex.Message);
}
Comment on lines +832 to +835
catch
{
// If we can't parse the policy, assume we need to update it
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants