Skip to content

Commit cf2f3b7

Browse files
committed
Per path Serializer
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent 9acab68 commit cf2f3b7

File tree

54 files changed

+469
-328
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+469
-328
lines changed

src/Host.Plugin.Properties.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<Import Project="Common.NuGet.Properties.xml" />
55

66
<PropertyGroup>
7-
<Version>3.0.2</Version>
7+
<Version>3.1.0-rc100</Version>
88
</PropertyGroup>
99

1010
</Project>

src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ public class SqsMessageBus : MessageBusBase<SqsMessageBusSettings>
77
private readonly ILogger _logger;
88
private readonly ISqsClientProvider _clientProvider;
99
private readonly Dictionary<string, string> _queueUrlByPath = [];
10-
private readonly IMessageSerializer<string> _messageSerializer;
1110

1211
public ISqsHeaderSerializer HeaderSerializer { get; }
1312

@@ -16,8 +15,6 @@ public SqsMessageBus(MessageBusSettings settings, SqsMessageBusSettings provider
1615
{
1716
_logger = LoggerFactory.CreateLogger<SqsMessageBus>();
1817
_clientProvider = settings.ServiceProvider.GetRequiredService<ISqsClientProvider>();
19-
_messageSerializer = Serializer as IMessageSerializer<string>
20-
?? throw new ConfigurationMessageBusException($"Serializer for Amazon SQS must be able to serialize into a string (it needs to implement {nameof(IMessageSerializer<string>)})");
2118
HeaderSerializer = providerSettings.SqsHeaderSerializer;
2219
OnBuildProvider();
2320
}
@@ -28,6 +25,9 @@ protected override void Build()
2825
InitTaskList.Add(InitAsync, CancellationToken);
2926
}
3027

28+
private IMessageSerializer<string> GetMessageSerializer(string path) => SerializerProvider.GetSerializer(path) as IMessageSerializer<string>
29+
?? throw new ConfigurationMessageBusException($"Serializer for Amazon SQS must be able to serialize into a string (it needs to implement {nameof(IMessageSerializer<string>)})");
30+
3131
protected override async Task CreateConsumers()
3232
{
3333
await base.CreateConsumers();
@@ -44,12 +44,13 @@ void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<Message>
4444

4545
static void InitConsumerContext(Message m, ConsumerContext ctx) => ctx.SetTransportMessage(m);
4646

47-
object MessageProvider(Type messageType, Message transportMessage) => _messageSerializer.Deserialize(messageType, transportMessage.Body);
48-
4947
foreach (var ((path, pathKind), consumerSettings) in Settings.Consumers
5048
.GroupBy(x => (x.Path, x.PathKind))
5149
.ToDictionary(x => x.Key, x => x.ToList()))
5250
{
51+
var messageSerializer = GetMessageSerializer(path);
52+
object MessageProvider(Type messageType, Message transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Body);
53+
5354
var messageProcessor = new MessageProcessor<Message>(
5455
consumerSettings,
5556
this,
@@ -64,6 +65,11 @@ void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<Message>
6465

6566
if (Settings.RequestResponse != null)
6667
{
68+
var path = Settings.RequestResponse.Path;
69+
70+
var messageSerializer = GetMessageSerializer(path);
71+
object MessageProvider(Type messageType, Message transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Body);
72+
6773
var messageProcessor = new ResponseMessageProcessor<Message>(
6874
LoggerFactory,
6975
Settings.RequestResponse,
@@ -72,7 +78,7 @@ void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<Message>
7278
CurrentTimeProvider);
7379

7480
AddConsumerFrom(
75-
Settings.RequestResponse.Path,
81+
path,
7682
Settings.RequestResponse.PathKind,
7783
messageProcessor,
7884
[Settings.RequestResponse]);
@@ -150,10 +156,11 @@ public override async Task ProduceToTransport(object message, Type messageType,
150156
{
151157
OnProduceToTransport(message, messageType, path, messageHeaders);
152158

159+
var messageSerializer = GetMessageSerializer(path);
153160
var queueUrl = GetQueueUrlOrException(path);
154161
try
155162
{
156-
var (payload, attributes, deduplicationId, groupId) = GetTransportMessage(message, messageType, messageHeaders);
163+
var (payload, attributes, deduplicationId, groupId) = GetTransportMessage(message, messageType, messageHeaders, messageSerializer);
157164

158165
await _clientProvider.Client.SendMessageAsync(new SendMessageRequest(queueUrl, payload)
159166
{
@@ -176,6 +183,7 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu
176183
var dispatched = new List<T>(envelopes.Count);
177184
try
178185
{
186+
var messageSerializer = GetMessageSerializer(path);
179187
var queueUrl = GetQueueUrlOrException(path);
180188

181189
var entries = new List<SendMessageBatchRequestEntry>(MaxMessagesInBatch);
@@ -185,7 +193,7 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu
185193
{
186194
foreach (var envelope in envelopeChunk)
187195
{
188-
var (payload, attributes, deduplicationId, groupId) = GetTransportMessage(envelope.Message, envelope.MessageType, envelope.Headers);
196+
var (payload, attributes, deduplicationId, groupId) = GetTransportMessage(envelope.Message, envelope.MessageType, envelope.Headers, messageSerializer);
189197

190198
entries.Add(new SendMessageBatchRequestEntry(Guid.NewGuid().ToString(), payload)
191199
{
@@ -211,7 +219,7 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu
211219
}
212220
}
213221

214-
private (string Payload, Dictionary<string, MessageAttributeValue> Attributes, string DeduplicationId, string GroupId) GetTransportMessage(object message, Type messageType, IDictionary<string, object> messageHeaders)
222+
private (string Payload, Dictionary<string, MessageAttributeValue> Attributes, string DeduplicationId, string GroupId) GetTransportMessage(object message, Type messageType, IDictionary<string, object> messageHeaders, IMessageSerializer<string> messageSerializer)
215223
{
216224
var producerSettings = GetProducerSettings(messageType);
217225

@@ -232,7 +240,7 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu
232240
}
233241
}
234242

235-
var messagePayload = _messageSerializer.Serialize(messageType, message);
243+
var messagePayload = messageSerializer.Serialize(messageType, message);
236244
return (messagePayload, messageAttributes, deduplicationId, groupId);
237245
}
238246
}

src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,24 @@ protected override async Task CreateConsumers()
5555
{
5656
await base.CreateConsumers();
5757

58-
object MessageProvider(Type messageType, EventData transportMessage) => Serializer.Deserialize(messageType, transportMessage.Body.ToArray());
59-
6058
foreach (var (groupPath, consumerSettings) in Settings.Consumers.GroupBy(x => new GroupPath(path: x.Path, group: x.GetGroup())).ToDictionary(x => x.Key, x => x.ToList()))
6159
{
60+
var messageSerializer = SerializerProvider.GetSerializer(groupPath.Path);
61+
object MessageProvider(Type messageType, EventData transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Body.ToArray());
62+
6263
_logger.LogInformation("Creating consumer for Path: {Path}, Group: {Group}", groupPath.Path, groupPath.Group);
6364
AddConsumer(new EhGroupConsumer(consumerSettings, this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition, MessageProvider)));
6465
}
6566

6667
if (Settings.RequestResponse != null)
6768
{
68-
var pathGroup = new GroupPath(Settings.RequestResponse.Path, Settings.RequestResponse.GetGroup());
69-
_logger.LogInformation("Creating response consumer for Path: {Path}, Group: {Group}", pathGroup.Path, pathGroup.Group);
70-
AddConsumer(new EhGroupConsumer([Settings.RequestResponse], this, pathGroup, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition, MessageProvider, PendingRequestStore, CurrentTimeProvider)));
69+
var groupPath = new GroupPath(Settings.RequestResponse.Path, Settings.RequestResponse.GetGroup());
70+
71+
var messageSerializer = SerializerProvider.GetSerializer(groupPath.Path);
72+
object MessageProvider(Type messageType, EventData transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Body.ToArray());
73+
74+
_logger.LogInformation("Creating response consumer for Path: {Path}, Group: {Group}", groupPath.Path, groupPath.Group);
75+
AddConsumer(new EhGroupConsumer([Settings.RequestResponse], this, groupPath, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition, MessageProvider, PendingRequestStore, CurrentTimeProvider)));
7176
}
7277

7378
}
@@ -111,7 +116,7 @@ private EventData GetTransportMessage(object message, Type messageType, IDiction
111116
OnProduceToTransport(message, messageType, path, messageHeaders);
112117

113118
var messagePayload = message != null
114-
? Serializer.Serialize(messageType, message)
119+
? SerializerProvider.GetSerializer(path).Serialize(messageType, message)
115120
: null;
116121

117122
var transportMessage = message != null

src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,14 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
8686
}
8787

8888
static void InitConsumerContext(ServiceBusReceivedMessage m, ConsumerContext ctx) => ctx.SetTransportMessage(m);
89-
object MessageProvider(Type messageType, ServiceBusReceivedMessage m) => Serializer.Deserialize(messageType, m.Body.ToArray());
9089

9190
foreach (var ((path, subscriptionName), consumerSettings) in Settings.Consumers
9291
.GroupBy(x => (x.Path, SubscriptionName: x.GetSubscriptionName(ProviderSettings)))
9392
.ToDictionary(x => x.Key, x => x.ToList()))
9493
{
94+
var messageSerializer = SerializerProvider.GetSerializer(path);
95+
object MessageProvider(Type messageType, ServiceBusReceivedMessage m) => messageSerializer.Deserialize(messageType, m.Body.ToArray());
96+
9597
var topicSubscription = new TopicSubscriptionParams(path: path, subscriptionName: subscriptionName);
9698
var messageProcessor = new MessageProcessor<ServiceBusReceivedMessage>(
9799
consumerSettings,
@@ -107,7 +109,12 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
107109

108110
if (Settings.RequestResponse != null)
109111
{
110-
var topicSubscription = new TopicSubscriptionParams(Settings.RequestResponse.Path, Settings.RequestResponse.GetSubscriptionName(ProviderSettings));
112+
var path = Settings.RequestResponse.Path;
113+
114+
var messageSerializer = SerializerProvider.GetSerializer(path);
115+
object MessageProvider(Type messageType, ServiceBusReceivedMessage m) => messageSerializer.Deserialize(messageType, m.Body.ToArray());
116+
117+
var topicSubscription = new TopicSubscriptionParams(path, Settings.RequestResponse.GetSubscriptionName(ProviderSettings));
111118
var messageProcessor = new ResponseMessageProcessor<ServiceBusReceivedMessage>(
112119
LoggerFactory,
113120
Settings.RequestResponse,
@@ -223,7 +230,7 @@ Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch,
223230

224231
private ServiceBusMessage GetTransportMessage(object message, Type messageType, IDictionary<string, object> messageHeaders, string path)
225232
{
226-
var messagePayload = Serializer.Serialize(messageType, message);
233+
var messagePayload = SerializerProvider.GetSerializer(path).Serialize(messageType, message);
227234

228235
OnProduceToTransport(message, messageType, path, messageHeaders);
229236

src/SlimMessageBus.Host.Configuration/Builders/ISerializationBuilder.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22

33
public interface ISerializationBuilder
44
{
5-
void RegisterSerializer<TMessageSerializer>(Action<IServiceCollection> services) where TMessageSerializer : class, IMessageSerializer;
5+
void RegisterSerializer<TMessageSerializerProvider>(Action<IServiceCollection> services)
6+
where TMessageSerializerProvider : class, IMessageSerializerProvider;
67
}

src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,11 @@ public MessageBusBuilder WithSerializer(Type serializerType)
258258
return this;
259259
}
260260

261-
public void RegisterSerializer<TMessageSerializer>(Action<IServiceCollection> services)
262-
where TMessageSerializer : class, IMessageSerializer
261+
public void RegisterSerializer<TMessageSerializerProvider>(Action<IServiceCollection> services)
262+
where TMessageSerializerProvider : class, IMessageSerializerProvider
263263
{
264264
PostConfigurationActions.Add(services);
265-
PostConfigurationActions.Add(services => services.TryAddSingleton<IMessageSerializer>(sp => sp.GetRequiredService<TMessageSerializer>()));
265+
PostConfigurationActions.Add(services => services.TryAddSingleton<IMessageSerializerProvider>(sp => sp.GetRequiredService<TMessageSerializerProvider>()));
266266
}
267267

268268
public MessageBusBuilder WithDependencyResolver(IServiceProvider serviceProvider)

src/SlimMessageBus.Host.Configuration/Settings/MessageBusSettings.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public MessageBusSettings(MessageBusSettings parent = null)
4444
_children = [];
4545
Producers = [];
4646
Consumers = [];
47-
SerializerType = typeof(IMessageSerializer);
47+
SerializerType = typeof(IMessageSerializerProvider);
4848
AutoStartConsumers = true;
4949

5050
if (parent != null)

src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Description>Core configuration interfaces of SlimMessageBus</Description>
77
<PackageTags>SlimMessageBus</PackageTags>
88
<RootNamespace>SlimMessageBus.Host</RootNamespace>
9-
<Version>3.0.0</Version>
9+
<Version>3.1.0-rc100</Version>
1010
</PropertyGroup>
1111

1212
<ItemGroup>

src/SlimMessageBus.Host.Kafka/Configs/KafkaMessageBusSettings.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class KafkaMessageBusSettings
3535
/// <summary>
3636
/// Serializer used to serialize Kafka message header values. If not specified the default serializer will be used (setup as part of the bus config). By default the <see cref="DefaultKafkaHeaderSerializer"/> is used.
3737
/// </summary>
38-
public IMessageSerializer HeaderSerializer { get; set; }
38+
public IMessageSerializerProvider HeaderSerializer { get; set; }
3939

4040
/// <summary>
4141
/// Should the commit on partitions for the consumed messages happen when the bus is stopped (or disposed)?

src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumerForConsumers.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public KafkaPartitionConsumerForConsumers(ILoggerFactory loggerFactory, Consumer
2121
messageBus,
2222
path: topicPartition.Topic,
2323
responseProducer: messageBus,
24-
messageProvider: (messageType, transportMessage) => messageBus.Serializer.Deserialize(messageType, transportMessage.Message.Value),
24+
messageProvider: (messageType, transportMessage) => messageBus.SerializerProvider.GetSerializer(topicPartition.Topic).Deserialize(messageType, transportMessage.Message.Value),
2525
consumerContextInitializer: (m, ctx) => ctx.SetTransportMessage(m),
2626
consumerErrorHandlerOpenGenericType: typeof(IKafkaConsumerErrorHandler<>)))
2727
{

0 commit comments

Comments
 (0)