Skip to content

Commit 1ed24d9

Browse files
committed
Publish/Send should pass declared producer message type (Produce<T>) even if sending derived message type
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
1 parent 963fa28 commit 1ed24d9

File tree

10 files changed

+343
-264
lines changed

10 files changed

+343
-264
lines changed

docs/intro.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,13 @@ await bus.Publish(new CustomerCreatedEvent { });
10201020
await bus.Publish(new CustomerChangedEvent { });
10211021
```
10221022

1023+
> **Note on Serialization with Polymorphic Messages:**
1024+
> When publishing a derived message type (e.g., `CustomerCreatedEvent`) with only the base type configured (e.g., `Produce<CustomerEvent>`), the serializer's `Serialize` method will receive the **configured type** from the `Produce<T>()` declaration (i.e., `CustomerEvent`), not the actual runtime type (`CustomerCreatedEvent`). This allows serializers to maintain consistent type information in headers and ensures proper polymorphic behavior.
1025+
>
1026+
> However, if you explicitly declare `Produce<CustomerCreatedEvent>()` for the derived type, the serializer will receive `CustomerCreatedEvent` instead. SMB will use the most specific matching producer configuration available for each message type.
1027+
>
1028+
> This behavior applies to both single message publishing via `Publish()` and bulk publishing with collections of messages.
1029+
10231030
#### Polymorphic consumer
10241031

10251032
Given the following consumers:

docs/intro.t.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,13 @@ await bus.Publish(new CustomerCreatedEvent { });
10201020
await bus.Publish(new CustomerChangedEvent { });
10211021
```
10221022

1023+
> **Note on Serialization with Polymorphic Messages:**
1024+
> When publishing a derived message type (e.g., `CustomerCreatedEvent`) with only the base type configured (e.g., `Produce<CustomerEvent>`), the serializer's `Serialize` method will receive the **configured type** from the `Produce<T>()` declaration (i.e., `CustomerEvent`), not the actual runtime type (`CustomerCreatedEvent`). This allows serializers to maintain consistent type information in headers and ensures proper polymorphic behavior.
1025+
>
1026+
> However, if you explicitly declare `Produce<CustomerCreatedEvent>()` for the derived type, the serializer will receive `CustomerCreatedEvent` instead. SMB will use the most specific matching producer configuration available for each message type.
1027+
>
1028+
> This behavior applies to both single message publishing via `Publish()` and bulk publishing with collections of messages.
1029+
10231030
#### Polymorphic consumer
10241031

10251032
Given the following consumers:

docs/provider_amazon_sqs.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ services.AddSlimMessageBus((mbb) =>
3737
cfg.UseRegion(Amazon.RegionEndpoint.EUCentral1);
3838

3939
// Use static credentials: https://docs.aws.amazon.com/sdkref/latest/guide/access-iam-users.html
40-
cfg.UseStaticCredentials(accessKey, secretAccessKey, SqsMessageBusMode.All);
40+
cfg.UseStaticCredentials(accessKey, secretAccessKey, SqsMessageBusModes.All);
4141

4242
// Use default credentials pulled from environment variables (EC2, ECS, Fargate, etc.):
4343
// cfg.UseDefaultCredentials(); // This is the default, so you can skip this line if you want to use the default credentials.

src/Host.Plugin.Properties.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<Import Project="Common.NuGet.Properties.xml" />
55

66
<PropertyGroup>
7-
<Version>3.3.5</Version>
7+
<Version>3.3.6</Version>
88
</PropertyGroup>
99

1010
</Project>

src/SlimMessageBus.Host/MessageBusBase.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,8 @@ public async virtual Task ProducePublish(object message, string path = null, IDi
441441
// produce multiple messages to transport
442442
var messages = collectionInfo
443443
.ToCollection(message)
444-
.Select(m => new BulkMessageEnvelope(m, messageType, GetMessageHeaders(m, headers, producerSettings)))
444+
// Note: We want to send the producer message type here to preserve polymorphic behavior of the serializers
445+
.Select(m => new BulkMessageEnvelope(m, producerSettings.MessageType, GetMessageHeaders(m, headers, producerSettings)))
445446
.ToList();
446447

447448
var result = await ProduceToTransportBulk(messages, path, targetBus, cancellationToken);
@@ -482,7 +483,8 @@ public async virtual Task ProducePublish(object message, string path = null, IDi
482483
}
483484

484485
// produce a single message to transport
485-
await ProduceToTransport(message, messageType, path, messageHeaders, targetBus, cancellationToken);
486+
// Note: We want to send the producer message type here to preserve polymorphic behavior of the serializers
487+
await ProduceToTransport(message, producerSettings.MessageType, path, messageHeaders, targetBus, cancellationToken);
486488
}
487489

488490
protected static string GetProducerErrorMessage(string path, object message, Type messageType, Exception ex)
@@ -570,7 +572,8 @@ public virtual async Task<TResponse> ProduceSend<TResponse>(object request, stri
570572
return await pipeline.Next();
571573
}
572574

573-
return await SendInternal<TResponse>(request, path, requestType, responseType, producerSettings, created, expires, requestId, requestHeaders, targetBus, cancellationToken);
575+
// Note: We want to send the producer message type here to preserve polymorphic behavior of the serializers
576+
return await SendInternal<TResponse>(request, path, producerSettings.MessageType, responseType, producerSettings, created, expires, requestId, requestHeaders, targetBus, cancellationToken);
574577
}
575578

576579
protected async internal virtual Task<TResponseMessage> SendInternal<TResponseMessage>(object request, string path, Type requestType, Type responseType, ProducerSettings producerSettings, DateTimeOffset created, DateTimeOffset expires, string requestId, IDictionary<string, object> requestHeaders, IMessageBusTarget targetBus, CancellationToken cancellationToken)
Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,59 @@
11
namespace SlimMessageBus.Host;
2-
3-
internal class PublishInterceptorPipeline : ProducerInterceptorPipeline<PublishContext>
4-
{
5-
private readonly ITransportProducer _bus;
6-
private readonly Func<object, object, Func<Task>, IProducerContext, Task> _publishInterceptorFunc;
7-
private IEnumerator<object> _publishInterceptorsEnumerator;
8-
private bool _publishInterceptorsVisited = false;
9-
10-
public PublishInterceptorPipeline(ITransportProducer bus, RuntimeTypeCache runtimeTypeCache, object message, ProducerSettings producerSettings, IMessageBusTarget targetBus, PublishContext context, IEnumerable<object> producerInterceptors, IEnumerable<object> publishInterceptors)
11-
: base(runtimeTypeCache, message, producerSettings, targetBus, context, producerInterceptors)
12-
{
13-
_bus = bus;
14-
_publishInterceptorFunc = runtimeTypeCache.PublishInterceptorType[message.GetType()];
15-
_publishInterceptorsVisited = publishInterceptors is null;
16-
_publishInterceptorsEnumerator = publishInterceptors?.GetEnumerator();
17-
}
18-
19-
public async Task<object> Next()
20-
{
21-
if (!_producerInterceptorsVisited)
22-
{
23-
if (_producerInterceptorsEnumerator.MoveNext())
24-
{
25-
return await _producerInterceptorFunc(_producerInterceptorsEnumerator.Current, _message, Next, _context);
26-
}
27-
_producerInterceptorsVisited = true;
28-
_producerInterceptorsEnumerator = null;
29-
}
30-
31-
if (!_publishInterceptorsVisited)
32-
{
33-
if (_publishInterceptorsEnumerator.MoveNext())
34-
{
35-
await _publishInterceptorFunc(_publishInterceptorsEnumerator.Current, _message, Next, _context);
36-
return null;
37-
}
38-
_publishInterceptorsVisited = true;
39-
_publishInterceptorsEnumerator = null;
40-
}
41-
42-
if (!_targetVisited)
43-
{
44-
_targetVisited = true;
45-
await _bus.ProduceToTransport(_message,
46-
_message.GetType(),
47-
_context.Path,
48-
_context.Headers,
49-
_targetBus,
50-
_context.CancellationToken);
51-
return null;
52-
}
53-
54-
// throw exception as it should never happen
55-
throw new PublishMessageBusException("The next() was invoked more than once on one of the provided interceptors");
56-
}
57-
}
2+
3+
internal class PublishInterceptorPipeline : ProducerInterceptorPipeline<PublishContext>
4+
{
5+
private readonly ITransportProducer _bus;
6+
private readonly Func<object, object, Func<Task>, IProducerContext, Task> _publishInterceptorFunc;
7+
private IEnumerator<object> _publishInterceptorsEnumerator;
8+
private bool _publishInterceptorsVisited = false;
9+
10+
public PublishInterceptorPipeline(ITransportProducer bus, RuntimeTypeCache runtimeTypeCache, object message, ProducerSettings producerSettings, IMessageBusTarget targetBus, PublishContext context, IEnumerable<object> producerInterceptors, IEnumerable<object> publishInterceptors)
11+
: base(runtimeTypeCache, message, producerSettings, targetBus, context, producerInterceptors)
12+
{
13+
_bus = bus;
14+
_publishInterceptorFunc = runtimeTypeCache.PublishInterceptorType[message.GetType()];
15+
_publishInterceptorsVisited = publishInterceptors is null;
16+
_publishInterceptorsEnumerator = publishInterceptors?.GetEnumerator();
17+
}
18+
19+
public async Task<object> Next()
20+
{
21+
if (!_producerInterceptorsVisited)
22+
{
23+
if (_producerInterceptorsEnumerator.MoveNext())
24+
{
25+
return await _producerInterceptorFunc(_producerInterceptorsEnumerator.Current, _message, Next, _context);
26+
}
27+
_producerInterceptorsVisited = true;
28+
_producerInterceptorsEnumerator = null;
29+
}
30+
31+
if (!_publishInterceptorsVisited)
32+
{
33+
if (_publishInterceptorsEnumerator.MoveNext())
34+
{
35+
await _publishInterceptorFunc(_publishInterceptorsEnumerator.Current, _message, Next, _context);
36+
return null;
37+
}
38+
_publishInterceptorsVisited = true;
39+
_publishInterceptorsEnumerator = null;
40+
}
41+
42+
if (!_targetVisited)
43+
{
44+
_targetVisited = true;
45+
// Note: We want to send the producer message type here to preserve polymorphic behavior of the serializers
46+
await _bus.ProduceToTransport(_message,
47+
_producerSettings.MessageType,
48+
_context.Path,
49+
_context.Headers,
50+
_targetBus,
51+
_context.CancellationToken);
52+
return null;
53+
}
54+
55+
// throw exception as it should never happen
56+
throw new PublishMessageBusException("The next() was invoked more than once on one of the provided interceptors");
57+
}
58+
}
59+

src/SlimMessageBus.Host/Producer/InterceptorPipelines/SendInterceptorPipeline.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ public async Task<TResponse> Next()
4747
_targetVisited = true;
4848
var response = await _bus.SendInternal<TResponse>(_message,
4949
_context.Path,
50-
_message.GetType(),
50+
// Note: We want to send the producer message type here to preserve polymorphic behavior of the serializers
51+
_producerSettings.MessageType,
5152
typeof(TResponse),
5253
_producerSettings,
5354
_context.Created,
Lines changed: 67 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,70 @@
11
namespace SlimMessageBus.Host.Test;
22

3-
using SlimMessageBus.Host.Collections;
4-
using SlimMessageBus.Host.Interceptor;
5-
6-
public class PublishInterceptorPipelineTests
7-
{
8-
private readonly MessageBusMock _busMock = new();
9-
10-
[Theory]
11-
[InlineData(false, false)]
12-
[InlineData(true, false)]
13-
[InlineData(true, true)]
14-
public async Task When_Next_Then_InterceptorIsCalledAndTargetIsCalled(bool producerInterceptorRegistered, bool publishInterceptorRegistered)
15-
{
16-
// arrange
17-
var message = new SomeMessage();
18-
var topic = "topic1";
19-
20-
var producerInterceptorMock = new Mock<IProducerInterceptor<SomeMessage>>();
21-
producerInterceptorMock
22-
.Setup(x => x.OnHandle(message, It.IsAny<Func<Task<object>>>(), It.IsAny<IProducerContext>()))
23-
.Returns((SomeMessage message, Func<Task<object>> next, IProducerContext context) => next());
24-
25-
var producerInterceptors = producerInterceptorRegistered ? new[] { producerInterceptorMock.Object } : null;
26-
27-
var publishInterceptorMock = new Mock<IPublishInterceptor<SomeMessage>>();
28-
publishInterceptorMock
29-
.Setup(x => x.OnHandle(message, It.IsAny<Func<Task>>(), It.IsAny<IProducerContext>()))
30-
.Returns((SomeMessage message, Func<Task> next, IProducerContext context) => next());
31-
32-
var publishInterceptors = publishInterceptorRegistered ? new[] { publishInterceptorMock.Object } : null;
33-
34-
var producerSettings = new ProducerBuilder<SomeMessage>(new ProducerSettings())
35-
.DefaultTopic(topic)
36-
.Settings;
37-
38-
var context = new PublishContext
39-
{
40-
Path = topic,
41-
Headers = new Dictionary<string, object>(),
42-
};
43-
44-
_busMock.BusMock
45-
.Setup(x => x.ProduceToTransport(message, message.GetType(), context.Path, context.Headers, _busMock.Bus.MessageBusTarget, context.CancellationToken))
46-
.Returns(() => Task.FromResult<object>(null));
47-
48-
var subject = new PublishInterceptorPipeline(_busMock.Bus, new RuntimeTypeCache(), message, producerSettings, _busMock.Bus.MessageBusTarget, context, producerInterceptors: producerInterceptors, publishInterceptors: publishInterceptors);
49-
50-
// act
51-
var result = await subject.Next();
52-
53-
// assert
54-
result.Should().BeNull();
55-
56-
if (producerInterceptorRegistered)
57-
{
58-
producerInterceptorMock.Verify(x => x.OnHandle(message, It.IsAny<Func<Task<object>>>(), It.IsAny<IProducerContext>()), Times.Once);
59-
}
60-
producerInterceptorMock.VerifyNoOtherCalls();
61-
62-
if (publishInterceptorRegistered)
63-
{
64-
publishInterceptorMock.Verify(x => x.OnHandle(message, It.IsAny<Func<Task>>(), It.IsAny<IProducerContext>()), Times.Once);
65-
}
66-
publishInterceptorMock.VerifyNoOtherCalls();
67-
68-
_busMock.BusMock.Verify(x => x.ProduceToTransport(message, message.GetType(), context.Path, context.Headers, _busMock.Bus.MessageBusTarget, context.CancellationToken), Times.Once);
69-
}
3+
using SlimMessageBus.Host.Collections;
4+
using SlimMessageBus.Host.Interceptor;
5+
6+
public class PublishInterceptorPipelineTests
7+
{
8+
private readonly MessageBusMock _busMock = new();
9+
10+
[Theory]
11+
[InlineData(false, false)]
12+
[InlineData(true, false)]
13+
[InlineData(true, true)]
14+
public async Task When_Next_Then_InterceptorIsCalledAndTargetIsCalled(bool producerInterceptorRegistered, bool publishInterceptorRegistered)
15+
{
16+
// arrange
17+
var message = new SomeMessage();
18+
var topic = "topic1";
19+
20+
var producerInterceptorMock = new Mock<IProducerInterceptor<SomeMessage>>();
21+
producerInterceptorMock
22+
.Setup(x => x.OnHandle(message, It.IsAny<Func<Task<object>>>(), It.IsAny<IProducerContext>()))
23+
.Returns((SomeMessage message, Func<Task<object>> next, IProducerContext context) => next());
24+
25+
var producerInterceptors = producerInterceptorRegistered ? new[] { producerInterceptorMock.Object } : null;
26+
27+
var publishInterceptorMock = new Mock<IPublishInterceptor<SomeMessage>>();
28+
publishInterceptorMock
29+
.Setup(x => x.OnHandle(message, It.IsAny<Func<Task>>(), It.IsAny<IProducerContext>()))
30+
.Returns((SomeMessage message, Func<Task> next, IProducerContext context) => next());
31+
32+
var publishInterceptors = publishInterceptorRegistered ? new[] { publishInterceptorMock.Object } : null;
33+
34+
var producerSettings = new ProducerBuilder<SomeMessage>(new ProducerSettings())
35+
.DefaultTopic(topic)
36+
.Settings;
37+
38+
var context = new PublishContext
39+
{
40+
Path = topic,
41+
Headers = new Dictionary<string, object>(),
42+
};
43+
44+
_busMock.BusMock
45+
.Setup(x => x.ProduceToTransport(message, producerSettings.MessageType, context.Path, context.Headers, _busMock.Bus.MessageBusTarget, context.CancellationToken))
46+
.Returns(() => Task.FromResult<object>(null));
47+
48+
var subject = new PublishInterceptorPipeline(_busMock.Bus, new RuntimeTypeCache(), message, producerSettings, _busMock.Bus.MessageBusTarget, context, producerInterceptors: producerInterceptors, publishInterceptors: publishInterceptors);
49+
50+
// act
51+
var result = await subject.Next();
52+
53+
// assert
54+
result.Should().BeNull();
55+
56+
if (producerInterceptorRegistered)
57+
{
58+
producerInterceptorMock.Verify(x => x.OnHandle(message, It.IsAny<Func<Task<object>>>(), It.IsAny<IProducerContext>()), Times.Once);
59+
}
60+
producerInterceptorMock.VerifyNoOtherCalls();
61+
62+
if (publishInterceptorRegistered)
63+
{
64+
publishInterceptorMock.Verify(x => x.OnHandle(message, It.IsAny<Func<Task>>(), It.IsAny<IProducerContext>()), Times.Once);
65+
}
66+
publishInterceptorMock.VerifyNoOtherCalls();
67+
68+
_busMock.BusMock.Verify(x => x.ProduceToTransport(message, producerSettings.MessageType, context.Path, context.Headers, _busMock.Bus.MessageBusTarget, context.CancellationToken), Times.Once);
69+
}
7070
}

0 commit comments

Comments
 (0)