Skip to content

Commit 06184f9

Browse files
committed
Allow Access to Full Message During Deserialization #403
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent 73f5d4c commit 06184f9

File tree

61 files changed

+423
-374
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+423
-374
lines changed

src/Host.Plugin.Properties.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<Import Project="Common.NuGet.Properties.xml" />
55

66
<PropertyGroup>
7-
<Version>3.3.0-rc201</Version>
7+
<Version>3.3.0-rc202</Version>
88
</PropertyGroup>
99

1010
</Project>

src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,14 @@ protected async Task Run()
151151
}
152152
}
153153

154+
private static readonly IReadOnlyDictionary<string, object> EmptyHeaders = new Dictionary<string, object>();
155+
154156
private void GetPayloadAndHeadersFromMessage(Message message, out string messagePayload, out Dictionary<string, object> messageHeaders)
155157
{
156158
if (_isSubscribedToTopic)
157159
{
158160
// Note: Messages ariving from SNS topics are wrapped in an envelope like SnsEnvelope type. We need to get the actual message and headers from it.
159-
var snsEnvelope = (SnsEnvelope)MessageSerializer.Deserialize(typeof(SnsEnvelope), message.Body);
161+
var snsEnvelope = (SnsEnvelope)MessageSerializer.Deserialize(typeof(SnsEnvelope), EmptyHeaders, message.Body, message);
160162

161163
messagePayload = snsEnvelope.Message ?? throw new ConsumerMessageBusException("Message of the SNS Envelope was null");
162164
messageHeaders = (snsEnvelope.MessageAttributes ?? throw new ConsumerMessageBusException("Message of the SNS Envelope was null"))

src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,3 @@
1111

1212
global using SlimMessageBus.Host.Consumer.ErrorHandling;
1313
global using SlimMessageBus.Host.Serialization;
14-

src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void AddQueueConsumer(string queue, IMessageProcessor<SqsTransportMessageWithPay
5959
.ToDictionary(x => x.Key, x => x.ToList()))
6060
{
6161
var messageSerializer = GetMessageSerializer(queue);
62-
object MessageProvider(Type messageType, SqsTransportMessageWithPayload transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Payload);
62+
object MessageProvider(Type messageType, IReadOnlyDictionary<string, object> headers, SqsTransportMessageWithPayload transportMessage) => messageSerializer.Deserialize(messageType, headers, transportMessage.Payload, transportMessage);
6363

6464
var messageProcessor = new MessageProcessor<SqsTransportMessageWithPayload>(
6565
consumerSettings,
@@ -78,7 +78,7 @@ void AddQueueConsumer(string queue, IMessageProcessor<SqsTransportMessageWithPay
7878
var queue = Settings.RequestResponse.GetOrDefault(SqsProperties.UnderlyingQueue);
7979

8080
var messageSerializer = GetMessageSerializer(queue);
81-
object MessageProvider(Type messageType, SqsTransportMessageWithPayload transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Payload);
81+
object MessageProvider(Type messageType, IReadOnlyDictionary<string, object> headers, SqsTransportMessageWithPayload transportMessage) => messageSerializer.Deserialize(messageType, headers, transportMessage.Payload, transportMessage);
8282

8383
var messageProcessor = new ResponseMessageProcessor<SqsTransportMessageWithPayload>(
8484
LoggerFactory,
@@ -207,7 +207,8 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu
207207
});
208208
}
209209

210-
await _clientProviderSqs.Client.SendMessageBatchAsync(new SendMessageBatchRequest(pathMeta.Url, entries), cancellationToken);
210+
var r = new SendMessageBatchRequest(pathMeta.Url, entries);
211+
await _clientProviderSqs.Client.SendMessageBatchAsync(r, cancellationToken);
211212

212213
entries.Clear();
213214

@@ -268,10 +269,9 @@ public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBu
268269
var messageGroupIdProvider = producerSettings.GetOrDefault(SqsProperties.MessageGroupId, null);
269270
var groupId = messageGroupIdProvider?.Invoke(message, messageHeaders);
270271

272+
var messagePayload = messageSerializer.Serialize(messageType, messageHeaders, message, null);
271273
var messageAttributes = GetTransportMessageAttibutes(messageHeaders, headerSerializer);
272274

273-
var messagePayload = messageSerializer.Serialize(messageType, message);
274-
275275
return (messagePayload, messageAttributes, deduplicationId, groupId);
276276
}
277277

src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,24 +55,25 @@ protected override async Task CreateConsumers()
5555
{
5656
await base.CreateConsumers();
5757

58+
MessageProvider<EventData> GetMessageProvider(string path)
59+
=> SerializerProvider.GetSerializer(path).GetMessageProvider<byte[], EventData>(t => t.Body.ToArray());
60+
5861
foreach (var (groupPath, consumerSettings) in Settings.Consumers.GroupBy(x => new GroupPath(path: x.Path, group: x.GetGroup())).ToDictionary(x => x.Key, x => x.ToList()))
5962
{
60-
var messageSerializer = SerializerProvider.GetSerializer(groupPath.Path);
61-
object MessageProvider(Type messageType, EventData transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Body.ToArray());
63+
var messageProvider = GetMessageProvider(groupPath.Path);
6264

6365
_logger.LogInformation("Creating consumer for Path: {Path}, Group: {Group}", groupPath.Path, groupPath.Group);
64-
AddConsumer(new EhGroupConsumer(consumerSettings, this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition, MessageProvider)));
66+
AddConsumer(new EhGroupConsumer(consumerSettings, this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition, messageProvider)));
6567
}
6668

6769
if (Settings.RequestResponse != null)
6870
{
6971
var groupPath = new GroupPath(Settings.RequestResponse.Path, Settings.RequestResponse.GetGroup());
7072

71-
var messageSerializer = SerializerProvider.GetSerializer(groupPath.Path);
72-
object MessageProvider(Type messageType, EventData transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Body.ToArray());
73+
var messageProvider = GetMessageProvider(groupPath.Path);
7374

7475
_logger.LogInformation("Creating response consumer for Path: {Path}, Group: {Group}", groupPath.Path, groupPath.Group);
75-
AddConsumer(new EhGroupConsumer([Settings.RequestResponse], this, groupPath, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition, MessageProvider, PendingRequestStore, TimeProvider)));
76+
AddConsumer(new EhGroupConsumer([Settings.RequestResponse], this, groupPath, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition, messageProvider, PendingRequestStore, TimeProvider)));
7677
}
7778
}
7879

@@ -114,13 +115,12 @@ private EventData GetTransportMessage(object message, Type messageType, IDiction
114115
{
115116
OnProduceToTransport(message, messageType, path, messageHeaders);
116117

117-
var messagePayload = message != null
118-
? SerializerProvider.GetSerializer(path).Serialize(messageType, message)
119-
: null;
118+
var transportMessage = new EventData();
120119

121-
var transportMessage = message != null
122-
? new EventData(messagePayload)
123-
: new EventData();
120+
if (message != null)
121+
{
122+
transportMessage.EventBody = new BinaryData(SerializerProvider.GetSerializer(path).Serialize(messageType, messageHeaders, message, transportMessage));
123+
}
124124

125125
if (messageHeaders != null)
126126
{

src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
8585
AddConsumer(consumer);
8686
}
8787

88+
MessageProvider<ServiceBusReceivedMessage> GetMessageProvider(string path)
89+
=> SerializerProvider.GetSerializer(path).GetMessageProvider<byte[], ServiceBusReceivedMessage>(t => t.Body.ToArray());
90+
8891
foreach (var ((path, subscriptionName), consumerSettings) in Settings.Consumers
8992
.GroupBy(x => (x.Path, SubscriptionName: x.GetSubscriptionName(ProviderSettings)))
9093
.ToDictionary(x => x.Key, x => x.ToList()))
@@ -95,14 +98,11 @@ void InitConsumerContext(ServiceBusReceivedMessage m, ConsumerContext ctx)
9598
ctx.SetSubscriptionName(subscriptionName);
9699
}
97100

98-
var messageSerializer = SerializerProvider.GetSerializer(path);
99-
object MessageProvider(Type messageType, ServiceBusReceivedMessage m) => messageSerializer.Deserialize(messageType, m.Body.ToArray());
100-
101101
var topicSubscription = new TopicSubscriptionParams(path: path, subscriptionName: subscriptionName);
102102
var messageProcessor = new MessageProcessor<ServiceBusReceivedMessage>(
103103
consumerSettings,
104104
this,
105-
messageProvider: MessageProvider,
105+
messageProvider: GetMessageProvider(path),
106106
path: path.ToString(),
107107
responseProducer: this,
108108
consumerContextInitializer: InitConsumerContext,
@@ -115,14 +115,11 @@ void InitConsumerContext(ServiceBusReceivedMessage m, ConsumerContext ctx)
115115
{
116116
var path = Settings.RequestResponse.Path;
117117

118-
var messageSerializer = SerializerProvider.GetSerializer(path);
119-
object MessageProvider(Type messageType, ServiceBusReceivedMessage m) => messageSerializer.Deserialize(messageType, m.Body.ToArray());
120-
121118
var topicSubscription = new TopicSubscriptionParams(path, Settings.RequestResponse.GetSubscriptionName(ProviderSettings));
122119
var messageProcessor = new ResponseMessageProcessor<ServiceBusReceivedMessage>(
123120
LoggerFactory,
124121
Settings.RequestResponse,
125-
MessageProvider,
122+
messageProvider: GetMessageProvider(path),
126123
PendingRequestStore,
127124
TimeProvider);
128125

@@ -234,33 +231,34 @@ Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch,
234231

235232
private ServiceBusMessage GetTransportMessage(object message, Type messageType, IDictionary<string, object> messageHeaders, string path)
236233
{
237-
var messagePayload = SerializerProvider.GetSerializer(path).Serialize(messageType, message);
238-
239234
OnProduceToTransport(message, messageType, path, messageHeaders);
240235

241-
var m = messagePayload != null
242-
? new ServiceBusMessage(messagePayload)
243-
: new ServiceBusMessage();
236+
var transportMessage = new ServiceBusMessage();
237+
238+
if (message != null)
239+
{
240+
transportMessage.Body = new BinaryData(SerializerProvider.GetSerializer(path).Serialize(messageType, messageHeaders, message, transportMessage));
241+
}
244242

245243
// add headers
246244
if (messageHeaders != null)
247245
{
248246
foreach (var header in messageHeaders)
249247
{
250-
m.ApplicationProperties.Add(header.Key, header.Value);
248+
transportMessage.ApplicationProperties.Add(header.Key, header.Value);
251249
}
252250
}
253251

254252
// global modifier first
255-
InvokeMessageModifier(message, messageType, m, ProviderSettings);
253+
InvokeMessageModifier(message, messageType, transportMessage, ProviderSettings);
256254
if (messageType != null)
257255
{
258256
// local producer modifier second
259257
var producerSettings = GetProducerSettings(messageType);
260-
InvokeMessageModifier(message, messageType, m, producerSettings);
258+
InvokeMessageModifier(message, messageType, transportMessage, producerSettings);
261259
}
262260

263-
return m;
261+
return transportMessage;
264262
}
265263

266264
private void InvokeMessageModifier(object message, Type messageType, ServiceBusMessage m, HasProviderExtensions settings)

src/SlimMessageBus.Host.Configuration/SlimMessageBus.Host.Configuration.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Description>Core configuration interfaces of SlimMessageBus</Description>
77
<PackageTags>SlimMessageBus</PackageTags>
88
<RootNamespace>SlimMessageBus.Host</RootNamespace>
9-
<Version>3.3.0-rc201</Version>
9+
<Version>3.3.0-rc202</Version>
1010
</PropertyGroup>
1111

1212
<ItemGroup>

src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public static IReadOnlyDictionary<string, object> ToHeaders(this ConsumeResult<I
1616
var headers = new Dictionary<string, object>();
1717
foreach (var header in consumeResult.Message.Headers)
1818
{
19-
var value = headerSerializer.Deserialize(typeof(object), header.GetValueBytes());
19+
var value = headerSerializer.Deserialize(typeof(object), null, header.GetValueBytes(), null);
2020
headers[header.Key] = value;
2121
}
2222

src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumerForConsumers.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,14 @@ namespace SlimMessageBus.Host.Kafka;
88
/// </summary>
99
public class KafkaPartitionConsumerForConsumers : KafkaPartitionConsumer
1010
{
11-
public KafkaPartitionConsumerForConsumers(ILoggerFactory loggerFactory, ConsumerSettings[] consumerSettings, string group, TopicPartition topicPartition, IKafkaCommitController commitController, IMessageSerializer headerSerializer, MessageBusBase messageBus)
11+
public KafkaPartitionConsumerForConsumers(ILoggerFactory loggerFactory,
12+
ConsumerSettings[] consumerSettings,
13+
string group,
14+
TopicPartition topicPartition,
15+
IKafkaCommitController commitController,
16+
IMessageSerializer headerSerializer,
17+
MessageProvider<ConsumeResult> messageProvider,
18+
MessageBusBase messageBus)
1219
: base(
1320
loggerFactory,
1421
consumerSettings,
@@ -21,7 +28,7 @@ public KafkaPartitionConsumerForConsumers(ILoggerFactory loggerFactory, Consumer
2128
messageBus,
2229
path: topicPartition.Topic,
2330
responseProducer: messageBus,
24-
messageProvider: (messageType, transportMessage) => messageBus.SerializerProvider.GetSerializer(topicPartition.Topic).Deserialize(messageType, transportMessage.Message.Value),
31+
messageProvider: messageProvider,
2532
consumerContextInitializer: (m, ctx) => ctx.SetTransportMessage(m),
2633
consumerErrorHandlerOpenGenericType: typeof(IKafkaConsumerErrorHandler<>)))
2734
{

src/SlimMessageBus.Host.Kafka/DefaultKafkaHeaderSerializer.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace SlimMessageBus.Host.Kafka;
22

3+
using System.Collections.Generic;
34
using System.Globalization;
45
using System.Text;
56

@@ -24,14 +25,14 @@ public DefaultKafkaHeaderSerializer(Encoding encoding = null, bool inferType = f
2425

2526
#region Implementation of IMessageSerializer
2627

27-
public byte[] Serialize(Type t, object message)
28+
public byte[] Serialize(Type messageType, IDictionary<string, object> headers, object message, object transportMessage)
2829
{
2930
if (message == null) return null;
3031
var payload = _encoding.GetBytes(Convert.ToString(message, CultureInfo.InvariantCulture));
3132
return payload;
3233
}
3334

34-
public object Deserialize(Type t, byte[] payload)
35+
public object Deserialize(Type messageType, IReadOnlyDictionary<string, object> headers, byte[] payload, object transportMessage)
3536
{
3637
if (payload == null) return null;
3738

0 commit comments

Comments
 (0)