Skip to content

Commit 1134aad

Browse files
committed
[Host.AmazonSQS] Add SNS support
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent 0cbcf2e commit 1134aad

21 files changed

+301
-187
lines changed

docs/provider_amazon_sqs.md

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ Before diving into this provider documentation, please make sure to read the [In
55
### Table of Contents
66

77
- [Configuration](#configuration)
8-
- [Amazon SNS](#amazon-sns)
98
- [Producing Messages](#producing-messages)
109
- [Consuming Messages](#consuming-messages)
1110
- [Consumer Context](#consumer-context)
@@ -18,7 +17,7 @@ Before diving into this provider documentation, please make sure to read the [In
1817

1918
## Configuration
2019

21-
To configure Amazon SQS as your transport provider, you need to specify the AWS region and choose an authentication method:
20+
To configure Amazon SQS / SNS as the transport provider, you need to specify the AWS region and choose an authentication method:
2221

2322
- **Static Credentials**: [Learn more](https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html)
2423
- **Temporary Credentials**: [Learn more](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS)
@@ -35,7 +34,7 @@ services.AddSlimMessageBus((mbb) =>
3534
cfg.UseRegion(Amazon.RegionEndpoint.EUCentral1);
3635

3736
// Use static credentials: https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html
38-
cfg.UseCredentials(accessKey, secretAccessKey);
37+
cfg.UseCredentials(accessKey, secretAccessKey, SqsMessageBusMode.Sqs);
3938

4039
// Use temporary credentials: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS
4140
//cfg.UseTemporaryCredentials(roleArn, roleSessionName);
@@ -45,11 +44,10 @@ services.AddSlimMessageBus((mbb) =>
4544
});
4645
```
4746

48-
For an example configuration, check out this file: [`SqsMessageBusSettings`](../src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs). The settings allow you to customize the SQS client object and control topology provisioning for advanced scenarios.
47+
For an example configuration, check out this file: [`SqsMessageBusSettings`](../src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs). The settings allow you to customize the SQS and SNS client object and control topology provisioning for advanced scenarios.
4948

50-
## Amazon SNS
51-
52-
Support for Amazon SNS (Simple Notification Service) will be added soon to this transport plugin.
49+
The plugin supports both SQS (Simple Queue Service Queues) and SNS (Simple Notification Service).
50+
However, if you want to use just one specify the `mode` parameter in either `.UseCredentials(mode: SqsMessageBusMode.Sqs)` or `.UseCredentials(mode: SqsMessageBusMode.Sns)`.
5351

5452
## Producing Messages
5553

docs/provider_amazon_sqs.t.md

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ Before diving into this provider documentation, please make sure to read the [In
55
### Table of Contents
66

77
- [Configuration](#configuration)
8-
- [Amazon SNS](#amazon-sns)
98
- [Producing Messages](#producing-messages)
109
- [Consuming Messages](#consuming-messages)
1110
- [Consumer Context](#consumer-context)
@@ -18,7 +17,7 @@ Before diving into this provider documentation, please make sure to read the [In
1817

1918
## Configuration
2019

21-
To configure Amazon SQS as your transport provider, you need to specify the AWS region and choose an authentication method:
20+
To configure Amazon SQS / SNS as the transport provider, you need to specify the AWS region and choose an authentication method:
2221

2322
- **Static Credentials**: [Learn more](https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html)
2423
- **Temporary Credentials**: [Learn more](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS)
@@ -29,11 +28,10 @@ using SlimMessageBus.Host.AmazonSQS;
2928

3029
@[:cs](../src/Tests/SlimMessageBus.Host.AmazonSQS.Test/SqsMessageBusIt.cs,ExampleSetup)
3130

32-
For an example configuration, check out this file: [`SqsMessageBusSettings`](../src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs). The settings allow you to customize the SQS client object and control topology provisioning for advanced scenarios.
31+
For an example configuration, check out this file: [`SqsMessageBusSettings`](../src/SlimMessageBus.Host.AmazonSQS/SqsMessageBusSettings.cs). The settings allow you to customize the SQS and SNS client object and control topology provisioning for advanced scenarios.
3332

34-
## Amazon SNS
35-
36-
Support for Amazon SNS (Simple Notification Service) will be added soon to this transport plugin.
33+
The plugin supports both SQS (Simple Queue Service Queues) and SNS (Simple Notification Service).
34+
However, if you want to use just one specify the `mode` parameter in either `.UseCredentials(mode: SqsMessageBusMode.Sqs)` or `.UseCredentials(mode: SqsMessageBusMode.Sns)`.
3735

3836
## Producing Messages
3937

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
namespace SlimMessageBus.Host.AmazonSQS;
2+
3+
public abstract class AbstractClientProvider<TClient>(TClient client) : IDisposable
4+
where TClient : IDisposable
5+
{
6+
private bool _disposedValue;
7+
8+
public TClient Client => client;
9+
10+
public virtual Task EnsureClientAuthenticated() => Task.CompletedTask;
11+
12+
#region Dispose Pattern
13+
14+
protected virtual void Dispose(bool disposing)
15+
{
16+
if (!_disposedValue)
17+
{
18+
if (disposing)
19+
{
20+
client?.Dispose();
21+
}
22+
23+
_disposedValue = true;
24+
}
25+
}
26+
27+
public void Dispose()
28+
{
29+
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
30+
Dispose(disposing: true);
31+
GC.SuppressFinalize(this);
32+
}
33+
34+
#endregion
35+
36+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
namespace SlimMessageBus.Host.AmazonSQS;
2+
3+
using Amazon.SecurityToken;
4+
using Amazon.SecurityToken.Model;
5+
6+
/// <summary>
7+
/// Client wrapper that renews authentication token prior expiration.
8+
/// </summary>
9+
/// <typeparam name="TClient"></typeparam>
10+
/// <typeparam name="TClientConfig"></typeparam>
11+
public abstract class AbstractTemporaryCredentialsSqsClientProvider<TClient, TClientConfig> : IDisposable
12+
where TClient : IDisposable
13+
{
14+
private bool _disposedValue;
15+
16+
private readonly TClientConfig _clientConfig;
17+
private readonly string _roleArn;
18+
private readonly string _roleSessionName;
19+
20+
private readonly AmazonSecurityTokenServiceClient _stsClient;
21+
private readonly Timer _timer;
22+
private readonly SemaphoreSlim _semaphoreSlim = new(1, 1);
23+
24+
private TClient _client;
25+
private DateTime _clientCredentialsExpiry;
26+
27+
public AbstractTemporaryCredentialsSqsClientProvider(TClientConfig clientConfig, string roleArn, string roleSessionName)
28+
{
29+
_stsClient = new AmazonSecurityTokenServiceClient();
30+
_clientConfig = clientConfig;
31+
_roleArn = roleArn;
32+
_roleSessionName = roleSessionName;
33+
_timer = new Timer(state => _ = EnsureClientAuthenticated(), null, TimeSpan.Zero, TimeSpan.FromMinutes(1));
34+
}
35+
36+
public TClient Client => _client;
37+
38+
public async Task EnsureClientAuthenticated()
39+
{
40+
if (_client == null || DateTime.UtcNow >= _clientCredentialsExpiry)
41+
{
42+
await _semaphoreSlim.WaitAsync();
43+
try
44+
{
45+
var oldClient = _client;
46+
(_client, _clientCredentialsExpiry) = await RefreshCredentialsAsync();
47+
oldClient?.Dispose();
48+
}
49+
finally
50+
{
51+
_semaphoreSlim.Release();
52+
}
53+
}
54+
}
55+
56+
private async Task<(TClient Client, DateTime ClientExpiry)> RefreshCredentialsAsync()
57+
{
58+
var assumeRoleRequest = new AssumeRoleRequest
59+
{
60+
RoleArn = _roleArn,
61+
RoleSessionName = _roleSessionName
62+
};
63+
64+
var assumeRoleResponse = await _stsClient.AssumeRoleAsync(assumeRoleRequest);
65+
66+
var temporaryCredentials = new SessionAWSCredentials(
67+
assumeRoleResponse.Credentials.AccessKeyId,
68+
assumeRoleResponse.Credentials.SecretAccessKey,
69+
assumeRoleResponse.Credentials.SessionToken
70+
);
71+
72+
var clientCredentialsExpiry = assumeRoleResponse.Credentials.Expiration.AddMinutes(-5); // Renew 5 mins before expiry
73+
74+
var client = CreateClient(temporaryCredentials, _clientConfig);
75+
return (client, clientCredentialsExpiry);
76+
}
77+
78+
protected abstract TClient CreateClient(SessionAWSCredentials temporaryCredentials, TClientConfig clientConfig);
79+
80+
#region Dispose Pattern
81+
82+
protected virtual void Dispose(bool disposing)
83+
{
84+
if (!_disposedValue)
85+
{
86+
if (disposing)
87+
{
88+
_client?.Dispose();
89+
_stsClient?.Dispose();
90+
_timer?.Dispose();
91+
}
92+
_disposedValue = true;
93+
}
94+
}
95+
96+
public void Dispose()
97+
{
98+
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
99+
Dispose(disposing: true);
100+
GC.SuppressFinalize(this);
101+
}
102+
103+
#endregion
104+
105+
}
106+
107+
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace SlimMessageBus.Host.AmazonSQS;
2+
3+
/// <summary>
4+
/// Wrapper for the <see cref="AmazonSimpleNotificationServiceClient"/> and the authentication strategy.
5+
/// </summary>
6+
public interface ISnsClientProvider
7+
{
8+
AmazonSimpleNotificationServiceClient Client { get; }
9+
Task EnsureClientAuthenticated();
10+
}

src/SlimMessageBus.Host.AmazonSQS/ClientFactory/ISqsClientProvider.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
namespace SlimMessageBus.Host.AmazonSQS;
22

3+
/// <summary>
4+
/// Wrapper for the <see cref="AmazonSQSClient"/> and the authentication strategy.
5+
/// </summary>
36
public interface ISqsClientProvider
47
{
58
AmazonSQSClient Client { get; }
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace SlimMessageBus.Host.AmazonSQS;
2+
3+
internal class NullSnsClientProvider : ISnsClientProvider
4+
{
5+
public AmazonSimpleNotificationServiceClient Client => null;
6+
7+
public Task EnsureClientAuthenticated() => Task.CompletedTask;
8+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace SlimMessageBus.Host.AmazonSQS;
2+
3+
internal class NullSqsClientProvider : ISqsClientProvider
4+
{
5+
public AmazonSQSClient Client => null;
6+
7+
public Task EnsureClientAuthenticated() => Task.CompletedTask;
8+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace SlimMessageBus.Host.AmazonSQS;
2+
3+
public class StaticCredentialsSnsClientProvider(AmazonSimpleNotificationServiceConfig config, AWSCredentials credentials)
4+
: AbstractClientProvider<AmazonSimpleNotificationServiceClient>(new AmazonSimpleNotificationServiceClient(credentials, config)),
5+
ISnsClientProvider
6+
{
7+
}
Lines changed: 3 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,7 @@
11
namespace SlimMessageBus.Host.AmazonSQS;
22

3-
public class StaticCredentialsSqsClientProvider : ISqsClientProvider, IDisposable
3+
public class StaticCredentialsSqsClientProvider(AmazonSQSConfig sqsConfig, AWSCredentials credentials)
4+
: AbstractClientProvider<AmazonSQSClient>(new AmazonSQSClient(credentials, sqsConfig)),
5+
ISqsClientProvider
46
{
5-
private bool _disposedValue;
6-
7-
private readonly AmazonSQSClient _client;
8-
9-
public StaticCredentialsSqsClientProvider(AmazonSQSConfig sqsConfig, AWSCredentials credentials)
10-
=> _client = new AmazonSQSClient(credentials, sqsConfig);
11-
12-
#region ISqsClientProvider
13-
14-
public AmazonSQSClient Client => _client;
15-
16-
public Task EnsureClientAuthenticated() => Task.CompletedTask;
17-
18-
#endregion
19-
20-
#region Dispose Pattern
21-
22-
protected virtual void Dispose(bool disposing)
23-
{
24-
if (!_disposedValue)
25-
{
26-
if (disposing)
27-
{
28-
_client?.Dispose();
29-
}
30-
31-
_disposedValue = true;
32-
}
33-
}
34-
35-
public void Dispose()
36-
{
37-
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
38-
Dispose(disposing: true);
39-
GC.SuppressFinalize(this);
40-
}
41-
42-
#endregion
437
}
44-
45-

0 commit comments

Comments
 (0)