Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ To summarize, if you want pain free development and testing while allowing your
- [Redis](https://github.com/FoundatioFx/Foundatio.Redis) - Caching, Storage, Queues, Messaging, Locks
- [Azure Storage](https://github.com/FoundatioFx/Foundatio.AzureStorage) - Storage, Queues
- [Azure ServiceBus](https://github.com/FoundatioFx/Foundatio.AzureServiceBus) - Queues, Messaging
- [AWS](https://github.com/FoundatioFx/Foundatio.AWS) - Storage, Queues
- [AWS](https://github.com/FoundatioFx/Foundatio.AWS) - Storage, Queues, Messaging
- [Kafka](https://github.com/FoundatioFx/Foundatio.Kafka) - Messaging
- [RabbitMQ](https://github.com/FoundatioFx/Foundatio.RabbitMQ) - Messaging
- [Minio](https://github.com/FoundatioFx/Foundatio.Minio) - Storage
Expand Down Expand Up @@ -133,6 +133,7 @@ Allows you to publish and subscribe to messages flowing through your application
3. [RabbitMQMessageBus](https://github.com/FoundatioFx/Foundatio.RabbitMQ/blob/master/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs): A RabbitMQ implementation.
4. [KafkaMessageBus](https://github.com/FoundatioFx/Foundatio.Kafka/blob/main/src/Foundatio.Kafka/Messaging/KafkaMessageBus.cs): A Kafka implementation.
5. [AzureServiceBusMessageBus](https://github.com/FoundatioFx/Foundatio.AzureServiceBus/blob/master/src/Foundatio.AzureServiceBus/Messaging/AzureServiceBusMessageBus.cs): An Azure Service Bus implementation.
6. [SQSMessageBus](https://github.com/FoundatioFx/Foundatio.AWS/blob/master/src/Foundatio.AWS/Messaging/SQSMessageBus.cs): An AWS SNS/SQS implementation using SNS for publishing and SQS for subscribing.

#### Sample

Expand Down
16 changes: 15 additions & 1 deletion src/Foundatio.AWS/Extensions/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
Expand All @@ -22,6 +22,20 @@ public static ConfiguredTaskAwaitable AnyContext(this Task task)
return task.ConfigureAwait(continueOnCapturedContext: false);
}

[DebuggerStepThrough]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredValueTaskAwaitable AnyContext(this ValueTask task)
{
return task.ConfigureAwait(continueOnCapturedContext: false);
}

[DebuggerStepThrough]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredValueTaskAwaitable<TResult> AnyContext<TResult>(this ValueTask<TResult> task)
{
return task.ConfigureAwait(continueOnCapturedContext: false);
}

[DebuggerStepThrough]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ConfiguredTaskAwaitable<TResult> AnyContext<TResult>(this AwaitableDisposable<TResult> task) where TResult : IDisposable
Expand Down
3 changes: 2 additions & 1 deletion src/Foundatio.AWS/Foundatio.AWS.csproj
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<PackageTags>$(PackageTags);Amazon;AWS;S3</PackageTags>
<PackageTags>$(PackageTags);Amazon;AWS;S3;SNS;SQS</PackageTags>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AWSSDK.CloudWatch" Version="4.0.6.6" />
<PackageReference Include="AWSSDK.S3" Version="4.0.17.3" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="4.0.2.13" />
<PackageReference Include="AWSSDK.SQS" Version="4.0.2.13" />

<PackageReference Include="Foundatio" Version="13.0.0-beta1.11" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" />
Expand Down
935 changes: 935 additions & 0 deletions src/Foundatio.AWS/Messaging/SQSMessageBus.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Foundatio.Messaging;

public class SQSMessageBusConnectionStringBuilder : AmazonConnectionStringBuilder
{
public SQSMessageBusConnectionStringBuilder()
{
}

public SQSMessageBusConnectionStringBuilder(string connectionString) : base(connectionString)
{
}
}
285 changes: 285 additions & 0 deletions src/Foundatio.AWS/Messaging/SQSMessageBusOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
using System;
using Amazon;
using Amazon.Runtime;

namespace Foundatio.Messaging;

public class SQSMessageBusOptions : SharedMessageBusOptions
{
/// <summary>
/// The connection string containing AWS credentials and configuration.
/// Format: "AccessKey=xxx;SecretKey=xxx;Region=us-east-1" or "ServiceUrl=http://localhost:4566;AccessKey=xxx;SecretKey=xxx"
/// </summary>
public string ConnectionString { get; set; }

/// <summary>
/// The AWS credentials to use for authentication. If not specified, uses default credential resolution.
/// </summary>
/// <seealso cref="Amazon.Runtime.AWSCredentials"/>
public AWSCredentials Credentials { get; set; }

/// <summary>
/// The AWS region endpoint. If not specified, uses default region resolution.
/// </summary>
/// <seealso cref="Amazon.RegionEndpoint"/>
public RegionEndpoint Region { get; set; }

/// <summary>
/// The service URL for LocalStack or custom endpoints. When set, overrides the region endpoint.
/// </summary>
public string ServiceUrl { get; set; }

/// <summary>
/// Whether the SNS topic can be created if it doesn't exist. Default is true.
/// </summary>
public bool CanCreateTopic { get; set; } = true;

/// <summary>
/// The name of the SQS subscription queue. If not specified, a unique queue name will be generated.
/// Use this for durable subscriptions that persist across restarts.
/// </summary>
public string SubscriptionQueueName { get; set; }

/// <summary>
/// Whether the subscription queue should be automatically deleted when the message bus is disposed.
/// Default is true for ephemeral queues, set to false for durable subscriptions.
/// </summary>
public bool SubscriptionQueueAutoDelete { get; set; } = true;

/// <summary>
/// The timeout for reading messages from the SQS queue using long polling. Default is 20 seconds.
/// </summary>
public TimeSpan ReadQueueTimeout { get; set; } = TimeSpan.FromSeconds(20);

/// <summary>
/// The interval between dequeue attempts when no messages are available. Default is 1 second.
/// </summary>
public TimeSpan DequeueInterval { get; set; } = TimeSpan.FromSeconds(1);

/// <summary>
/// The visibility timeout for messages. Messages will be invisible to other consumers
/// for this duration after being received. If not set, uses SQS default (30 seconds).
/// </summary>
public TimeSpan? MessageVisibilityTimeout { get; set; }

/// <summary>
/// Whether to use SQS managed server-side encryption (SSE-SQS). Default is false.
/// </summary>
public bool SqsManagedSseEnabled { get; set; }

/// <summary>
/// The KMS master key ID for server-side encryption (SSE-KMS).
/// </summary>
public string KmsMasterKeyId { get; set; }

/// <summary>
/// The KMS data key reuse period in seconds. Default is 300 seconds (5 minutes).
/// </summary>
public int KmsDataKeyReusePeriodSeconds { get; set; } = 300;

/// <summary>
/// Optional function to resolve the SNS topic name for a given message type.
/// If not set or returns null, falls back to the default <see cref="SharedMessageBusOptions.Topic"/>.
/// This enables routing different message types to different SNS topics.
/// </summary>
public Func<Type, string> TopicResolver { get; set; }
}

public class SQSMessageBusOptionsBuilder : SharedMessageBusOptionsBuilder<SQSMessageBusOptions, SQSMessageBusOptionsBuilder>
{
/// <summary>
/// Sets the connection string containing AWS credentials and configuration.
/// </summary>
public SQSMessageBusOptionsBuilder ConnectionString(string connectionString)
{
ArgumentException.ThrowIfNullOrEmpty(connectionString);
Target.ConnectionString = connectionString;
return this;
}

/// <summary>
/// Sets the AWS credentials to use for authentication.
/// </summary>
public SQSMessageBusOptionsBuilder Credentials(AWSCredentials credentials)
{
ArgumentNullException.ThrowIfNull(credentials);
Target.Credentials = credentials;
return this;
}

/// <summary>
/// Sets the AWS credentials using access key and secret key.
/// </summary>
public SQSMessageBusOptionsBuilder Credentials(string accessKey, string secretKey)
{
ArgumentException.ThrowIfNullOrEmpty(accessKey);
ArgumentException.ThrowIfNullOrEmpty(secretKey);

Target.Credentials = new BasicAWSCredentials(accessKey, secretKey);
return this;
}

/// <summary>
/// Sets the AWS region endpoint.
/// </summary>
public SQSMessageBusOptionsBuilder Region(RegionEndpoint region)
{
ArgumentNullException.ThrowIfNull(region);
Target.Region = region;
return this;
}

/// <summary>
/// Sets the AWS region by system name (e.g., "us-east-1").
/// </summary>
public SQSMessageBusOptionsBuilder Region(string region)
{
ArgumentException.ThrowIfNullOrEmpty(region);
Target.Region = RegionEndpoint.GetBySystemName(region);
return this;
}

/// <summary>
/// Sets the service URL for LocalStack or custom endpoints.
/// </summary>
public SQSMessageBusOptionsBuilder ServiceUrl(string serviceUrl)
{
Target.ServiceUrl = serviceUrl;
return this;
}

/// <summary>
/// Sets whether the SNS topic can be created if it doesn't exist.
/// </summary>
public SQSMessageBusOptionsBuilder CanCreateTopic(bool enabled)
{
Target.CanCreateTopic = enabled;
return this;
}

/// <summary>
/// Enables automatic topic creation if it doesn't exist.
/// </summary>
public SQSMessageBusOptionsBuilder EnableCreateTopic() => CanCreateTopic(true);

/// <summary>
/// Disables automatic topic creation. An exception will be thrown if the topic doesn't exist.
/// </summary>
public SQSMessageBusOptionsBuilder DisableCreateTopic() => CanCreateTopic(false);

/// <summary>
/// Sets the subscription queue name for durable subscriptions.
/// </summary>
public SQSMessageBusOptionsBuilder SubscriptionQueueName(string queueName)
{
Target.SubscriptionQueueName = queueName;
return this;
}

/// <summary>
/// Configures whether the subscription queue should be auto-deleted on dispose.
/// </summary>
public SQSMessageBusOptionsBuilder SubscriptionQueueAutoDelete(bool autoDelete)
{
Target.SubscriptionQueueAutoDelete = autoDelete;
return this;
}

/// <summary>
/// Configures a durable subscription that persists across restarts.
/// Sets the queue name and disables auto-delete.
/// </summary>
public SQSMessageBusOptionsBuilder UseDurableSubscription(string queueName)
{
ArgumentException.ThrowIfNullOrEmpty(queueName);

Target.SubscriptionQueueName = queueName;
Target.SubscriptionQueueAutoDelete = false;
return this;
}

/// <summary>
/// Sets the timeout for reading messages from the SQS queue using long polling.
/// </summary>
public SQSMessageBusOptionsBuilder ReadQueueTimeout(TimeSpan timeout)
{
ArgumentOutOfRangeException.ThrowIfLessThan(timeout, TimeSpan.Zero);

Target.ReadQueueTimeout = timeout;
return this;
}

/// <summary>
/// Sets the interval between dequeue attempts when no messages are available.
/// </summary>
public SQSMessageBusOptionsBuilder DequeueInterval(TimeSpan interval)
{
ArgumentOutOfRangeException.ThrowIfLessThan(interval, TimeSpan.Zero);

Target.DequeueInterval = interval;
return this;
}

/// <summary>
/// Sets the visibility timeout for messages.
/// </summary>
public SQSMessageBusOptionsBuilder MessageVisibilityTimeout(TimeSpan timeout)
{
ArgumentOutOfRangeException.ThrowIfLessThan(timeout, TimeSpan.Zero);

Target.MessageVisibilityTimeout = timeout;
return this;
}

/// <summary>
/// Enables KMS server-side encryption (SSE-KMS) with the specified master key.
/// </summary>
public SQSMessageBusOptionsBuilder UseKmsEncryption(string kmsMasterKeyId, int kmsKeyReusePeriodSeconds = 300)
{
ArgumentException.ThrowIfNullOrEmpty(kmsMasterKeyId);
Target.KmsMasterKeyId = kmsMasterKeyId;
Target.KmsDataKeyReusePeriodSeconds = kmsKeyReusePeriodSeconds;
Target.SqsManagedSseEnabled = false;
return this;
}

/// <summary>
/// Enables SQS managed server-side encryption (SSE-SQS).
/// </summary>
public SQSMessageBusOptionsBuilder UseSqsManagedEncryption()
{
Target.SqsManagedSseEnabled = true;
Target.KmsMasterKeyId = null;
return this;
}

/// <summary>
/// Sets a function to resolve the SNS topic name for a given message type.
/// This enables routing different message types to different SNS topics.
/// </summary>
/// <param name="resolver">
/// A function that takes a message type and returns the topic name.
/// Return null to use the default topic.
/// </param>
public SQSMessageBusOptionsBuilder TopicResolver(Func<Type, string> resolver)
{
Target.TopicResolver = resolver;
return this;
}

/// <inheritdoc />
public override SQSMessageBusOptions Build()
{
if (String.IsNullOrEmpty(Target.ConnectionString))
return Target;

var connectionString = new SQSMessageBusConnectionStringBuilder(Target.ConnectionString);
Target.Credentials ??= connectionString.GetCredentials();
Target.Region ??= connectionString.GetRegion();

if (String.IsNullOrEmpty(Target.ServiceUrl) && !String.IsNullOrEmpty(connectionString.ServiceUrl))
Target.ServiceUrl = connectionString.ServiceUrl;

return Target;
}
}
Loading
Loading