Skip to content

Commit 56f29b9

Browse files
authored
Merge pull request #164 from rabbitmq/rpc-server-accept-discard
Make RPC server accept or discard request message
2 parents d72a2f0 + 0ca086f commit 56f29b9

File tree

3 files changed

+115
-35
lines changed

3 files changed

+115
-35
lines changed

src/docs/asciidoc/usage.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ include::{test-examples}/RpcApi.java[tag=rpc-client-request]
220220
The `RpcClient#publish(Message)` method returns a `CompletableFuture<Message>` that holds the reply message.
221221
It is then possible to wait for the reply asynchronously or synchronously.
222222

223+
The RPC server has the following behavior:
224+
225+
* when receiving a message request, it calls the processing logic (handler), extracts the correlation ID, calls a reply post-processor if defined, and sends the reply message.
226+
* if all these operations succeed, the server accepts the request message (settles it with the `ACCEPTED` outcome).
227+
* if any of these operations throws an exception, the server discards the request message (the message is removed from the request queue and is https://www.rabbitmq.com/client-libraries/amqp-client-libraries#message-processing-result-outcome[dead-lettered] if configured).
228+
223229
The RPC server uses the following defaults:
224230

225231
* it uses the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`message-id` property] for the correlation ID.

src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,24 @@ public Message message(byte[] body) {
8989
.queue(builder.requestQueue())
9090
.messageHandler(
9191
(ctx, msg) -> {
92-
ctx.accept();
93-
Message reply = handler.handle(context, msg);
94-
if (reply != null && msg.replyTo() != null) {
95-
reply.to(msg.replyTo());
96-
}
97-
Object correlationId = correlationIdExtractor.apply(msg);
98-
reply = replyPostProcessor.apply(reply, correlationId);
99-
if (reply != null && reply.to() != null) {
100-
sendReply(reply);
92+
Object correlationId = null;
93+
try {
94+
Message reply = handler.handle(context, msg);
95+
if (reply != null && msg.replyTo() != null) {
96+
reply.to(msg.replyTo());
97+
}
98+
correlationId = correlationIdExtractor.apply(msg);
99+
reply = replyPostProcessor.apply(reply, correlationId);
100+
if (reply != null && reply.to() != null) {
101+
sendReply(reply);
102+
}
103+
ctx.accept();
104+
} catch (Exception e) {
105+
LOGGER.info(
106+
"Error while processing RPC request (correlation ID {}): {}",
107+
correlationId,
108+
e.getMessage());
109+
ctx.discard();
101110
}
102111
})
103112
.build();

src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java

Lines changed: 91 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20-
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
20+
import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT;
21+
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
22+
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
2123
import static java.nio.charset.StandardCharsets.UTF_8;
2224
import static java.time.Duration.ofMillis;
25+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2326
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2428
import static org.assertj.core.api.Assertions.fail;
2529

2630
import com.rabbitmq.client.amqp.*;
31+
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
2732
import java.time.Duration;
2833
import java.util.Random;
2934
import java.util.UUID;
@@ -75,7 +80,7 @@ void rpcWithDefaults() {
7580
serverConnection.rpcServerBuilder().requestQueue(requestQueue).handler(HANDLER).build();
7681

7782
int requestCount = 100;
78-
CountDownLatch latch = new CountDownLatch(requestCount);
83+
Sync sync = sync(requestCount);
7984
IntStream.range(0, requestCount)
8085
.forEach(
8186
ignored ->
@@ -86,10 +91,10 @@ void rpcWithDefaults() {
8691
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
8792
Message response = responseFuture.get(10, TimeUnit.SECONDS);
8893
assertThat(response.body()).asString(UTF_8).isEqualTo(process(request));
89-
latch.countDown();
94+
sync.down();
9095
return null;
9196
}));
92-
Assertions.assertThat(latch).completes();
97+
assertThat(sync).completes();
9398
}
9499
}
95100

@@ -136,7 +141,7 @@ void rpcWithCustomSettings() {
136141
.build();
137142

138143
int requestCount = 100;
139-
CountDownLatch latch = new CountDownLatch(requestCount);
144+
Sync sync = sync(requestCount);
140145
IntStream.range(0, requestCount)
141146
.forEach(
142147
ignored ->
@@ -146,11 +151,13 @@ void rpcWithCustomSettings() {
146151
CompletableFuture<Message> responseFuture =
147152
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
148153
Message response = responseFuture.get(10, TimeUnit.SECONDS);
149-
assertThat(response.body()).asString(UTF_8).isEqualTo(process(request));
150-
latch.countDown();
154+
org.assertj.core.api.Assertions.assertThat(response.body())
155+
.asString(UTF_8)
156+
.isEqualTo(process(request));
157+
sync.down();
151158
return null;
152159
}));
153-
Assertions.assertThat(latch).completes();
160+
assertThat(sync).completes();
154161
}
155162
}
156163

@@ -184,7 +191,7 @@ void rpcUseCorrelationIdRequestProperty() {
184191
.build();
185192

186193
int requestCount = 100;
187-
CountDownLatch latch = new CountDownLatch(requestCount);
194+
Sync sync = sync(requestCount);
188195
IntStream.range(0, requestCount)
189196
.forEach(
190197
ignored ->
@@ -195,10 +202,10 @@ void rpcUseCorrelationIdRequestProperty() {
195202
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
196203
Message response = responseFuture.get(10, TimeUnit.SECONDS);
197204
assertThat(response.body()).asString(UTF_8).isEqualTo(process(request));
198-
latch.countDown();
205+
sync.down();
199206
return null;
200207
}));
201-
Assertions.assertThat(latch).completes();
208+
assertThat(sync).completes();
202209
}
203210
}
204211

@@ -207,16 +214,16 @@ void rpcUseCorrelationIdRequestProperty() {
207214
void rpcShouldRecoverAfterConnectionIsClosed(boolean isolateResources)
208215
throws ExecutionException, InterruptedException, TimeoutException {
209216
String clientConnectionName = UUID.randomUUID().toString();
210-
CountDownLatch clientConnectionLatch = new CountDownLatch(1);
217+
Sync clientConnectionSync = sync();
211218
String serverConnectionName = UUID.randomUUID().toString();
212-
CountDownLatch serverConnectionLatch = new CountDownLatch(1);
219+
Sync serverConnectionSync = sync();
213220

214221
BackOffDelayPolicy backOffDelayPolicy = BackOffDelayPolicy.fixed(ofMillis(100));
215222
Connection serverConnection =
216223
connectionBuilder()
217224
.name(serverConnectionName)
218225
.isolateResources(isolateResources)
219-
.listeners(recoveredListener(serverConnectionLatch))
226+
.listeners(recoveredListener(serverConnectionSync))
220227
.recovery()
221228
.backOffDelayPolicy(backOffDelayPolicy)
222229
.connectionBuilder()
@@ -226,7 +233,7 @@ void rpcShouldRecoverAfterConnectionIsClosed(boolean isolateResources)
226233
connectionBuilder()
227234
.name(clientConnectionName)
228235
.isolateResources(isolateResources)
229-
.listeners(recoveredListener(clientConnectionLatch))
236+
.listeners(recoveredListener(clientConnectionSync))
230237
.recovery()
231238
.backOffDelayPolicy(backOffDelayPolicy)
232239
.connectionBuilder()
@@ -254,7 +261,7 @@ void rpcShouldRecoverAfterConnectionIsClosed(boolean isolateResources)
254261
} catch (AmqpException e) {
255262
// OK
256263
}
257-
Assertions.assertThat(clientConnectionLatch).completes();
264+
assertThat(clientConnectionSync).completes();
258265
requestBody = request(UUID.randomUUID().toString());
259266
response = rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID()));
260267
assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody));
@@ -263,7 +270,7 @@ void rpcShouldRecoverAfterConnectionIsClosed(boolean isolateResources)
263270
requestBody = request(UUID.randomUUID().toString());
264271
response = rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID()));
265272
assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody));
266-
Assertions.assertThat(serverConnectionLatch).completes();
273+
assertThat(serverConnectionSync).completes();
267274
requestBody = request(UUID.randomUUID().toString());
268275
response = rpcClient.publish(rpcClient.message(requestBody).messageId(UUID.randomUUID()));
269276
assertThat(response.get(10, TimeUnit.SECONDS).body()).isEqualTo(process(requestBody));
@@ -308,7 +315,7 @@ void poisonRequestsShouldTimeout() {
308315
int requestCount = 100;
309316
AtomicInteger expectedPoisonCount = new AtomicInteger();
310317
AtomicInteger timedOutRequestCount = new AtomicInteger();
311-
CountDownLatch latch = new CountDownLatch(requestCount);
318+
Sync sync = sync(requestCount);
312319
Random random = new Random();
313320
IntStream.range(0, requestCount)
314321
.forEach(
@@ -330,18 +337,18 @@ void poisonRequestsShouldTimeout() {
330337
if (ex != null) {
331338
timedOutRequestCount.incrementAndGet();
332339
}
333-
latch.countDown();
340+
sync.down();
334341
return null;
335342
});
336343
});
337344
});
338-
Assertions.assertThat(latch).completes();
345+
assertThat(sync).completes();
339346
assertThat(timedOutRequestCount).hasPositiveValue().hasValue(expectedPoisonCount.get());
340347
}
341348
}
342349

343350
@Test
344-
void outstandingRequestsShouldCompleteExceptionallyOnRpcClientClosing() throws Exception {
351+
void outstandingRequestsShouldCompleteExceptionallyOnRpcClientClosing() {
345352
try (Connection clientConnection = environment.connectionBuilder().build();
346353
Connection serverConnection = environment.connectionBuilder().build()) {
347354

@@ -375,7 +382,7 @@ void outstandingRequestsShouldCompleteExceptionallyOnRpcClientClosing() throws E
375382
AtomicInteger timedOutRequestCount = new AtomicInteger();
376383
AtomicInteger completedRequestCount = new AtomicInteger();
377384
Random random = new Random();
378-
CountDownLatch allRequestSubmitted = new CountDownLatch(requestCount);
385+
Sync allRequestSubmitted = sync(requestCount);
379386
IntStream.range(0, requestCount)
380387
.forEach(
381388
ignored -> {
@@ -401,16 +408,74 @@ void outstandingRequestsShouldCompleteExceptionallyOnRpcClientClosing() throws E
401408
return null;
402409
});
403410
});
404-
allRequestSubmitted.countDown();
411+
allRequestSubmitted.down();
405412
});
406-
Assertions.assertThat(allRequestSubmitted).completes();
413+
assertThat(allRequestSubmitted).completes();
407414
waitAtMost(() -> completedRequestCount.get() == requestCount - expectedPoisonCount.get());
408415
assertThat(timedOutRequestCount).hasValue(0);
409416
rpcClient.close();
410417
assertThat(timedOutRequestCount).hasPositiveValue().hasValue(expectedPoisonCount.get());
411418
}
412419
}
413420

421+
@Test
422+
void errorDuringProcessingShouldDiscardMessageAndDeadLetterIfSet(TestInfo info)
423+
throws ExecutionException, InterruptedException, TimeoutException {
424+
try (Connection clientConnection = environment.connectionBuilder().build();
425+
Connection serverConnection = environment.connectionBuilder().build()) {
426+
427+
String dlx = name(info);
428+
String dlq = name(info);
429+
Management management = serverConnection.management();
430+
management.exchange(dlx).type(FANOUT).autoDelete(true).declare();
431+
management.queue(dlq).exclusive(true).declare();
432+
management.binding().sourceExchange(dlx).destinationQueue(dlq).bind();
433+
434+
String requestQueue =
435+
management.queue().exclusive(true).deadLetterExchange(dlx).declare().name();
436+
437+
Duration requestTimeout = Duration.ofSeconds(1);
438+
RpcClient rpcClient =
439+
clientConnection
440+
.rpcClientBuilder()
441+
.requestTimeout(requestTimeout)
442+
.requestAddress()
443+
.queue(requestQueue)
444+
.rpcClient()
445+
.build();
446+
447+
serverConnection
448+
.rpcServerBuilder()
449+
.requestQueue(requestQueue)
450+
.handler(
451+
(ctx, request) -> {
452+
String body = new String(request.body(), UTF_8);
453+
if (body.contains("poison")) {
454+
throw new RuntimeException("Poison message");
455+
}
456+
return HANDLER.handle(ctx, request);
457+
})
458+
.build();
459+
460+
String request = UUID.randomUUID().toString();
461+
CompletableFuture<Message> responseFuture =
462+
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
463+
Message response = responseFuture.get(10, TimeUnit.SECONDS);
464+
assertThat(response.body()).asString(UTF_8).isEqualTo(process(request));
465+
466+
assertThat(management.queueInfo(dlq)).isEmpty();
467+
468+
request = "poison";
469+
CompletableFuture<Message> poisonFuture =
470+
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
471+
waitAtMost(() -> management.queueInfo(dlq).messageCount() == 1);
472+
assertThatThrownBy(
473+
() -> poisonFuture.get(requestTimeout.multipliedBy(3).toMillis(), MILLISECONDS))
474+
.isInstanceOf(ExecutionException.class)
475+
.hasCauseInstanceOf(AmqpException.class);
476+
}
477+
}
478+
414479
private static AmqpConnectionBuilder connectionBuilder() {
415480
return (AmqpConnectionBuilder) environment.connectionBuilder();
416481
}
@@ -427,11 +492,11 @@ private static byte[] process(byte[] in) {
427492
return process(new String(in, UTF_8)).getBytes(UTF_8);
428493
}
429494

430-
private static Resource.StateListener recoveredListener(CountDownLatch latch) {
495+
private static Resource.StateListener recoveredListener(Sync sync) {
431496
return context -> {
432497
if (context.previousState() == Resource.State.RECOVERING
433498
&& context.currentState() == Resource.State.OPEN) {
434-
latch.countDown();
499+
sync.down();
435500
}
436501
};
437502
}

0 commit comments

Comments
 (0)