Skip to content

Commit 7a67b6d

Browse files
author
John Simons
committed
Merge branch 'hotfix-3.0.1'
2 parents 223a940 + 934bfcc commit 7a67b6d

14 files changed

+274
-171
lines changed

src/NServiceBus.RabbitMQ.AcceptanceTests/NServiceBus.RabbitMQ.AcceptanceTests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@
222222
<Compile Include="App_Packages\NSB.AcceptanceTests.5.2.0\Tx\When_sending_within_an_ambient_transaction.cs" />
223223
<Compile Include="App_Packages\NSB.AcceptanceTests.5.2.0\Versioning\When_multiple_versions_of_a_message_is_published.cs" />
224224
<Compile Include="App_Packages\NSB.AcceptanceTests.5.2.0\Volatile\When_sending_to_non_durable_endpoint.cs" />
225+
<Compile Include="When_a_message_is_retried_and_succeeds_with_a_reply.cs" />
225226
<Compile Include="When_using_direct_routing.cs" />
226227
<Compile Include="When_the_broker_connection_is_lost.cs" />
227228
<Compile Include="When_using_a_custom_message_id_strategy.cs" />
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
namespace NServiceBus.RabbitMQ.AcceptanceTests
2+
{
3+
using System;
4+
using NServiceBus.AcceptanceTesting;
5+
using NServiceBus.AcceptanceTests.EndpointTemplates;
6+
using NServiceBus.Config;
7+
using NServiceBus.Features;
8+
using NUnit.Framework;
9+
10+
[TestFixture]
11+
public class When_a_message_is_retried_and_succeeds_with_a_reply
12+
{
13+
[Test]
14+
public void The_message_send_to_error_queue_should_have_its_callback_receiver_header_intact()
15+
{
16+
var context = new Context();
17+
18+
Scenario.Define(context)
19+
.WithEndpoint<OriginatingEndpoint>(c => c.Given(bus => bus.Send(new Request())))
20+
.WithEndpoint<ReceivingEndpoint>()
21+
.WithEndpoint<ErrorSpyEndpoint>()
22+
.Done(c => c.Done)
23+
.AllowExceptions()
24+
.Run(TimeSpan.FromMinutes(1));
25+
26+
Assert.IsTrue(context.Done);
27+
Assert.IsTrue(context.CallbackReceiverHeader.Contains("OriginatingEndpoint"));
28+
}
29+
30+
class Request : IMessage { }
31+
32+
class Context : ScenarioContext
33+
{
34+
public bool Done { get; set; }
35+
public string CallbackReceiverHeader { get; set; }
36+
}
37+
38+
class OriginatingEndpoint : EndpointConfigurationBuilder
39+
{
40+
public OriginatingEndpoint()
41+
{
42+
EndpointSetup<DefaultServer>()
43+
.AddMapping<Request>(typeof(ReceivingEndpoint));
44+
}
45+
}
46+
47+
class ErrorSpyEndpoint : EndpointConfigurationBuilder
48+
{
49+
public ErrorSpyEndpoint()
50+
{
51+
EndpointSetup<DefaultServer>();
52+
}
53+
54+
public class SpyHandler : IHandleMessages<IMessage>
55+
{
56+
public Context Context { get; set; }
57+
58+
public IBus Bus { get; set; }
59+
60+
public void Handle(IMessage message)
61+
{
62+
Context.Done = true;
63+
Context.CallbackReceiverHeader = Bus.CurrentMessageContext.Headers["NServiceBus.RabbitMQ.CallbackQueue"];
64+
}
65+
}
66+
}
67+
68+
class ReceivingEndpoint : EndpointConfigurationBuilder
69+
{
70+
public ReceivingEndpoint()
71+
{
72+
EndpointSetup<DefaultServer>(c => c.DisableFeature<SecondLevelRetries>())
73+
.WithConfig<TransportConfig>(c =>
74+
{
75+
c.MaxRetries = 1;
76+
})
77+
.WithConfig<MessageForwardingInCaseOfFaultConfig>(c =>
78+
{
79+
c.ErrorQueue = "AMessageIsRetriedAndSucceedsWithAReply.ErrorSpyEndpoint";
80+
});
81+
}
82+
83+
public class RequestHandler : IHandleMessages<Request>
84+
{
85+
public void Handle(Request message)
86+
{
87+
throw new Exception("Simulated");
88+
}
89+
}
90+
}
91+
}
92+
}

src/NServiceBus.RabbitMQ.AcceptanceTests/When_scaling_out_senders_that_uses_callbacks.cs

Lines changed: 51 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,64 +9,64 @@
99

1010
public class When_scaling_out_senders_that_uses_callbacks
1111
{
12-
const int numMessagesToSend = 1;
12+
const int numMessagesToSend = 5;
1313

1414
[Test]
1515
public void Should_only_deliver_response_to_one_of_the_instances()
1616
{
1717
var context = new Context();
1818

1919
Scenario.Define(context)
20-
.WithEndpoint<ServerThatRespondsToCallbacks>()
21-
.WithEndpoint<ScaledOutClient>(b =>
22-
{
23-
b.CustomConfig(c => RuntimeEnvironment.MachineNameAction = () => "ScaledOutClientA");
24-
b.Given((bus, c) =>
25-
{
26-
for (var i = 0; i < numMessagesToSend; i++)
27-
{
28-
bus.Send(new MyRequest
29-
{
30-
ReturnCode = 1
31-
})
32-
.Register(m =>
33-
{
34-
if (m.ErrorCode != 1)
35-
{
36-
throw new Exception("Wrong server got the response");
37-
}
38-
c.ServerAGotTheCallback++;
39-
});
40-
}
41-
});
42-
})
43-
.WithEndpoint<ScaledOutClient>(b =>
44-
{
45-
b.CustomConfig(c => RuntimeEnvironment.MachineNameAction = () => "ScaledOutClientB");
46-
b.Given((bus, c) =>
47-
{
48-
for (var i = 0; i < numMessagesToSend; i++)
49-
{
50-
bus.Send(new MyRequest
51-
{
52-
ReturnCode = 2
53-
})
54-
.Register(m =>
55-
{
56-
if (m.ErrorCode != 2)
57-
{
58-
throw new Exception("Wrong server got the response");
59-
}
60-
c.ServerBGotTheCallback++;
61-
});
62-
}
63-
});
64-
})
65-
.Done(c => (context.ServerAGotTheCallback + context.ServerBGotTheCallback) >= numMessagesToSend*2)
66-
.Run(new RunSettings
67-
{
68-
UseSeparateAppDomains = true
69-
});
20+
.WithEndpoint<ServerThatRespondsToCallbacks>()
21+
.WithEndpoint<ScaledOutClient>(b =>
22+
{
23+
b.CustomConfig(c => RuntimeEnvironment.MachineNameAction = () => "ScaledOutClientA");
24+
b.Given((bus, c) =>
25+
{
26+
for (var i = 0; i < numMessagesToSend; i++)
27+
{
28+
bus.Send(new MyRequest
29+
{
30+
ReturnCode = 1
31+
})
32+
.Register<int>(r =>
33+
{
34+
if (r != 1)
35+
{
36+
throw new Exception("Wrong server got the response");
37+
}
38+
c.ServerAGotTheCallback++;
39+
});
40+
}
41+
});
42+
})
43+
.WithEndpoint<ScaledOutClient>(b =>
44+
{
45+
b.CustomConfig(c => RuntimeEnvironment.MachineNameAction = () => "ScaledOutClientB");
46+
b.Given((bus, c) =>
47+
{
48+
for (var i = 0; i < numMessagesToSend; i++)
49+
{
50+
bus.Send(new MyRequest
51+
{
52+
ReturnCode = 2
53+
})
54+
.Register<int>(r =>
55+
{
56+
if (r != 2)
57+
{
58+
throw new Exception("Wrong server got the response");
59+
}
60+
c.ServerBGotTheCallback++;
61+
});
62+
}
63+
});
64+
})
65+
.Done(c => (context.ServerAGotTheCallback + context.ServerBGotTheCallback) >= numMessagesToSend*2)
66+
.Run(new RunSettings
67+
{
68+
UseSeparateAppDomains = true,
69+
});
7070

7171
Assert.AreEqual(numMessagesToSend, context.ServerAGotTheCallback, "Both scaled out instances should get the callback");
7272

src/NServiceBus.RabbitMQ.Tests/ClusteringTests/ClusteredTestContext.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using Support;
1717
using Unicast;
1818
using Config;
19+
using NServiceBus.Pipeline.Contexts;
1920
using TransactionSettings = Unicast.Transport.TransactionSettings;
2021

2122
public abstract class ClusteredTestContext
@@ -232,10 +233,7 @@ void EnsureRabbitQueueExists(string queueName)
232233

233234
void SetupMessageSender()
234235
{
235-
sender = new RabbitMqMessageSender
236-
{
237-
ChannelProvider = new FakeChannelProvider(publishChannel)
238-
};
236+
sender = new RabbitMqMessageSender(null, new FakeChannelProvider(publishChannel), new IncomingContext(null, null));
239237
}
240238

241239
static RabbitMqConnectionManager SetupRabbitMqConnectionManager(string connectionString)

src/NServiceBus.RabbitMQ.Tests/RabbitMqContext.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Config;
99
using global::RabbitMQ.Client;
1010
using NServiceBus.CircuitBreakers;
11+
using NServiceBus.Pipeline.Contexts;
1112
using NServiceBus.Support;
1213
using NServiceBus.Transports.RabbitMQ.Connection;
1314
using NUnit.Framework;
@@ -74,12 +75,7 @@ public void SetUp()
7475

7576
var channelProvider = new FakeChannelProvider(publishChannel);
7677

77-
sender = new RabbitMqMessageSender
78-
{
79-
ChannelProvider = channelProvider,
80-
RoutingTopology = routingTopology,
81-
CallbackQueue = CallbackQueue
82-
};
78+
sender = new RabbitMqMessageSender(routingTopology, channelProvider, new IncomingContext(null, null));
8379

8480
dequeueStrategy = new RabbitMqDequeueStrategy(connectionManager, new RepeatedFailuresOverTimeCircuitBreaker("UnitTest",TimeSpan.FromMinutes(2),e=>{}),
8581
new ReceiveOptions(s => SecondaryReceiveSettings.Enabled(CallbackQueue, 1), new MessageConverter(),1,1000,false,"Unit test"));

src/NServiceBus.RabbitMQ.Tests/When_sending_a_message_over_rabbitmq.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System.Text;
55
using global::RabbitMQ.Client;
66
using global::RabbitMQ.Client.Events;
7-
using NServiceBus.Support;
87
using NUnit.Framework;
98
using Unicast;
109
using Unicast.Queuing;
@@ -75,16 +74,14 @@ public void Should_set_the_reply_to_address()
7574
});
7675

7776
}
77+
7878
[Test]
79-
public void Should_populate_the_callback_header()
79+
public void Should_not_populate_the_callback_header()
8080
{
8181
Verify(new TransportMessageBuilder(),
82-
(t, r) => Assert.AreEqual("testreceiver."+RuntimeEnvironment.MachineName, t.Headers[RabbitMqMessageSender.CallbackHeaderKey]));
82+
(t, r) => Assert.IsFalse(t.Headers.ContainsKey(RabbitMqMessageSender.CallbackHeaderKey)));
8383

8484
}
85-
86-
87-
8885

8986
[Test]
9087
public void Should_set_correlation_id_if_present()

0 commit comments

Comments
 (0)