Skip to content

Commit 39c6eb3

Browse files
bordingadamralph
authored andcommitted
Move poison messages to the error queue
# Conflicts: # src/NServiceBus.RabbitMQ.Tests/RabbitMqContext.cs
1 parent 856f675 commit 39c6eb3

File tree

4 files changed

+37
-13
lines changed

4 files changed

+37
-13
lines changed

src/NServiceBus.RabbitMQ.Tests/ClusteringTests/ClusteredTestContext.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
public abstract class ClusteredTestContext
2323
{
2424
protected const string QueueName = "testreceiver";
25+
protected const string ErrorQueue = "error";
2526
const string ErlangProcessName = "erl";
2627
protected static Logger Logger = LogManager.GetCurrentClassLogger();
2728

@@ -210,7 +211,7 @@ protected void SetupQueueAndSenderAndListener(string connectionString)
210211
void SetupQueueListener(string queueName)
211212
{
212213
receivedMessages = new BlockingCollection<TransportMessage>();
213-
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, null, new ReceiveOptions(s => SecondaryReceiveSettings.Disabled(), new MessageConverter(),1,1000,true,"Cluster test"));
214+
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, null, new ReceiveOptions(s => SecondaryReceiveSettings.Disabled(), new MessageConverter(),1,1000,true,"Cluster test"), ErrorQueue);
214215
dequeueStrategy.Init(Address.Parse(queueName), new TransactionSettings(true, TimeSpan.FromSeconds(30), IsolationLevel.ReadCommitted, 5, false, false), m =>
215216
{
216217
receivedMessages.Add(m);

src/NServiceBus.RabbitMQ.Tests/RabbitMqContext.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,11 @@ public void SetUp()
7979
sender = new RabbitMqMessageSender(routingTopology, channelProvider, new IncomingContext(null, null));
8080

8181
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, new RepeatedFailuresOverTimeCircuitBreaker("UnitTest",TimeSpan.FromMinutes(2),e=>{}),
82-
new ReceiveOptions(s => SecondaryReceiveSettings.Enabled(CallbackQueue, 1), new MessageConverter(),1,1000,false,"Unit test"));
82+
new ReceiveOptions(s => SecondaryReceiveSettings.Enabled(CallbackQueue, 1), new MessageConverter(),1,1000,false,"Unit test"), ErrorQueue);
8383

8484

8585
MakeSureQueueAndExchangeExists(ReceiverQueue);
86+
MakeSureQueueAndExchangeExists(ErrorQueue);
8687

8788

8889
MessagePublisher = new RabbitMqMessagePublisher
@@ -146,6 +147,7 @@ protected TransportMessage WaitForMessage()
146147
protected string CallbackQueue = "testreceiver." + RuntimeEnvironment.MachineName;
147148

148149
protected const string ReceiverQueue = "testreceiver";
150+
protected const string ErrorQueue = "error";
149151
protected RabbitMqMessagePublisher MessagePublisher;
150152
protected RabbitMqConnectionManager connectionManager;
151153
protected RabbitMqDequeueStrategy dequeueStrategy;
@@ -229,4 +231,4 @@ public ConfirmsAwareChannel GetNewPublishChannel()
229231
throw new NotImplementedException();
230232
}
231233
}
232-
}
234+
}

src/NServiceBus.RabbitMQ/Config/RabbitMqTransportFeature.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22
{
33
using System;
44
using System.Configuration;
5-
using NServiceBus.CircuitBreakers;
6-
using NServiceBus.Pipeline;
7-
using NServiceBus.Transports.RabbitMQ.Connection;
5+
using CircuitBreakers;
6+
using Config;
7+
using Pipeline;
88
using RabbitMQ.Client.Events;
99
using Settings;
1010
using Support;
1111
using Transports;
1212
using Transports.RabbitMQ;
1313
using Transports.RabbitMQ.Config;
14+
using Transports.RabbitMQ.Connection;
1415
using Transports.RabbitMQ.Routing;
1516

1617
class RabbitMqTransportFeature : ConfigureTransport
@@ -41,6 +42,7 @@ protected override void Configure(FeatureConfigurationContext context, string co
4142
var queueName = GetLocalAddress(context.Settings);
4243
var callbackQueue = string.Format("{0}.{1}", queueName, RuntimeEnvironment.MachineName);
4344
var connectionConfiguration = new ConnectionStringParser(context.Settings).Parse(connectionString);
45+
var errorQueue = context.Settings.GetConfigSection<MessageForwardingInCaseOfFaultConfig>()?.ErrorQueue;
4446

4547
MessageConverter messageConverter;
4648

@@ -83,7 +85,8 @@ protected override void Configure(FeatureConfigurationContext context, string co
8385
context.Container.ConfigureComponent(builder => new RabbitMqDequeueStrategy(
8486
builder.Build<IManageRabbitMqConnections>(),
8587
SetupCircuitBreaker(builder.Build<CriticalError>()),
86-
receiveOptions), DependencyLifecycle.InstancePerCall);
88+
receiveOptions,
89+
errorQueue), DependencyLifecycle.InstancePerCall);
8790

8891

8992
context.Container.ConfigureComponent<OpenPublishChannelBehavior>(DependencyLifecycle.InstancePerCall);

src/NServiceBus.RabbitMQ/RabbitMqDequeueStrategy.cs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313

1414
class RabbitMqDequeueStrategy : IDequeueMessages, IDisposable
1515
{
16-
public RabbitMqDequeueStrategy(IManageRabbitMqConnections connectionManager, RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, ReceiveOptions receiveOptions)
16+
public RabbitMqDequeueStrategy(IManageRabbitMqConnections connectionManager, RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, ReceiveOptions receiveOptions, string errorQueue)
1717
{
1818
this.connectionManager = connectionManager;
1919
this.circuitBreaker = circuitBreaker;
2020
this.receiveOptions = receiveOptions;
21+
this.errorQueue = errorQueue;
2122
}
2223

2324
public void Init(Address address, TransactionSettings transactionSettings, Func<TransportMessage, bool> tryProcessMessage, Action<TransportMessage, Exception> endProcessMessage)
@@ -187,10 +188,8 @@ void ConsumeMessages(object state)
187188
}
188189
catch (Exception ex)
189190
{
190-
Logger.Error("Poison message detected, deliveryTag: " + message.DeliveryTag, ex);
191-
192-
//just ack the poison message to avoid getting stuck
193-
messageProcessedOk = true;
191+
Logger.Error($"Poison message detected. Moving message to queue '{errorQueue}'...", ex);
192+
messageProcessedOk = MovePoisonMessage(message, errorQueue);
194193
}
195194

196195
if (transportMessage != null)
@@ -276,7 +275,26 @@ void Purge()
276275
}
277276
}
278277

278+
bool MovePoisonMessage(BasicDeliverEventArgs message, string queue)
279+
{
280+
var success = false;
281+
var connection = connectionManager.GetPublishConnection();
282+
283+
using (var channel = connection.CreateModel())
284+
{
285+
try
286+
{
287+
channel.BasicPublish("", queue, false, message.BasicProperties, message.Body);
288+
success = true;
289+
}
290+
catch (Exception ex)
291+
{
292+
Logger.Error($"Failed to move poison message to queue '{queue}'. Returning message to original queue...", ex);
293+
}
294+
}
279295

296+
return success;
297+
}
280298

281299
static ILog Logger = LogManager.GetLogger(typeof(RabbitMqDequeueStrategy));
282300

@@ -293,7 +311,7 @@ void Purge()
293311
readonly ReceiveOptions receiveOptions;
294312
ushort actualPrefetchCount;
295313
bool isStopping;
296-
314+
readonly string errorQueue;
297315

298316
class ConsumeParams
299317
{

0 commit comments

Comments
 (0)