Skip to content

Commit b504a0a

Browse files
committed
Add isRequesterAlive check in request / response support
1 parent fdc1488 commit b504a0a

File tree

9 files changed

+198
-7
lines changed

9 files changed

+198
-7
lines changed

src/main/java/com/rabbitmq/client/amqp/Publisher.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public interface Publisher extends AutoCloseable, Resource {
3232
/**
3333
* Create a message meant to be published by the publisher instance.
3434
*
35-
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should
36-
* not be modified or even reused.
35+
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should not
36+
* be modified or even reused.
3737
*
3838
* @return a message
3939
*/
@@ -42,8 +42,8 @@ public interface Publisher extends AutoCloseable, Resource {
4242
/**
4343
* Create a message meant to be published by the publisher instance.
4444
*
45-
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should
46-
* not be modified or even reused.
45+
* <p>Once published with the {@link #publish(Message, Callback)} the message instance should not
46+
* be modified or even reused.
4747
*
4848
* @param body message body
4949
* @return a message with the provided body

src/main/java/com/rabbitmq/client/amqp/RpcServer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,20 @@ interface Handler {
4141
/** Request processing context. */
4242
interface Context {
4343

44+
/**
45+
* Tell whether the requester is still able to receive the response.
46+
*
47+
* <p>The call assumes a reply-to queue address has been set on the request message and checks
48+
* whether this queue still exists or not.
49+
*
50+
* <p>A time-consuming request handler can use this call from time to time to make sure it still
51+
* worth keeping processing the request.
52+
*
53+
* @param request the incoming request
54+
* @return true if the requester is still considered alive, false otherwise
55+
*/
56+
boolean isRequesterAlive(Message request);
57+
4458
/**
4559
* Create a message meant to be published by the underlying publisher instance.
4660
*

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -713,11 +713,18 @@ private static class DefaultQueueInfo implements QueueInfo {
713713

714714
@SuppressWarnings("unchecked")
715715
private DefaultQueueInfo(Map<String, Object> response) {
716+
QueueType queueType;
716717
this.name = (String) response.get("name");
717718
this.durable = (Boolean) response.get("durable");
718719
this.autoDelete = (Boolean) response.get("auto_delete");
719720
this.exclusive = (Boolean) response.get("exclusive");
720-
this.type = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH));
721+
try {
722+
queueType = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH));
723+
} catch (Exception e) {
724+
// this happens for reply-to queues, no need to make the type public
725+
queueType = null;
726+
}
727+
this.type = queueType;
721728
this.arguments = Map.copyOf((Map<String, Object>) response.get("arguments"));
722729
this.leader = (String) response.get("leader");
723730
String[] members = (String[]) response.get("replicas");

src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,30 @@ final class AmqpRpcServer implements RpcServer {
6666

6767
Context context =
6868
new Context() {
69+
70+
@Override
71+
public boolean isRequesterAlive(Message message) {
72+
String replyToAddr = message.replyTo();
73+
String replyToQueue = Utils.extractQueueName(replyToAddr);
74+
boolean replyToOk = true;
75+
if (replyToQueue != null) {
76+
try {
77+
connection.management().queueInfo(replyToQueue);
78+
} catch (AmqpException.AmqpEntityDoesNotExistException e) {
79+
replyToOk = false;
80+
} catch (Exception e) {
81+
if (LOGGER.isWarnEnabled()) {
82+
LOGGER.warn(
83+
"Error while checking reply queue '{}' ({}): {}",
84+
replyToQueue,
85+
replyToAddr,
86+
e.getMessage());
87+
}
88+
}
89+
}
90+
return replyToOk;
91+
}
92+
6993
@Override
7094
public Message message() {
7195
return publisher.message();
@@ -95,6 +119,7 @@ public Message message(byte[] body) {
95119
} else {
96120
this.replyPostProcessor = builder.replyPostProcessor();
97121
}
122+
98123
this.consumer =
99124
this.connection
100125
.consumerBuilder()
@@ -104,8 +129,9 @@ public Message message(byte[] body) {
104129
Object correlationId = null;
105130
try {
106131
Message reply = handler.handle(context, msg);
107-
if (reply != null && msg.replyTo() != null) {
108-
reply.to(msg.replyTo());
132+
String replyToAddr = msg.replyTo();
133+
if (reply != null && replyToAddr != null) {
134+
reply.to(replyToAddr);
109135
}
110136
correlationId = correlationIdExtractor.apply(msg);
111137
reply = replyPostProcessor.apply(reply, correlationId);

src/main/java/com/rabbitmq/client/amqp/impl/UriUtils.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,38 @@ private static String encode(String value, BitSet safeCharacters) {
106106
}
107107
return buf.toString();
108108
}
109+
110+
static String decode(final CharSequence content) {
111+
return decode(content, false);
112+
}
113+
114+
private static String decode(final CharSequence content, final boolean plusAsBlank) {
115+
if (content == null) {
116+
return null;
117+
}
118+
final ByteBuffer bb = ByteBuffer.allocate(content.length());
119+
final CharBuffer cb = CharBuffer.wrap(content);
120+
while (cb.hasRemaining()) {
121+
final char c = cb.get();
122+
if (c == '%' && cb.remaining() >= 2) {
123+
final char uc = cb.get();
124+
final char lc = cb.get();
125+
final int u = Character.digit(uc, RADIX);
126+
final int l = Character.digit(lc, RADIX);
127+
if (u != -1 && l != -1) {
128+
bb.put((byte) ((u << 4) + l));
129+
} else {
130+
bb.put((byte) '%');
131+
bb.put((byte) uc);
132+
bb.put((byte) lc);
133+
}
134+
} else if (plusAsBlank && c == '+') {
135+
bb.put((byte) ' ');
136+
} else {
137+
bb.put((byte) c);
138+
}
139+
}
140+
bb.flip();
141+
return StandardCharsets.UTF_8.decode(bb).toString();
142+
}
109143
}

src/main/java/com/rabbitmq/client/amqp/impl/Utils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,4 +367,12 @@ interface RunnableWithException {
367367

368368
void run() throws Exception;
369369
}
370+
371+
static String extractQueueName(String address) {
372+
if (address == null || !address.startsWith("/queues/")) {
373+
return null;
374+
} else {
375+
return UriUtils.decode(address.replaceFirst("/queues/", ""));
376+
}
377+
}
370378
}

src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,87 @@ void rpcWithDefaults() {
107107
}
108108
}
109109

110+
@Test
111+
void rpcIsRequesterAliveShouldReturnTrueIfPublisherStillOpen() throws Exception {
112+
try (Connection clientConnection = environment.connectionBuilder().build();
113+
Connection serverConnection = environment.connectionBuilder().build()) {
114+
115+
String requestQueue = serverConnection.management().queue().exclusive(true).declare().name();
116+
117+
RpcClient rpcClient =
118+
clientConnection
119+
.rpcClientBuilder()
120+
.requestAddress()
121+
.queue(requestQueue)
122+
.rpcClient()
123+
.build();
124+
125+
List<Boolean> calls = new CopyOnWriteArrayList<>();
126+
serverConnection
127+
.rpcServerBuilder()
128+
.requestQueue(requestQueue)
129+
.handler(
130+
(ctx, request) -> {
131+
calls.add(ctx.isRequesterAlive(request));
132+
return HANDLER.handle(ctx, request);
133+
})
134+
.build();
135+
136+
String request = UUID.randomUUID().toString();
137+
CompletableFuture<Message> responseFuture =
138+
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
139+
Message response = responseFuture.get(10, TimeUnit.SECONDS);
140+
assertThat(response.body()).asString(UTF_8).isEqualTo(process(request));
141+
assertThat(calls).containsExactly(true);
142+
}
143+
}
144+
145+
@Test
146+
void rpcIsRequesterAliveShouldReturnFalseIfPublisherClosed() {
147+
try (Connection clientConnection = environment.connectionBuilder().build();
148+
Connection serverConnection = environment.connectionBuilder().build()) {
149+
150+
String requestQueue = serverConnection.management().queue().exclusive(true).declare().name();
151+
152+
RpcClient rpcClient =
153+
clientConnection
154+
.rpcClientBuilder()
155+
.requestAddress()
156+
.queue(requestQueue)
157+
.rpcClient()
158+
.build();
159+
160+
Sync requestReceivedSync = sync();
161+
Sync requesterClosedSync = sync();
162+
163+
List<Boolean> calls = new CopyOnWriteArrayList<>();
164+
serverConnection
165+
.rpcServerBuilder()
166+
.requestQueue(requestQueue)
167+
.handler(
168+
(ctx, request) -> {
169+
calls.add(ctx.isRequesterAlive(request));
170+
requestReceivedSync.down();
171+
requesterClosedSync.await(Duration.ofSeconds(20));
172+
calls.add(ctx.isRequesterAlive(request));
173+
return null;
174+
})
175+
.build();
176+
177+
String request = UUID.randomUUID().toString();
178+
CompletableFuture<Message> responseFuture =
179+
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
180+
assertThat(requestReceivedSync).completes();
181+
assertThat(calls).containsExactly(true);
182+
rpcClient.close();
183+
assertThat(responseFuture).completesExceptionallyWithin(Duration.ofSeconds(10));
184+
requesterClosedSync.down();
185+
waitAtMost(() -> calls.size() == 2);
186+
assertThat(calls).containsExactly(true, false);
187+
waitAtMost(() -> serverConnection.management().queueInfo(requestQueue).messageCount() == 0);
188+
}
189+
}
190+
110191
@Test
111192
void rpcWithCustomSettings() {
112193
try (Connection clientConnection = environment.connectionBuilder().build();

src/test/java/com/rabbitmq/client/amqp/impl/UriUtilsTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,13 @@ void encodeParamTest(String param, String expected) {
5555
void encodeNonUnreservedTest(String param, String expected) {
5656
assertThat(encodeNonUnreserved(param)).isEqualTo(expected);
5757
}
58+
59+
@ParameterizedTest
60+
@CsvSource({
61+
"test,test",
62+
"amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjc1MDQ2NDQAAAJ6AAAAAGi1jj8%3D.0T5a3Sa%2BQ7ZRPeMFMi%2BJ0A%3D%3D,amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjc1MDQ2NDQAAAJ6AAAAAGi1jj8=.0T5a3Sa+Q7ZRPeMFMi+J0A=="
63+
})
64+
void decodeTest(String in, String expected) {
65+
assertThat(decode(in)).isEqualTo(expected);
66+
}
5867
}

src/test/java/com/rabbitmq/client/amqp/impl/UtilsTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import static com.rabbitmq.client.amqp.impl.Utils.checkMessageAnnotations;
21+
import static com.rabbitmq.client.amqp.impl.Utils.extractQueueName;
2122
import static java.util.Map.of;
2223
import static org.assertj.core.api.Assertions.assertThat;
2324
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2425

2526
import org.junit.jupiter.api.Test;
2627
import org.junit.jupiter.params.ParameterizedTest;
28+
import org.junit.jupiter.params.provider.CsvSource;
2729
import org.junit.jupiter.params.provider.ValueSource;
2830

2931
public class UtilsTest {
@@ -100,4 +102,14 @@ void validateBrokerVersionParsing41AndLater(String brokerVersion) {
100102
assertThat(Utils.is4_1_OrMore(brokerVersion)).isTrue();
101103
assertThat(Utils.supportFilterExpressions(brokerVersion)).isTrue();
102104
}
105+
106+
@ParameterizedTest
107+
@CsvSource({
108+
"/queues/q1,q1",
109+
"/exchanges/foo,",
110+
"/queues/amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjc1MDQ2NDQAAAJ6AAAAAGi1jj8%3D.0T5a3Sa%2BQ7ZRPeMFMi%2BJ0A%3D%3D,amq.rabbitmq.reply-to.g1h2AA9yZXBseUAxMjc1MDQ2NDQAAAJ6AAAAAGi1jj8=.0T5a3Sa+Q7ZRPeMFMi+J0A=="
111+
})
112+
void extractQueueNameTest(String address, String expected) {
113+
assertThat(extractQueueName(address)).isEqualTo(expected);
114+
}
103115
}

0 commit comments

Comments
 (0)