Skip to content

Commit 0e01187

Browse files
committed
GH-3002: Add RPC support to RabbitAmqpTemplate
Fixes: #3002 * Implement `sendAndReceive` & `receiveAndReply` operations in the `RabbitAmqpTemplate` * Expose contracts to the `AsyncAmqpTemplate` * Some NullAway fixes for `AsyncAmqpTemplate` hierarchy * Move DLQ objects for testing to the common `RabbitAmqpTestBase`
1 parent 434410f commit 0e01187

File tree

8 files changed

+412
-106
lines changed

8 files changed

+412
-106
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/AsyncAmqpTemplate.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,30 @@ default CompletableFuture<Object> receiveAndConvert(String queueName) {
9292
throw new UnsupportedOperationException();
9393
}
9494

95-
default <T> CompletableFuture<T> receiveAndConvert(ParameterizedTypeReference<T> type) {
95+
default <T> CompletableFuture<T> receiveAndConvert(@Nullable ParameterizedTypeReference<T> type) {
9696
throw new UnsupportedOperationException();
9797
}
9898

99-
default <T> CompletableFuture<T> receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) {
99+
default <T> CompletableFuture<T> receiveAndConvert(String queueName, @Nullable ParameterizedTypeReference<T> type) {
100+
throw new UnsupportedOperationException();
101+
}
102+
103+
default <R, S> CompletableFuture<Boolean> receiveAndReply(ReceiveAndReplyCallback<R, S> callback) {
104+
throw new UnsupportedOperationException();
105+
}
106+
107+
/**
108+
* Perform a server-side RPC functionality.
109+
* The request message must have a {@code replyTo} property.
110+
* The request {@code messageId} property is used for correlation.
111+
* The callback might not produce a reply with the meaning nothing to answer.
112+
* @param queueName the queue to consume request.
113+
* @param callback an application callback to handle request and produce reply.
114+
* @return the completion status: true if no errors and reply has been produced.
115+
* @param <R> the request body type.
116+
* @param <S> the response body type
117+
*/
118+
default <R, S> CompletableFuture<Boolean> receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) {
100119
throw new UnsupportedOperationException();
101120
}
102121

@@ -240,8 +259,9 @@ <C> CompletableFuture<C> convertSendAndReceiveAsType(String exchange, String rou
240259
* @param <C> the expected result type.
241260
* @return the {@link CompletableFuture}.
242261
*/
243-
<C> CompletableFuture<C> convertSendAndReceiveAsType(Object object, MessagePostProcessor messagePostProcessor,
244-
ParameterizedTypeReference<C> responseType);
262+
<C> CompletableFuture<C> convertSendAndReceiveAsType(Object object,
263+
@Nullable MessagePostProcessor messagePostProcessor,
264+
@Nullable ParameterizedTypeReference<C> responseType);
245265

246266
/**
247267
* Convert the object to a message and send it to the default exchange with the

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(String exchange,
464464

465465
@Override
466466
public <C> RabbitConverterFuture<C> convertSendAndReceiveAsType(Object object,
467-
MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType) {
467+
@Nullable MessagePostProcessor messagePostProcessor, @Nullable ParameterizedTypeReference<C> responseType) {
468468

469469
return convertSendAndReceiveAsType(this.template.getExchange(), this.template.getRoutingKey(), object,
470470
messagePostProcessor, responseType);

0 commit comments

Comments
 (0)