Skip to content

Commit 7f23b3e

Browse files
authored
Merge pull request #289 from Particular/hotfix-3.5.1
Release Hotfix 3.5.1
2 parents 638bbda + a1a3223 commit 7f23b3e

File tree

4 files changed

+36
-12
lines changed

4 files changed

+36
-12
lines changed

src/NServiceBus.RabbitMQ.Tests/ClusteringTests/ClusteredTestContext.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
public abstract class ClusteredTestContext
2323
{
2424
protected const string QueueName = "testreceiver";
25+
protected const string ErrorQueue = "error";
2526
const string ErlangProcessName = "erl";
2627
protected static Logger Logger = LogManager.GetCurrentClassLogger();
2728

@@ -210,7 +211,7 @@ protected void SetupQueueAndSenderAndListener(string connectionString)
210211
void SetupQueueListener(string queueName)
211212
{
212213
receivedMessages = new BlockingCollection<TransportMessage>();
213-
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, null, new ReceiveOptions(s => SecondaryReceiveSettings.Disabled(), new MessageConverter(),1,1000,true,"Cluster test"));
214+
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, null, new ReceiveOptions(s => SecondaryReceiveSettings.Disabled(), new MessageConverter(),1,1000,true,"Cluster test"), ErrorQueue);
214215
dequeueStrategy.Init(Address.Parse(queueName), new TransactionSettings(true, TimeSpan.FromSeconds(30), IsolationLevel.ReadCommitted, 5, false, false), m =>
215216
{
216217
receivedMessages.Add(m);

src/NServiceBus.RabbitMQ.Tests/RabbitMqContext.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,11 @@ public void SetUp()
7979
sender = new RabbitMqMessageSender(routingTopology, channelProvider, new IncomingContext(null, null));
8080

8181
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, new RepeatedFailuresOverTimeCircuitBreaker("UnitTest",TimeSpan.FromMinutes(2),e=>{}),
82-
new ReceiveOptions(s => SecondaryReceiveSettings.Enabled(CallbackQueue, 1), new MessageConverter(),1,1000,false,"Unit test"));
82+
new ReceiveOptions(s => SecondaryReceiveSettings.Enabled(CallbackQueue, 1), new MessageConverter(),1,1000,false,"Unit test"), ErrorQueue);
8383

8484

8585
MakeSureQueueAndExchangeExists(ReceiverQueue);
86+
MakeSureQueueAndExchangeExists(ErrorQueue);
8687

8788

8889
MessagePublisher = new RabbitMqMessagePublisher
@@ -146,6 +147,7 @@ protected TransportMessage WaitForMessage()
146147
protected string CallbackQueue = "testreceiver." + RuntimeEnvironment.MachineName;
147148

148149
protected const string ReceiverQueue = "testreceiver";
150+
protected const string ErrorQueue = "error";
149151
protected RabbitMqMessagePublisher MessagePublisher;
150152
protected RabbitMqConnectionManager connectionManager;
151153
protected RabbitMqDequeueStrategy dequeueStrategy;

src/NServiceBus.RabbitMQ/Config/RabbitMqTransportFeature.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22
{
33
using System;
44
using System.Configuration;
5-
using NServiceBus.CircuitBreakers;
6-
using NServiceBus.Pipeline;
7-
using NServiceBus.Transports.RabbitMQ.Connection;
5+
using CircuitBreakers;
6+
using Config;
7+
using Pipeline;
88
using RabbitMQ.Client.Events;
99
using Settings;
1010
using Support;
1111
using Transports;
1212
using Transports.RabbitMQ;
1313
using Transports.RabbitMQ.Config;
14+
using Transports.RabbitMQ.Connection;
1415
using Transports.RabbitMQ.Routing;
1516

1617
class RabbitMqTransportFeature : ConfigureTransport
@@ -43,6 +44,7 @@ protected override void Configure(FeatureConfigurationContext context, string co
4344
var queueName = GetLocalAddress(context.Settings);
4445
var callbackQueue = string.Format("{0}.{1}", queueName, RuntimeEnvironment.MachineName);
4546
var connectionConfiguration = new ConnectionStringParser(context.Settings).Parse(connectionString);
47+
var errorQueue = context.Settings.GetConfigSection<MessageForwardingInCaseOfFaultConfig>()?.ErrorQueue;
4648

4749
MessageConverter messageConverter;
4850

@@ -85,7 +87,8 @@ protected override void Configure(FeatureConfigurationContext context, string co
8587
context.Container.ConfigureComponent(builder => new RabbitMqDequeueStrategy(
8688
builder.Build<IManageRabbitMqConnections>(),
8789
SetupCircuitBreaker(builder.Build<CriticalError>()),
88-
receiveOptions), DependencyLifecycle.InstancePerCall);
90+
receiveOptions,
91+
errorQueue), DependencyLifecycle.InstancePerCall);
8992

9093

9194
context.Container.ConfigureComponent<OpenPublishChannelBehavior>(DependencyLifecycle.InstancePerCall);

src/NServiceBus.RabbitMQ/RabbitMqDequeueStrategy.cs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313

1414
class RabbitMqDequeueStrategy : IDequeueMessages, IDisposable
1515
{
16-
public RabbitMqDequeueStrategy(IManageRabbitMqConnections connectionManager, RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, ReceiveOptions receiveOptions)
16+
public RabbitMqDequeueStrategy(IManageRabbitMqConnections connectionManager, RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, ReceiveOptions receiveOptions, string errorQueue)
1717
{
1818
this.connectionManager = connectionManager;
1919
this.circuitBreaker = circuitBreaker;
2020
this.receiveOptions = receiveOptions;
21+
this.errorQueue = errorQueue;
2122
}
2223

2324
public void Init(Address address, TransactionSettings transactionSettings, Func<TransportMessage, bool> tryProcessMessage, Action<TransportMessage, Exception> endProcessMessage)
@@ -189,10 +190,8 @@ void ConsumeMessages(object state)
189190
}
190191
catch (Exception ex)
191192
{
192-
Logger.Error("Poison message detected, deliveryTag: " + message.DeliveryTag, ex);
193-
194-
//just ack the poison message to avoid getting stuck
195-
messageProcessedOk = true;
193+
Logger.Error($"Poison message detected. Moving message to queue '{errorQueue}'...", ex);
194+
messageProcessedOk = MovePoisonMessage(message, errorQueue);
196195
}
197196

198197
if (transportMessage != null)
@@ -280,7 +279,26 @@ void Purge()
280279
}
281280
}
282281

282+
bool MovePoisonMessage(BasicDeliverEventArgs message, string queue)
283+
{
284+
var success = false;
285+
var connection = connectionManager.GetPublishConnection();
286+
287+
using (var channel = connection.CreateModel())
288+
{
289+
try
290+
{
291+
channel.BasicPublish("", queue, false, message.BasicProperties, message.Body);
292+
success = true;
293+
}
294+
catch (Exception ex)
295+
{
296+
Logger.Error($"Failed to move poison message to queue '{queue}'. Returning message to original queue...", ex);
297+
}
298+
}
283299

300+
return success;
301+
}
284302

285303
static ILog Logger = LogManager.GetLogger(typeof(RabbitMqDequeueStrategy));
286304

@@ -297,7 +315,7 @@ void Purge()
297315
readonly ReceiveOptions receiveOptions;
298316
ushort actualPrefetchCount;
299317
bool isStopping;
300-
318+
readonly string errorQueue;
301319

302320
class ConsumeParams
303321
{

0 commit comments

Comments
 (0)