Skip to content

Commit c49561c

Browse files
Use RabbitMQ BasicProperties (#156)
1 parent 1095ff3 commit c49561c

File tree

6 files changed

+33
-19
lines changed

6 files changed

+33
-19
lines changed

src/MyServiceBus.RabbitMq/RabbitMqQueueSendTransport.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ public async Task Send<T>(T message, SendContext context, CancellationToken canc
2222
{
2323
var body = await context.Serialize(message);
2424

25-
var props = _channel.CreateBasicProperties();
26-
props.Persistent = true;
25+
var props = new BasicProperties
26+
{
27+
Persistent = true
28+
};
2729

2830
if (context.Headers != null)
2931
{

src/MyServiceBus.RabbitMq/RabbitMqSendTransport.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ public async Task Send<T>(T message, SendContext context, CancellationToken canc
1818
{
1919
var body = await context.Serialize(message);
2020

21-
var props = _channel.CreateBasicProperties();
22-
props.Persistent = true;
21+
var props = new BasicProperties
22+
{
23+
Persistent = true
24+
};
2325

2426
if (context.Headers != null)
2527
{

src/MyServiceBus.RabbitMq/RabbitMqTransportFactory.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public RabbitMqTransportFactory(ConnectionProvider connectionProvider)
1818
_connectionProvider = connectionProvider;
1919
}
2020

21-
[Throws(typeof(OverflowException), typeof(InvalidOperationException), typeof(ArgumentException))]
21+
[Throws(typeof(OverflowException), typeof(InvalidOperationException), typeof(ArgumentException), typeof(OperationCanceledException))]
2222
public async Task<ISendTransport> GetSendTransport(Uri address, CancellationToken cancellationToken = default)
2323
{
2424
string? exchange = null;
@@ -133,6 +133,7 @@ await channel.QueueDeclareAsync(
133133
return sendTransport;
134134
}
135135

136+
[Throws(typeof(ObjectDisposedException))]
136137
public async Task<IReceiveTransport> CreateReceiveTransport(
137138
ReceiveEndpointTopology topology,
138139
Func<ReceiveContext, Task> handler,
@@ -150,6 +151,7 @@ await channel.ExchangeDeclareAsync(
150151
);
151152

152153
var hasErrorQueue = !topology.AutoDelete;
154+
IDictionary<string, object?>? mainQueueArguments = null;
153155

154156
if (hasErrorQueue)
155157
{
@@ -178,13 +180,19 @@ await channel.QueueBindAsync(
178180
routingKey: string.Empty,
179181
cancellationToken: cancellationToken
180182
);
183+
184+
mainQueueArguments = new Dictionary<string, object?>
185+
{
186+
["x-dead-letter-exchange"] = errorExchange
187+
};
181188
}
182189

183190
await channel.QueueDeclareAsync(
184191
queue: topology.QueueName,
185192
durable: topology.Durable,
186193
exclusive: false,
187194
autoDelete: topology.AutoDelete,
195+
arguments: mainQueueArguments,
188196
cancellationToken: cancellationToken
189197
);
190198

test/MyServiceBus.RabbitMq.Tests/HeaderEncodingTests.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
using System.Text.Json;
99
using System.Threading;
1010
using System.Threading.Tasks;
11+
using MyServiceBus;
12+
using MyServiceBus.Serialization;
1113

1214
namespace MyServiceBus.RabbitMq.Tests;
1315

@@ -22,21 +24,20 @@ class FaultingConsumer : IConsumer<TestMessage>
2224
}
2325

2426
[Fact]
25-
[Throws(typeof(UriFormatException), typeof(ArgumentException), typeof(InvalidOperationException))]
27+
[Throws(typeof(UriFormatException), typeof(ArgumentException), typeof(InvalidOperationException), typeof(JsonException))]
2628
public async Task Faulted_message_headers_include_mt_prefix()
2729
{
2830
var channel = Substitute.For<IChannel>();
29-
IBasicProperties? captured = null;
30-
channel.CreateBasicProperties().Returns(new BasicProperties());
31+
BasicProperties? captured = null;
3132
channel.BasicPublishAsync(
3233
Arg.Any<string>(),
3334
Arg.Any<string>(),
3435
Arg.Any<bool>(),
35-
Arg.Any<IBasicProperties>(),
36+
Arg.Any<BasicProperties>(),
3637
Arg.Any<ReadOnlyMemory<byte>>(),
3738
Arg.Any<CancellationToken>())
38-
.Returns(Task.CompletedTask)
39-
.AndDoes(ci => captured = ci.Arg<IBasicProperties>());
39+
.Returns(_ => ValueTask.CompletedTask)
40+
.AndDoes(ci => captured = ci.Arg<BasicProperties>());
4041

4142
var services = new ServiceCollection();
4243
services.AddTransient<FaultingConsumer>();

test/MyServiceBus.RabbitMq.Tests/RabbitMqTransportFactoryTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace MyServiceBus.RabbitMq.Tests;
1313

1414
public class RabbitMqTransportFactoryTests
1515
{
16-
[Fact(Skip = "Not working")]
16+
[Fact]
1717
[Throws(typeof(Exception))]
1818
public async Task Declares_dead_letter_exchange_and_queue()
1919
{
@@ -164,7 +164,7 @@ await channel.DidNotReceive().QueueDeclareAsync(
164164
}
165165

166166
[Fact]
167-
[Throws(typeof(OverflowException))]
167+
[Throws(typeof(OverflowException), typeof(InvalidOperationException))]
168168
public async Task Supports_exchange_scheme_uri()
169169
{
170170
var channel = Substitute.For<IChannel>();
@@ -205,7 +205,7 @@ await channel.Received(1).ExchangeDeclareAsync(
205205
}
206206

207207
[Fact]
208-
[Throws(typeof(OverflowException))]
208+
[Throws(typeof(OverflowException), typeof(InvalidOperationException))]
209209
public async Task Supports_queue_scheme_uri()
210210
{
211211
var channel = Substitute.For<IChannel>();

test/MyServiceBus.RabbitMq.Tests/TransportHeaderTests.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading;
66
using System.Threading.Tasks;
77
using Xunit;
8+
using MyServiceBus;
89
using MyServiceBus.Serialization;
910

1011
namespace MyServiceBus.RabbitMq.Tests;
@@ -14,20 +15,20 @@ public class TransportHeaderTests
1415
class TestMessage { }
1516

1617
[Fact]
18+
[Throws(typeof(NotSupportedException))]
1719
public async Task Underscore_headers_are_applied_to_basic_properties()
1820
{
1921
var channel = Substitute.For<IChannel>();
20-
IBasicProperties? captured = null;
21-
channel.CreateBasicProperties().Returns(new BasicProperties());
22+
BasicProperties? captured = null;
2223
channel.BasicPublishAsync(
2324
Arg.Any<string>(),
2425
Arg.Any<string>(),
2526
Arg.Any<bool>(),
26-
Arg.Any<IBasicProperties>(),
27+
Arg.Any<BasicProperties>(),
2728
Arg.Any<ReadOnlyMemory<byte>>(),
2829
Arg.Any<CancellationToken>())
29-
.Returns(Task.CompletedTask)
30-
.AndDoes(ci => captured = ci.Arg<IBasicProperties>());
30+
.Returns(_ => ValueTask.CompletedTask)
31+
.AndDoes(ci => captured = ci.Arg<BasicProperties>());
3132

3233
var transport = new RabbitMqSendTransport(channel, "test");
3334
var context = new SendContext(MessageTypeCache.GetMessageTypes(typeof(TestMessage)), new EnvelopeMessageSerializer())

0 commit comments

Comments
 (0)