Skip to content

Commit 65cb9a4

Browse files
micdennyPliner
authored andcommitted
Legacy RPC conventions (#896)
1 parent f167945 commit 65cb9a4

File tree

7 files changed

+160
-37
lines changed

7 files changed

+160
-37
lines changed

Source/EasyNetQ.Tests/Integration/RequestResponseTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,16 @@ public void Should_be_able_to_make_a_request_and_receive_response_to_customly_de
151151
}
152152

153153
// First start the EasyNetQ.Tests.SimpleService console app.
154-
// Run this test. You should see 1000 response messages on the SimpleService
155-
// and then 1000 messages appear back here.
154+
// Run this test. You should see 200 response messages on the SimpleService
155+
// and then 200 messages appear back here.
156156
[Fact][Explicit("Needs a Rabbit instance on localhost to work")]
157157
public void Should_be_able_to_make_many_async_requests()
158158
{
159-
const int numberOfCalls = 500;
159+
const int numberOfCalls = 200;
160160
var countdownEvent = new CountdownEvent(numberOfCalls);
161161
var count = 0;
162162

163-
for (int i = 0; i < 1000; i++)
163+
for (int i = 0; i < numberOfCalls; i++)
164164
{
165165
var request = new TestAsyncRequestMessage { Text = "Hello async from the client! " + i };
166166

Source/EasyNetQ.Tests/Integration/RpcTests.cs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ public void Should_throw_when_requesting_over_long_message()
6060

6161
bus.Request<MessageWithVeryVEryVEryLongNameThatWillMostCertainlyBreakAmqpsSilly255CharacterNameLimitThatIsAlmostCertainToBeReachedWithGenericTypes, RpcRequest>(
6262
new MessageWithVeryVEryVEryLongNameThatWillMostCertainlyBreakAmqpsSilly255CharacterNameLimitThatIsAlmostCertainToBeReachedWithGenericTypes());
63-
64-
Thread.Sleep(2000);
6563
});
6664
}
6765

@@ -75,8 +73,6 @@ public void Should_throw_when_responding_to_over_long_message()
7573

7674
bus.Request<RpcRequest, MessageWithVeryVEryVEryLongNameThatWillMostCertainlyBreakAmqpsSilly255CharacterNameLimitThatIsAlmostCertainToBeReachedWithGenericTypes>(
7775
new RpcRequest());
78-
79-
Thread.Sleep(2000);
8076
});
8177
}
8278

@@ -92,12 +88,9 @@ public void Should_reply_with_the_exception_using_classes_with_parameterless_con
9288
var request = new RpcRequest { Value = 5 };
9389

9490
var response = bus.Request<RpcRequest, RpcResponse>(request);
95-
96-
Thread.Sleep(2000);
9791
});
98-
Assert.IsType<AggregateException>(ex);
99-
Assert.NotNull(ex.InnerException);
100-
Assert.Equal("Simulated Exception!", ex.InnerException.Message);
92+
Assert.IsType<EasyNetQResponderException>(ex);
93+
Assert.Equal("Simulated Exception!", ex.Message);
10194
}
10295

10396
[Fact, Explicit("Requires a RabbitMQ instance on localhost")]
@@ -112,12 +105,9 @@ public void Should_reply_with_the_exception_using_string_as_message()
112105
var request = "Hello";
113106

114107
var response = bus.Request<string, string>(request);
115-
116-
Thread.Sleep(2000);
117108
});
118-
Assert.IsType<AggregateException>(ex);
119-
Assert.NotNull(ex.InnerException);
120-
Assert.Equal("Simulated Exception!", ex.InnerException.Message);
109+
Assert.IsType<EasyNetQResponderException>(ex);
110+
Assert.Equal("Simulated Exception!", ex.Message);
121111
}
122112

123113
[Fact, Explicit("Requires a RabbitMQ instance on localhost")]
@@ -132,12 +122,9 @@ public void Should_reply_with_the_exception_using_classes_without_parameterless_
132122
var request = new RpcRequest { Value = 5 };
133123

134124
var response = bus.Request<RpcRequest, RpcResponseWithoutParameterlessConstructor>(request);
135-
136-
Thread.Sleep(2000);
137125
});
138-
Assert.IsType<AggregateException>(ex);
139-
Assert.NotNull(ex.InnerException);
140-
Assert.Equal("Simulated Exception!", ex.InnerException.Message);
126+
Assert.IsType<EasyNetQResponderException>(ex);
127+
Assert.Equal("Simulated Exception!", ex.Message);
141128
}
142129
}
143130
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
using EasyNetQ.Logging;
2+
using EasyNetQ.Tests.ProducerTests.Very.Long.Namespace.Certainly.Longer.Than.The255.Char.Length.That.RabbitMQ.Likes.That.Will.Certainly.Cause.An.AMQP.Exception.If.We.Dont.Do.Something.About.It.And.Stop.It.From.Happening;
3+
using System;
4+
using System.Threading;
5+
using Xunit;
6+
7+
namespace EasyNetQ.Tests.Integration.LegacyConventionsTests
8+
{
9+
public class RpcWithLegacyConventionsTests : IDisposable
10+
{
11+
private class RpcRequest
12+
{
13+
public int Value { get; set; }
14+
}
15+
16+
private class RpcResponse
17+
{
18+
public int Value { get; set; }
19+
}
20+
21+
private IBus bus;
22+
23+
public RpcWithLegacyConventionsTests()
24+
{
25+
LogProvider.SetCurrentLogProvider(ConsoleLogProvider.Instance);
26+
27+
bus = RabbitHutch.CreateBus("host=localhost", x => x.EnableLegacyConventions());
28+
}
29+
30+
public void Dispose()
31+
{
32+
bus.Dispose();
33+
}
34+
35+
[Fact, Explicit("Requires a RabbitMQ instance on localhost")]
36+
public void Should_be_able_to_publish_and_receive_response()
37+
{
38+
bus.Respond<RpcRequest, RpcResponse>(req => new RpcResponse { Value = req.Value });
39+
var request = new RpcRequest { Value = 5 };
40+
var response = bus.Request<RpcRequest, RpcResponse>(request);
41+
42+
Assert.NotNull(response);
43+
Assert.True(request.Value == response.Value);
44+
}
45+
46+
[Fact, Explicit("Requires a RabbitMQ instance on localhost")]
47+
public void Should_throw_when_requesting_over_long_message()
48+
{
49+
Assert.Throws<EasyNetQException>(() =>
50+
{
51+
bus.Respond<MessageWithVeryVEryVEryLongNameThatWillMostCertainlyBreakAmqpsSilly255CharacterNameLimitThatIsAlmostCertainToBeReachedWithGenericTypes, RpcRequest>(
52+
req => new RpcRequest());
53+
54+
bus.Request<MessageWithVeryVEryVEryLongNameThatWillMostCertainlyBreakAmqpsSilly255CharacterNameLimitThatIsAlmostCertainToBeReachedWithGenericTypes, RpcRequest>(
55+
new MessageWithVeryVEryVEryLongNameThatWillMostCertainlyBreakAmqpsSilly255CharacterNameLimitThatIsAlmostCertainToBeReachedWithGenericTypes());
56+
});
57+
}
58+
59+
[Fact, Explicit("Requires a RabbitMQ instance on localhost")]
60+
public void Should_throw_when_responding_to_over_long_message()
61+
{
62+
Assert.Throws<EasyNetQException>(() =>
63+
{
64+
bus.Respond<RpcRequest, MessageWithVeryVEryVEryLongNameThatWillMostCertainlyBreakAmqpsSilly255CharacterNameLimitThatIsAlmostCertainToBeReachedWithGenericTypes>(
65+
req => new MessageWithVeryVEryVEryLongNameThatWillMostCertainlyBreakAmqpsSilly255CharacterNameLimitThatIsAlmostCertainToBeReachedWithGenericTypes());
66+
67+
bus.Request<RpcRequest, MessageWithVeryVEryVEryLongNameThatWillMostCertainlyBreakAmqpsSilly255CharacterNameLimitThatIsAlmostCertainToBeReachedWithGenericTypes>(
68+
new RpcRequest());
69+
});
70+
}
71+
}
72+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using EasyNetQ.DI;
2+
3+
namespace EasyNetQ
4+
{
5+
public static class ConventionsExtensions
6+
{
7+
/// <summary>
8+
/// Shortcut for EnableLegacyTypeNaming() + EnableLegacyRpcConventions()
9+
/// </summary>
10+
public static IServiceRegister EnableLegacyConventions(this IServiceRegister serviceRegister)
11+
{
12+
return serviceRegister
13+
.EnableLegacyTypeNaming()
14+
.EnableLegacyRpcConventions();
15+
}
16+
}
17+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using EasyNetQ.Topology;
2+
3+
namespace EasyNetQ
4+
{
5+
public class LegacyRpcConventions : Conventions
6+
{
7+
public LegacyRpcConventions(ITypeNameSerializer typeNameSerializer)
8+
: base(typeNameSerializer)
9+
{
10+
RpcResponseExchangeNamingConvention = type => Exchange.GetDefault().Name;
11+
}
12+
}
13+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using EasyNetQ.DI;
2+
3+
namespace EasyNetQ
4+
{
5+
public static class LegacyRpcConventionsExtensions
6+
{
7+
public static IServiceRegister EnableLegacyRpcConventions(this IServiceRegister serviceRegister)
8+
{
9+
return serviceRegister.Register<IConventions, LegacyRpcConventions>();
10+
}
11+
}
12+
}

Source/EasyNetQ/Producer/Rpc.cs

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -121,19 +121,19 @@ protected void RegisterErrorHandling<TResponse>(Guid correlationId, Timer timer,
121121

122122
bool isFaulted = false;
123123
string exceptionMessage = "The exception message has not been specified.";
124-
if(msg.Properties.HeadersPresent)
124+
if (msg.Properties.HeadersPresent)
125125
{
126-
if(msg.Properties.Headers.ContainsKey(isFaultedKey))
126+
if (msg.Properties.Headers.ContainsKey(isFaultedKey))
127127
{
128128
isFaulted = Convert.ToBoolean(msg.Properties.Headers[isFaultedKey]);
129129
}
130-
if(msg.Properties.Headers.ContainsKey(exceptionMessageKey))
130+
if (msg.Properties.Headers.ContainsKey(exceptionMessageKey))
131131
{
132132
exceptionMessage = Encoding.UTF8.GetString((byte[])msg.Properties.Headers[exceptionMessageKey]);
133133
}
134134
}
135135

136-
if(isFaulted)
136+
if (isFaulted)
137137
{
138138
tcs.TrySetException(new EasyNetQResponderException(exceptionMessage));
139139
}
@@ -170,14 +170,15 @@ protected virtual string SubscribeToResponse<TRequest, TResponse>()
170170
exclusive: true,
171171
autoDelete: true);
172172

173-
var exchange = DeclareRpcExchange(conventions.RpcResponseExchangeNamingConvention(responseType));
174-
175-
advancedBus.Bind(exchange, queue, queue.Name);
173+
var exchange = DeclareAndBindRpcExchange(
174+
conventions.RpcResponseExchangeNamingConvention(responseType),
175+
queue,
176+
queue.Name);
176177

177178
advancedBus.Consume<TResponse>(queue, (message, messageReceivedInfo) => Task.Factory.StartNew(() =>
178179
{
179180
ResponseAction responseAction;
180-
if(responseActions.TryRemove(message.Properties.CorrelationId, out responseAction))
181+
if (responseActions.TryRemove(message.Properties.CorrelationId, out responseAction))
181182
{
182183
responseAction.OnSuccess(message);
183184
}
@@ -203,8 +204,9 @@ protected virtual void RequestPublish<TRequest>(TRequest request, string routing
203204
where TRequest : class
204205
{
205206
var requestType = typeof(TRequest);
206-
var exchange = publishExchangeDeclareStrategy.DeclareExchange(conventions.RpcRequestExchangeNamingConvention(requestType), ExchangeType.Direct);
207-
207+
208+
var exchange = DeclareRpcExchange(conventions.RpcRequestExchangeNamingConvention(requestType));
209+
208210
var requestMessage = new Message<TRequest>(request)
209211
{
210212
Properties =
@@ -241,9 +243,12 @@ public virtual IDisposable Respond<TRequest, TResponse>(Func<TRequest, Task<TRes
241243

242244
var routingKey = configuration.QueueName ?? conventions.RpcRoutingKeyNamingConvention(requestType);
243245

244-
var exchange = advancedBus.ExchangeDeclare(conventions.RpcRequestExchangeNamingConvention(requestType), ExchangeType.Direct);
245246
var queue = advancedBus.QueueDeclare(routingKey);
246-
advancedBus.Bind(exchange, queue, routingKey);
247+
248+
var exchange = DeclareAndBindRpcExchange(
249+
conventions.RpcRequestExchangeNamingConvention(requestType),
250+
queue,
251+
routingKey);
247252

248253
return advancedBus.Consume<TRequest>(queue, (requestMessage, messageReceivedInfo) => ExecuteResponder(responder, requestMessage),
249254
c => c.WithPrefetchCount(configuration.PrefetchCount));
@@ -261,7 +266,7 @@ protected Task ExecuteResponder<TRequest, TResponse>(Func<TRequest, Task<TRespon
261266
{
262267
if (task.IsFaulted || task.IsCanceled)
263268
{
264-
var exception = task.IsCanceled
269+
var exception = task.IsCanceled
265270
? new EasyNetQResponderException("The responder task was cancelled.")
266271
: task.Exception?.InnerException ?? new EasyNetQResponderException("The responder faulted while dispatching the message.");
267272

@@ -326,7 +331,24 @@ protected virtual void OnResponderFailure<TRequest, TResponse>(IMessage<TRequest
326331

327332
private IExchange DeclareRpcExchange(string exchangeName)
328333
{
329-
return publishExchangeDeclareStrategy.DeclareExchange(exchangeName, ExchangeType.Direct);
334+
if (exchangeName != Exchange.GetDefault().Name)
335+
{
336+
return publishExchangeDeclareStrategy.DeclareExchange(exchangeName, ExchangeType.Direct);
337+
}
338+
else
339+
{
340+
return Exchange.GetDefault();
341+
}
342+
}
343+
344+
private IExchange DeclareAndBindRpcExchange(string exchangeName, IQueue queue, string routingKey)
345+
{
346+
var exchange = DeclareRpcExchange(exchangeName);
347+
if (exchange != Exchange.GetDefault())
348+
{
349+
advancedBus.Bind(exchange, queue, routingKey);
350+
}
351+
return exchange;
330352
}
331353
}
332354
}

0 commit comments

Comments
 (0)