Skip to content

Commit 5440c8e

Browse files
committed
[Host.AmazonSQS] Add SNS support
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent 1134aad commit 5440c8e

File tree

11 files changed

+355
-117
lines changed

11 files changed

+355
-117
lines changed

src/SlimMessageBus.Host.AmazonSQS/ClientFactory/NullSnsClientProvider.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
internal class NullSnsClientProvider : ISnsClientProvider
44
{
5-
public AmazonSimpleNotificationServiceClient Client => null;
5+
public AmazonSimpleNotificationServiceClient Client
6+
=> throw new ConfigurationMessageBusException("The connection to Amazon SNS has not been provided - check your bus configuration");
67

78
public Task EnsureClientAuthenticated() => Task.CompletedTask;
89
}

src/SlimMessageBus.Host.AmazonSQS/ClientFactory/NullSqsClientProvider.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
internal class NullSqsClientProvider : ISqsClientProvider
44
{
5-
public AmazonSQSClient Client => null;
5+
public AmazonSQSClient Client
6+
=> throw new ConfigurationMessageBusException("The connection to Amazon SQS has not been provided - check your bus configuration");
67

78
public Task EnsureClientAuthenticated() => Task.CompletedTask;
89
}

src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public abstract class SqsBaseConsumer : AbstractConsumer
1313

1414
public SqsMessageBus MessageBus { get; }
1515
protected IMessageProcessor<Message> MessageProcessor { get; }
16-
protected ISqsHeaderSerializer HeaderSerializer { get; }
16+
protected ISqsHeaderSerializer<Amazon.SQS.Model.MessageAttributeValue> HeaderSerializer { get; }
1717

1818
protected SqsBaseConsumer(
1919
SqsMessageBus messageBus,
@@ -30,7 +30,7 @@ protected SqsBaseConsumer(
3030
_clientProvider = clientProvider ?? throw new ArgumentNullException(nameof(clientProvider));
3131
MessageBus = messageBus;
3232
MessageProcessor = messageProcessor ?? throw new ArgumentNullException(nameof(messageProcessor));
33-
HeaderSerializer = messageBus.HeaderSerializer;
33+
HeaderSerializer = messageBus.SqsHeaderSerializer;
3434
T GetSingleValue<T>(Func<AbstractConsumerSettings, T> selector, string settingName, T defaultValue = default)
3535
{
3636
var set = consumerSettings.Select(x => selector(x)).Where(x => x is not null && !x.Equals(defaultValue)).ToHashSet();
@@ -43,7 +43,7 @@ T GetSingleValue<T>(Func<AbstractConsumerSettings, T> selector, string settingNa
4343

4444
_maxMessages = GetSingleValue(x => x.GetOrDefault(SqsProperties.MaxMessages), nameof(SqsConsumerBuilderExtensions.MaxMessages)) ?? messageBus.ProviderSettings.MaxMessageCount;
4545
_visibilityTimeout = GetSingleValue(x => x.GetOrDefault(SqsProperties.VisibilityTimeout), nameof(SqsConsumerBuilderExtensions.VisibilityTimeout)) ?? 30;
46-
_messageAttributeNames = new List<string>(GetSingleValue(x => x.GetOrDefault(SqsProperties.MessageAttributes), nameof(SqsConsumerBuilderExtensions.FetchMessageAttributes)) ?? ["All"]);
46+
_messageAttributeNames = [.. GetSingleValue(x => x.GetOrDefault(SqsProperties.MessageAttributes), nameof(SqsConsumerBuilderExtensions.FetchMessageAttributes)) ?? ["All"]];
4747
}
4848

4949
private async Task<IReadOnlyCollection<Message>> ReceiveMessagesByUrl(string queueUrl)
@@ -101,7 +101,7 @@ protected override async Task OnStop()
101101

102102
protected async Task Run()
103103
{
104-
var queueUrl = MessageBus.GetQueueUrlOrException(Path);
104+
var (queueUrl, _) = MessageBus.GetUrlAndKindOrException(Path);
105105

106106
var messagesToDelete = new List<Message>(_maxMessages);
107107

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
namespace SlimMessageBus.Host.AmazonSQS;
2+
3+
public class DefaultSnsHeaderSerializer(bool detectStringType = true) : ISqsHeaderSerializer<Amazon.SimpleNotificationService.Model.MessageAttributeValue>
4+
{
5+
const string DataTypeNumber = "Number";
6+
const string DataTypeString = "String";
7+
8+
public Amazon.SimpleNotificationService.Model.MessageAttributeValue Serialize(string key, object value) => value switch
9+
{
10+
// See more https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes
11+
var x when x is long || x is int || x is short || x is byte => new Amazon.SimpleNotificationService.Model.MessageAttributeValue
12+
{
13+
DataType = DataTypeNumber,
14+
StringValue = value.ToString()
15+
},
16+
_ => new Amazon.SimpleNotificationService.Model.MessageAttributeValue
17+
{
18+
DataType = DataTypeString,
19+
StringValue = value?.ToString()
20+
}
21+
};
22+
23+
public object Deserialize(string key, Amazon.SimpleNotificationService.Model.MessageAttributeValue value) => value.DataType switch
24+
{
25+
DataTypeNumber when long.TryParse(value.StringValue, out var longValue) => longValue,
26+
DataTypeString when detectStringType && key != ReqRespMessageHeaders.RequestId && Guid.TryParse(value.StringValue, out var guid) => guid,
27+
DataTypeString when detectStringType && bool.TryParse(value.StringValue, out var b) => b,
28+
DataTypeString when detectStringType && DateTime.TryParse(value.StringValue, out var dt) => dt,
29+
DataTypeString => value.StringValue,
30+
_ => null
31+
};
32+
}

src/SlimMessageBus.Host.AmazonSQS/Headers/DefaultSqsHeaderSerializer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
namespace SlimMessageBus.Host.AmazonSQS;
22

3-
public class DefaultSqsHeaderSerializer(bool detectStringType = true) : ISqsHeaderSerializer
3+
public class DefaultSqsHeaderSerializer(bool detectStringType = true) : ISqsHeaderSerializer<Amazon.SQS.Model.MessageAttributeValue>
44
{
55
const string DataTypeNumber = "Number";
66
const string DataTypeString = "String";
@@ -29,4 +29,4 @@ DataTypeNumber when long.TryParse(value.StringValue, out var longValue) => longV
2929
DataTypeString => value.StringValue,
3030
_ => null
3131
};
32-
}
32+
}
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
namespace SlimMessageBus.Host.AmazonSQS;
22

3-
public interface ISqsHeaderSerializer
3+
public interface ISqsHeaderSerializer<THeaderValue> where THeaderValue : class
44
{
5-
MessageAttributeValue Serialize(string key, object value);
6-
object Deserialize(string key, MessageAttributeValue value);
5+
THeaderValue Serialize(string key, object value);
6+
object Deserialize(string key, THeaderValue value);
77
}
8+

0 commit comments

Comments
 (0)