Skip to content

Commit 26bc9b1

Browse files
authored
Fix topic support in DLE+TTL Scheduler (#1124)
* Do not set dead letter routing key to support custom routing keys * Delete test
1 parent e55c5ab commit 26bc9b1

9 files changed

+104
-39
lines changed

Source/EasyNetQ.IntegrationTests/Scheduler/When_future_publish_using_dead_letter_exchange.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public async Task Test()
3838

3939
using (bus.Subscribe<Message>(subscriptionId, messagesSink.Receive))
4040
{
41-
await bus.FuturePublishBatchAsync(messages, TimeSpan.FromSeconds(5), cts.Token)
41+
await bus.FuturePublishBatchAsync(messages, TimeSpan.FromSeconds(5), "#", cts.Token)
4242
.ConfigureAwait(false);
4343

4444
await messagesSink.WaitAllReceivedAsync(cts.Token).ConfigureAwait(false);

Source/EasyNetQ.IntegrationTests/Scheduler/When_future_publish_using_dead_letter_exchange_with_publish_confirms.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public async Task Test()
3939

4040
using (bus.Subscribe<Message>(subscriptionId, messagesSink.Receive))
4141
{
42-
await bus.FuturePublishBatchAsync(messages, TimeSpan.FromSeconds(5), cts.Token)
42+
await bus.FuturePublishBatchAsync(messages, TimeSpan.FromSeconds(5), "#", cts.Token)
4343
.ConfigureAwait(false);
4444

4545
await messagesSink.WaitAllReceivedAsync(cts.Token).ConfigureAwait(false);
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using EasyNetQ.IntegrationTests.Utils;
5+
using EasyNetQ.Scheduling;
6+
using FluentAssertions;
7+
using Xunit;
8+
9+
namespace EasyNetQ.IntegrationTests.Scheduler
10+
{
11+
[Collection("RabbitMQ")]
12+
public class When_publish_and_subscribe_with_delay_using_dead_letter_exchange_with_topic : IDisposable
13+
{
14+
public When_publish_and_subscribe_with_delay_using_dead_letter_exchange_with_topic(RabbitMQFixture fixture)
15+
{
16+
bus = RabbitHutch.CreateBus(
17+
$"host={fixture.Host};prefetchCount=1;timeout=5", c => c.EnableDeadLetterExchangeAndMessageTtlScheduler()
18+
);
19+
}
20+
21+
public void Dispose()
22+
{
23+
bus.Dispose();
24+
}
25+
26+
private const int MessagesCount = 10;
27+
28+
private readonly IBus bus;
29+
30+
[Fact]
31+
public async Task Test()
32+
{
33+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
34+
35+
var subscriptionId = Guid.NewGuid().ToString();
36+
var firstTopicMessagesSink = new MessagesSink(MessagesCount);
37+
var firstTopicMessages = MessagesFactories.Create(MessagesCount);
38+
var secondTopicMessagesSink = new MessagesSink(MessagesCount);
39+
var secondTopicMessages = MessagesFactories.Create(MessagesCount);
40+
using (bus.Subscribe<Message>(subscriptionId, firstTopicMessagesSink.Receive, x => x.WithTopic("first")))
41+
using (bus.Subscribe<Message>(subscriptionId, secondTopicMessagesSink.Receive, x => x.WithTopic("second")))
42+
{
43+
await Task.WhenAll(
44+
bus.FuturePublishBatchAsync(firstTopicMessages, TimeSpan.FromSeconds(5), "first", cts.Token),
45+
bus.FuturePublishBatchAsync(secondTopicMessages, TimeSpan.FromSeconds(5), "second", cts.Token)
46+
).ConfigureAwait(false);
47+
48+
await Task.WhenAll(
49+
firstTopicMessagesSink.WaitAllReceivedAsync(cts.Token),
50+
secondTopicMessagesSink.WaitAllReceivedAsync(cts.Token)
51+
).ConfigureAwait(false);
52+
53+
firstTopicMessagesSink.ReceivedMessages.Should().Equal(firstTopicMessages);
54+
secondTopicMessagesSink.ReceivedMessages.Should().Equal(secondTopicMessages);
55+
}
56+
}
57+
}
58+
}

Source/EasyNetQ.IntegrationTests/Scheduler/When_future_publish_using_delayed_exchange.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public async Task Test()
3838

3939
using (bus.Subscribe<Message>(subscriptionId, messagesSink.Receive))
4040
{
41-
await bus.FuturePublishBatchAsync(messages, TimeSpan.FromSeconds(5), cts.Token)
41+
await bus.FuturePublishBatchAsync(messages, TimeSpan.FromSeconds(5), "#", cts.Token)
4242
.ConfigureAwait(false);
4343

4444
await messagesSink.WaitAllReceivedAsync(cts.Token).ConfigureAwait(false);

Source/EasyNetQ.IntegrationTests/Scheduler/When_future_publish_using_delayed_exchange_with_publish_confirms.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public async Task Test()
3939

4040
using (bus.Subscribe<Message>(subscriptionId, messagesSink.Receive))
4141
{
42-
await bus.FuturePublishBatchAsync(messages, TimeSpan.FromSeconds(5), cts.Token)
42+
await bus.FuturePublishBatchAsync(messages, TimeSpan.FromSeconds(5), "#", cts.Token)
4343
.ConfigureAwait(false);
4444

4545
await messagesSink.WaitAllReceivedAsync(cts.Token).ConfigureAwait(false);

Source/EasyNetQ.IntegrationTests/Utils/BusExtensions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@ public static async Task FuturePublishBatchAsync<T>(
3535
this IBus bus,
3636
IEnumerable<T> messages,
3737
TimeSpan delay,
38+
string topic = "#",
3839
CancellationToken cancellationToken = default
3940
) where T : class
4041
{
4142
foreach (var message in messages)
4243
{
4344
cancellationToken.ThrowIfCancellationRequested();
4445

45-
await bus.FuturePublishAsync(delay, message).ConfigureAwait(false);
46+
await bus.FuturePublishAsync(delay, message, topic).ConfigureAwait(false);
4647
}
4748
}
4849

Source/EasyNetQ.Tests/Scheduling/SchedulingExtensionsTests.cs

Lines changed: 0 additions & 28 deletions
This file was deleted.

Source/EasyNetQ/Scheduling/DeadLetterExchangeAndMessageTtlScheduler.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@ public class DeadLetterExchangeAndMessageTtlScheduler : IScheduler
1111
private readonly IAdvancedBus advancedBus;
1212
private readonly IConventions conventions;
1313
private readonly IMessageDeliveryModeStrategy messageDeliveryModeStrategy;
14+
private readonly bool setDeadLetterRoutingKey;
1415

1516
public DeadLetterExchangeAndMessageTtlScheduler(
1617
IAdvancedBus advancedBus,
1718
IConventions conventions,
18-
IMessageDeliveryModeStrategy messageDeliveryModeStrategy)
19+
IMessageDeliveryModeStrategy messageDeliveryModeStrategy,
20+
bool setDeadLetterRoutingKey = false
21+
)
1922
{
2023
Preconditions.CheckNotNull(advancedBus, "advancedBus");
2124
Preconditions.CheckNotNull(conventions, "conventions");
@@ -24,6 +27,7 @@ public DeadLetterExchangeAndMessageTtlScheduler(
2427
this.advancedBus = advancedBus;
2528
this.conventions = conventions;
2629
this.messageDeliveryModeStrategy = messageDeliveryModeStrategy;
30+
this.setDeadLetterRoutingKey = setDeadLetterRoutingKey;
2731
}
2832

2933
public Task FuturePublishAsync<T>(DateTime futurePublishDate, T message, string cancellationKey = null) where T : class
@@ -77,13 +81,19 @@ private async Task FuturePublishInternalAsync<T>(TimeSpan messageDelay, T messag
7781
Preconditions.CheckNotNull(message, "message");
7882
Preconditions.CheckLess(messageDelay, MaxMessageDelay, "messageDelay");
7983
Preconditions.CheckNull(cancellationKey, "cancellationKey");
84+
8085
var delay = Round(messageDelay);
8186
var delayString = delay.ToString(@"dd\_hh\_mm\_ss");
8287
var exchangeName = conventions.ExchangeNamingConvention(typeof (T));
8388
var futureExchangeName = exchangeName + "_" + delayString;
8489
var futureQueueName = conventions.QueueNamingConvention(typeof (T), delayString);
8590
var futureExchange = await advancedBus.ExchangeDeclareAsync(futureExchangeName, ExchangeType.Topic).ConfigureAwait(false);
86-
var futureQueue = await advancedBus.QueueDeclareAsync(futureQueueName, perQueueMessageTtl: (int) delay.TotalMilliseconds, deadLetterExchange: exchangeName, deadLetterRoutingKey: topic).ConfigureAwait(false);
91+
var futureQueue = await advancedBus.QueueDeclareAsync(
92+
futureQueueName,
93+
perQueueMessageTtl: (int) delay.TotalMilliseconds,
94+
deadLetterExchange: exchangeName,
95+
deadLetterRoutingKey: setDeadLetterRoutingKey ? topic : null
96+
).ConfigureAwait(false);
8797
await advancedBus.BindAsync(futureExchange, futureQueue, topic).ConfigureAwait(false);
8898
var easyNetQMessage = new Message<T>(message)
8999
{
@@ -100,13 +110,19 @@ private void FuturePublishInternal<T>(TimeSpan messageDelay, T message, string t
100110
Preconditions.CheckNotNull(message, "message");
101111
Preconditions.CheckLess(messageDelay, MaxMessageDelay, "messageDelay");
102112
Preconditions.CheckNull(cancellationKey, "cancellationKey");
113+
103114
var delay = Round(messageDelay);
104115
var delayString = delay.ToString(@"dd\_hh\_mm\_ss");
105116
var exchangeName = conventions.ExchangeNamingConvention(typeof (T));
106117
var futureExchangeName = exchangeName + "_" + delayString;
107118
var futureQueueName = conventions.QueueNamingConvention(typeof (T), delayString);
108119
var futureExchange = advancedBus.ExchangeDeclare(futureExchangeName, ExchangeType.Topic);
109-
var futureQueue = advancedBus.QueueDeclare(futureQueueName, perQueueMessageTtl: (int) delay.TotalMilliseconds, deadLetterExchange: exchangeName, deadLetterRoutingKey: topic);
120+
var futureQueue = advancedBus.QueueDeclare(
121+
futureQueueName,
122+
perQueueMessageTtl: (int) delay.TotalMilliseconds,
123+
deadLetterExchange: exchangeName,
124+
deadLetterRoutingKey: setDeadLetterRoutingKey ? topic : null
125+
);
110126
advancedBus.Bind(futureExchange, futureQueue, topic);
111127
var easyNetQMessage = new Message<T>(message)
112128
{
@@ -123,4 +139,4 @@ private static TimeSpan Round(TimeSpan timeSpan)
123139
return new TimeSpan(timeSpan.Days, timeSpan.Hours, timeSpan.Minutes, timeSpan.Seconds, 0);
124140
}
125141
}
126-
}
142+
}

Source/EasyNetQ/Scheduling/SchedulingExtensions.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,25 @@ public static IServiceRegister EnableDelayedExchangeScheduler(this IServiceRegis
1111

1212
public static IServiceRegister EnableDeadLetterExchangeAndMessageTtlScheduler(this IServiceRegister serviceRegister)
1313
{
14-
return serviceRegister.Register<IScheduler, DeadLetterExchangeAndMessageTtlScheduler>();
14+
return serviceRegister.Register<IScheduler>(
15+
x => new DeadLetterExchangeAndMessageTtlScheduler(
16+
x.Resolve<IAdvancedBus>(),
17+
x.Resolve<IConventions>(),
18+
x.Resolve<IMessageDeliveryModeStrategy>()
19+
)
20+
);
21+
}
22+
23+
public static IServiceRegister EnableLegacyDeadLetterExchangeAndMessageTtlScheduler(this IServiceRegister serviceRegister)
24+
{
25+
return serviceRegister.Register<IScheduler>(
26+
x => new DeadLetterExchangeAndMessageTtlScheduler(
27+
x.Resolve<IAdvancedBus>(),
28+
x.Resolve<IConventions>(),
29+
x.Resolve<IMessageDeliveryModeStrategy>(),
30+
true
31+
)
32+
);
1533
}
1634
}
17-
}
35+
}

0 commit comments

Comments
 (0)