Skip to content

Commit dc06772

Browse files
authored
Merge pull request #301 from Particular/hotfix-3.2.1
Release Hotfix 3.2.1
2 parents dc70a49 + 39c6eb3 commit dc06772

File tree

8 files changed

+61
-31
lines changed

8 files changed

+61
-31
lines changed

GitVersion.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
assembly-versioning-scheme: Major
2+
3+
branches:
4+
develop:
5+
tag: alpha

GitVersionConfig.yaml

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/NServiceBus.RabbitMQ.Tests/ClusteringTests/ClusteredTestContext.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
public abstract class ClusteredTestContext
2323
{
2424
protected const string QueueName = "testreceiver";
25+
protected const string ErrorQueue = "error";
2526
const string ErlangProcessName = "erl";
2627
protected static Logger Logger = LogManager.GetCurrentClassLogger();
2728

@@ -138,7 +139,7 @@ public void TestContextFixtureTearDown()
138139

139140
publishChannel.Close();
140141
publishChannel.Dispose();
141-
142+
142143
connectionManager.Dispose();
143144

144145
var erlangProcessesToKill = GetExistingErlangProcesses().Select(p => p.Id).Except(erlangProcessesRunningBeforeTheTest).ToList();
@@ -210,7 +211,7 @@ protected void SetupQueueAndSenderAndListener(string connectionString)
210211
void SetupQueueListener(string queueName)
211212
{
212213
receivedMessages = new BlockingCollection<TransportMessage>();
213-
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, null, new ReceiveOptions(s => SecondaryReceiveSettings.Disabled(), new MessageConverter(),1,1000,true,"Cluster test"));
214+
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, null, new ReceiveOptions(s => SecondaryReceiveSettings.Disabled(), new MessageConverter(),1,1000,true,"Cluster test"), ErrorQueue);
214215
dequeueStrategy.Init(Address.Parse(queueName), new TransactionSettings(true, TimeSpan.FromSeconds(30), IsolationLevel.ReadCommitted, 5, false, false), m =>
215216
{
216217
receivedMessages.Add(m);
@@ -224,7 +225,8 @@ void SetupQueueListener(string queueName)
224225

225226
void EnsureRabbitQueueExists(string queueName)
226227
{
227-
using (var channel = connectionManager.GetAdministrationConnection().CreateModel())
228+
using (var connection = connectionManager.GetAdministrationConnection())
229+
using (var channel = connection.CreateModel())
228230
{
229231
channel.QueueDeclare(queueName, true, false, false, null);
230232
channel.QueuePurge(queueName);

src/NServiceBus.RabbitMQ.Tests/RabbitMqContext.cs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ class RabbitMqContext
2020
{
2121
protected void MakeSureQueueAndExchangeExists(string queueName)
2222
{
23-
using (var channel = connectionManager.GetAdministrationConnection().CreateModel())
23+
using (var connection = connectionManager.GetAdministrationConnection())
24+
using (var channel = connection.CreateModel())
2425
{
2526
//create main q
2627
channel.QueueDeclare(queueName, true, false, false, null);
@@ -39,7 +40,7 @@ protected void MakeSureQueueAndExchangeExists(string queueName)
3940

4041
void DeleteExchange(string exchangeName)
4142
{
42-
var connection = connectionManager.GetAdministrationConnection();
43+
using (var connection = connectionManager.GetAdministrationConnection())
4344
using (var channel = connection.CreateModel())
4445
{
4546
try
@@ -58,7 +59,7 @@ public virtual int MaximumConcurrency
5859
{
5960
get { return 1; }
6061
}
61-
62+
6263
[SetUp]
6364
public void SetUp()
6465
{
@@ -78,10 +79,11 @@ public void SetUp()
7879
sender = new RabbitMqMessageSender(routingTopology, channelProvider, new IncomingContext(null, null));
7980

8081
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, new RepeatedFailuresOverTimeCircuitBreaker("UnitTest",TimeSpan.FromMinutes(2),e=>{}),
81-
new ReceiveOptions(s => SecondaryReceiveSettings.Enabled(CallbackQueue, 1), new MessageConverter(),1,1000,false,"Unit test"));
82-
82+
new ReceiveOptions(s => SecondaryReceiveSettings.Enabled(CallbackQueue, 1), new MessageConverter(),1,1000,false,"Unit test"), ErrorQueue);
83+
8384

8485
MakeSureQueueAndExchangeExists(ReceiverQueue);
86+
MakeSureQueueAndExchangeExists(ErrorQueue);
8587

8688

8789
MessagePublisher = new RabbitMqMessagePublisher
@@ -145,6 +147,7 @@ protected TransportMessage WaitForMessage()
145147
protected string CallbackQueue = "testreceiver." + RuntimeEnvironment.MachineName;
146148

147149
protected const string ReceiverQueue = "testreceiver";
150+
protected const string ErrorQueue = "error";
148151
protected RabbitMqMessagePublisher MessagePublisher;
149152
protected RabbitMqConnectionManager connectionManager;
150153
protected RabbitMqDequeueStrategy dequeueStrategy;
@@ -178,22 +181,22 @@ public IEnumerable<object> BuildAll(Type typeToBuild)
178181

179182
public void Configure(Type component, DependencyLifecycle dependencyLifecycle)
180183
{
181-
184+
182185
}
183186

184187
public void Configure<T>(Func<T> component, DependencyLifecycle dependencyLifecycle)
185188
{
186-
189+
187190
}
188191

189192
public void ConfigureProperty(Type component, string property, object value)
190193
{
191-
194+
192195
}
193196

194197
public void RegisterSingleton(Type lookupType, object instance)
195198
{
196-
199+
197200
}
198201

199202
public bool HasComponent(Type componentType)
@@ -203,7 +206,7 @@ public bool HasComponent(Type componentType)
203206

204207
public void Release(object instance)
205208
{
206-
209+
207210
}
208211
}
209212

@@ -228,4 +231,4 @@ public ConfirmsAwareChannel GetNewPublishChannel()
228231
throw new NotImplementedException();
229232
}
230233
}
231-
}
234+
}

src/NServiceBus.RabbitMQ/Config/RabbitMqTransportFeature.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22
{
33
using System;
44
using System.Configuration;
5-
using NServiceBus.CircuitBreakers;
6-
using NServiceBus.Pipeline;
7-
using NServiceBus.Transports.RabbitMQ.Connection;
5+
using CircuitBreakers;
6+
using Config;
7+
using Pipeline;
88
using RabbitMQ.Client.Events;
99
using Settings;
1010
using Support;
1111
using Transports;
1212
using Transports.RabbitMQ;
1313
using Transports.RabbitMQ.Config;
14+
using Transports.RabbitMQ.Connection;
1415
using Transports.RabbitMQ.Routing;
1516

1617
class RabbitMqTransportFeature : ConfigureTransport
@@ -41,6 +42,7 @@ protected override void Configure(FeatureConfigurationContext context, string co
4142
var queueName = GetLocalAddress(context.Settings);
4243
var callbackQueue = string.Format("{0}.{1}", queueName, RuntimeEnvironment.MachineName);
4344
var connectionConfiguration = new ConnectionStringParser(context.Settings).Parse(connectionString);
45+
var errorQueue = context.Settings.GetConfigSection<MessageForwardingInCaseOfFaultConfig>()?.ErrorQueue;
4446

4547
MessageConverter messageConverter;
4648

@@ -83,7 +85,8 @@ protected override void Configure(FeatureConfigurationContext context, string co
8385
context.Container.ConfigureComponent(builder => new RabbitMqDequeueStrategy(
8486
builder.Build<IManageRabbitMqConnections>(),
8587
SetupCircuitBreaker(builder.Build<CriticalError>()),
86-
receiveOptions), DependencyLifecycle.InstancePerCall);
88+
receiveOptions,
89+
errorQueue), DependencyLifecycle.InstancePerCall);
8790

8891

8992
context.Container.ConfigureComponent<OpenPublishChannelBehavior>(DependencyLifecycle.InstancePerCall);

src/NServiceBus.RabbitMQ/NServiceBus.RabbitMQ.csproj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,11 @@
120120
<PropertyGroup>
121121
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
122122
</PropertyGroup>
123-
<Error Condition="!Exists('..\packages\GitVersionTask.2.0.1\Build\GitVersionTask.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\GitVersionTask.2.0.1\Build\GitVersionTask.targets'))" />
124-
<Error Condition="!Exists('..\packages\NuGetPackager.0.5.5\build\NuGetPackager.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\NuGetPackager.0.5.5\build\NuGetPackager.targets'))" />
125123
<Error Condition="!Exists('..\packages\Fody.1.29.4\build\dotnet\Fody.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\Fody.1.29.4\build\dotnet\Fody.targets'))" />
124+
<Error Condition="!Exists('..\packages\GitVersionTask.3.5.4\build\dotnet\GitVersionTask.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\GitVersionTask.3.5.4\build\dotnet\GitVersionTask.targets'))" />
125+
<Error Condition="!Exists('..\packages\NuGetPackager.0.6.0\build\NuGetPackager.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\packages\NuGetPackager.0.6.0\build\NuGetPackager.targets'))" />
126126
</Target>
127-
<Import Project="..\packages\GitVersionTask.2.0.1\Build\GitVersionTask.targets" Condition="Exists('..\packages\GitVersionTask.2.0.1\Build\GitVersionTask.targets')" />
128-
<Import Project="..\packages\NuGetPackager.0.5.5\build\NuGetPackager.targets" Condition="Exists('..\packages\NuGetPackager.0.5.5\build\NuGetPackager.targets')" />
129127
<Import Project="..\packages\Fody.1.29.4\build\dotnet\Fody.targets" Condition="Exists('..\packages\Fody.1.29.4\build\dotnet\Fody.targets')" />
128+
<Import Project="..\packages\GitVersionTask.3.5.4\build\dotnet\GitVersionTask.targets" Condition="Exists('..\packages\GitVersionTask.3.5.4\build\dotnet\GitVersionTask.targets')" />
129+
<Import Project="..\packages\NuGetPackager.0.6.0\build\NuGetPackager.targets" Condition="Exists('..\packages\NuGetPackager.0.6.0\build\NuGetPackager.targets')" />
130130
</Project>

src/NServiceBus.RabbitMQ/RabbitMqDequeueStrategy.cs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313

1414
class RabbitMqDequeueStrategy : IDequeueMessages, IDisposable
1515
{
16-
public RabbitMqDequeueStrategy(IManageRabbitMqConnections connectionManager, RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, ReceiveOptions receiveOptions)
16+
public RabbitMqDequeueStrategy(IManageRabbitMqConnections connectionManager, RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, ReceiveOptions receiveOptions, string errorQueue)
1717
{
1818
this.connectionManager = connectionManager;
1919
this.circuitBreaker = circuitBreaker;
2020
this.receiveOptions = receiveOptions;
21+
this.errorQueue = errorQueue;
2122
}
2223

2324
public void Init(Address address, TransactionSettings transactionSettings, Func<TransportMessage, bool> tryProcessMessage, Action<TransportMessage, Exception> endProcessMessage)
@@ -187,10 +188,8 @@ void ConsumeMessages(object state)
187188
}
188189
catch (Exception ex)
189190
{
190-
Logger.Error("Poison message detected, deliveryTag: " + message.DeliveryTag, ex);
191-
192-
//just ack the poison message to avoid getting stuck
193-
messageProcessedOk = true;
191+
Logger.Error($"Poison message detected. Moving message to queue '{errorQueue}'...", ex);
192+
messageProcessedOk = MovePoisonMessage(message, errorQueue);
194193
}
195194

196195
if (transportMessage != null)
@@ -276,7 +275,26 @@ void Purge()
276275
}
277276
}
278277

278+
bool MovePoisonMessage(BasicDeliverEventArgs message, string queue)
279+
{
280+
var success = false;
281+
var connection = connectionManager.GetPublishConnection();
282+
283+
using (var channel = connection.CreateModel())
284+
{
285+
try
286+
{
287+
channel.BasicPublish("", queue, false, message.BasicProperties, message.Body);
288+
success = true;
289+
}
290+
catch (Exception ex)
291+
{
292+
Logger.Error($"Failed to move poison message to queue '{queue}'. Returning message to original queue...", ex);
293+
}
294+
}
279295

296+
return success;
297+
}
280298

281299
static ILog Logger = LogManager.GetLogger(typeof(RabbitMqDequeueStrategy));
282300

@@ -293,7 +311,7 @@ void Purge()
293311
readonly ReceiveOptions receiveOptions;
294312
ushort actualPrefetchCount;
295313
bool isStopping;
296-
314+
readonly string errorQueue;
297315

298316
class ConsumeParams
299317
{
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
<?xml version="1.0" encoding="utf-8"?>
22
<packages>
33
<package id="Fody" version="1.29.4" targetFramework="net45" developmentDependency="true" />
4-
<package id="GitVersionTask" version="2.0.1" targetFramework="net45" developmentDependency="true" />
4+
<package id="GitVersionTask" version="3.5.4" targetFramework="net45" developmentDependency="true" />
55
<package id="Janitor.Fody" version="1.1.6.0" targetFramework="net45" developmentDependency="true" />
66
<package id="NServiceBus" version="5.2.0" targetFramework="net45" />
7-
<package id="NuGetPackager" version="0.5.5" targetFramework="net45" developmentDependency="true" />
7+
<package id="NuGetPackager" version="0.6.0" targetFramework="net45" developmentDependency="true" />
88
<package id="Obsolete.Fody" version="4.0.4" targetFramework="net45" developmentDependency="true" />
99
<package id="RabbitMQ.Client" version="3.6.1" targetFramework="net45" />
1010
</packages>

0 commit comments

Comments
 (0)