Skip to content

Commit 6f0a338

Browse files
committed
Move some of RabbitAmqpTemplate method up to AsyncAmqpTemplate
* Improve Client module tests
1 parent d645969 commit 6f0a338

File tree

5 files changed

+113
-6
lines changed

5 files changed

+113
-6
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/AsyncAmqpTemplate.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,79 @@
2727
* receive operations using {@link CompletableFuture}s.
2828
*
2929
* @author Gary Russell
30+
* @author Artem Bilan
31+
*
3032
* @since 2.0
3133
*
3234
*/
3335
public interface AsyncAmqpTemplate {
3436

37+
default CompletableFuture<Boolean> send(Message message) {
38+
throw new UnsupportedOperationException();
39+
}
40+
41+
default CompletableFuture<Boolean> send(String queue, Message message) {
42+
throw new UnsupportedOperationException();
43+
}
44+
45+
default CompletableFuture<Boolean> send(String exchange, @Nullable String routingKey, Message message) {
46+
throw new UnsupportedOperationException();
47+
}
48+
49+
default CompletableFuture<Boolean> convertAndSend(Object message) {
50+
throw new UnsupportedOperationException();
51+
}
52+
53+
default CompletableFuture<Boolean> convertAndSend(String queue, Object message) {
54+
throw new UnsupportedOperationException();
55+
}
56+
57+
default CompletableFuture<Boolean> convertAndSend(String exchange, @Nullable String routingKey, Object message) {
58+
throw new UnsupportedOperationException();
59+
}
60+
61+
default CompletableFuture<Boolean> convertAndSend(Object message,
62+
@Nullable MessagePostProcessor messagePostProcessor) {
63+
64+
throw new UnsupportedOperationException();
65+
}
66+
67+
default CompletableFuture<Boolean> convertAndSend(String queue, Object message,
68+
@Nullable MessagePostProcessor messagePostProcessor) {
69+
70+
throw new UnsupportedOperationException();
71+
}
72+
73+
default CompletableFuture<Boolean> convertAndSend(String exchange, @Nullable String routingKey, Object message,
74+
@Nullable MessagePostProcessor messagePostProcessor) {
75+
76+
throw new UnsupportedOperationException();
77+
}
78+
79+
default CompletableFuture<Message> receive() {
80+
throw new UnsupportedOperationException();
81+
}
82+
83+
default CompletableFuture<Message> receive(String queueName) {
84+
throw new UnsupportedOperationException();
85+
}
86+
87+
default CompletableFuture<Object> receiveAndConvert() {
88+
throw new UnsupportedOperationException();
89+
}
90+
91+
default CompletableFuture<Object> receiveAndConvert(String queueName) {
92+
throw new UnsupportedOperationException();
93+
}
94+
95+
default <T> CompletableFuture<T> receiveAndConvert(ParameterizedTypeReference<T> type) {
96+
throw new UnsupportedOperationException();
97+
}
98+
99+
default <T> CompletableFuture<T> receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) {
100+
throw new UnsupportedOperationException();
101+
}
102+
35103
/**
36104
* Send a message to the default exchange with the default routing key. If the message
37105
* contains a correlationId property, it must be unique.

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTemplate.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ public void destroy() {
156156
* @param message to publish
157157
* @return the {@link CompletableFuture} as an async result of the message publication.
158158
*/
159+
@Override
159160
public CompletableFuture<Boolean> send(Message message) {
161+
Assert.state(this.defaultExchange != null || this.defaultQueue != null,
162+
"For send with defaults, an 'exchange' (and optional 'key') or 'queue' must be provided");
160163
return doSend(this.defaultExchange, this.defaultRoutingKey, this.defaultQueue, message);
161164
}
162165

@@ -166,10 +169,12 @@ public CompletableFuture<Boolean> send(Message message) {
166169
* @param message to publish
167170
* @return the {@link CompletableFuture} as an async result of the message publication.
168171
*/
172+
@Override
169173
public CompletableFuture<Boolean> send(String queue, Message message) {
170174
return doSend(null, null, queue, message);
171175
}
172176

177+
@Override
173178
public CompletableFuture<Boolean> send(String exchange, @Nullable String routingKey, Message message) {
174179
return doSend(exchange, routingKey != null ? routingKey : this.defaultRoutingKey, null, message);
175180
}
@@ -226,30 +231,38 @@ private CompletableFuture<Boolean> doSend(@Nullable String exchange, @Nullable S
226231
* @param message to publish
227232
* @return the {@link CompletableFuture} as an async result of the message publication.
228233
*/
234+
@Override
229235
public CompletableFuture<Boolean> convertAndSend(Object message) {
236+
Assert.state(this.defaultExchange != null || this.defaultQueue != null,
237+
"For send with defaults, an 'exchange' (and optional 'key') or 'queue' must be provided");
230238
return doConvertAndSend(this.defaultExchange, this.defaultRoutingKey, this.defaultQueue, message, null);
231239
}
232240

241+
@Override
233242
public CompletableFuture<Boolean> convertAndSend(String queue, Object message) {
234243
return doConvertAndSend(null, null, queue, message, null);
235244
}
236245

246+
@Override
237247
public CompletableFuture<Boolean> convertAndSend(String exchange, @Nullable String routingKey, Object message) {
238248
return doConvertAndSend(exchange, routingKey != null ? routingKey : this.defaultRoutingKey, null, message, null);
239249
}
240250

251+
@Override
241252
public CompletableFuture<Boolean> convertAndSend(Object message,
242253
@Nullable MessagePostProcessor messagePostProcessor) {
243254

244255
return doConvertAndSend(null, null, null, message, messagePostProcessor);
245256
}
246257

258+
@Override
247259
public CompletableFuture<Boolean> convertAndSend(String queue, Object message,
248260
@Nullable MessagePostProcessor messagePostProcessor) {
249261

250262
return doConvertAndSend(null, null, queue, message, messagePostProcessor);
251263
}
252264

265+
@Override
253266
public CompletableFuture<Boolean> convertAndSend(String exchange, @Nullable String routingKey, Object message,
254267
@Nullable MessagePostProcessor messagePostProcessor) {
255268

@@ -269,11 +282,13 @@ private CompletableFuture<Boolean> doConvertAndSend(@Nullable String exchange, @
269282
return doSend(exchange, routingKey, queue, message);
270283
}
271284

285+
@Override
272286
public CompletableFuture<Message> receive() {
273287
return receive(getRequiredQueue());
274288
}
275289

276290
@SuppressWarnings("try")
291+
@Override
277292
public CompletableFuture<Message> receive(String queueName) {
278293
CompletableFuture<Message> messageFuture = new CompletableFuture<>();
279294

@@ -293,10 +308,12 @@ public CompletableFuture<Message> receive(String queueName) {
293308
.whenComplete((message, exception) -> consumer.close());
294309
}
295310

311+
@Override
296312
public CompletableFuture<Object> receiveAndConvert() {
297313
return receiveAndConvert(getRequiredQueue());
298314
}
299315

316+
@Override
300317
public CompletableFuture<Object> receiveAndConvert(String queueName) {
301318
return receive(queueName)
302319
.thenApply(this.messageConverter::fromMessage);
@@ -309,6 +326,7 @@ public CompletableFuture<Object> receiveAndConvert(String queueName) {
309326
* @param type the type to covert received result.
310327
* @return the CompletableFuture with a result.
311328
*/
329+
@Override
312330
public <T> CompletableFuture<T> receiveAndConvert(ParameterizedTypeReference<T> type) {
313331
return receiveAndConvert(getRequiredQueue(), type);
314332
}
@@ -322,6 +340,7 @@ public <T> CompletableFuture<T> receiveAndConvert(ParameterizedTypeReference<T>
322340
* @return the CompletableFuture with a result.
323341
*/
324342
@SuppressWarnings("unchecked")
343+
@Override
325344
public <T> CompletableFuture<T> receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) {
326345
SmartMessageConverter smartMessageConverter = getRequiredSmartMessageConverter();
327346
return receive(queueName)

spring-rabbitmq-client/src/test/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpAdminTests.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,6 @@
4343
@ContextConfiguration
4444
public class RabbitAmqpAdminTests extends RabbitAmqpTestBase {
4545

46-
@Autowired
47-
RabbitAmqpTemplate template;
48-
49-
@Autowired
50-
RabbitAmqpAdmin admin;
51-
5246
@Autowired
5347
@Qualifier("ds")
5448
Declarables declarables;

spring-rabbitmq-client/src/test/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTemplateTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@
2626
import org.springframework.amqp.core.Binding;
2727
import org.springframework.amqp.core.BindingBuilder;
2828
import org.springframework.amqp.core.DirectExchange;
29+
import org.springframework.amqp.core.Message;
2930
import org.springframework.amqp.core.Queue;
3031
import org.springframework.beans.factory.annotation.Autowired;
3132
import org.springframework.context.annotation.Bean;
3233
import org.springframework.context.annotation.Configuration;
3334
import org.springframework.test.context.ContextConfiguration;
3435

3536
import static org.assertj.core.api.Assertions.assertThat;
37+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
3638

3739
/**
3840
* @author Artem Bilan
@@ -58,6 +60,17 @@ void tearDown() {
5860
this.rabbitAmqpTemplate.destroy();
5961
}
6062

63+
@Test
64+
void illegalStateOnNoDefaults() {
65+
assertThatIllegalStateException()
66+
.isThrownBy(() -> this.template.send(new Message(new byte[0])))
67+
.withMessage("For send with defaults, an 'exchange' (and optional 'key') or 'queue' must be provided");
68+
69+
assertThatIllegalStateException()
70+
.isThrownBy(() -> this.template.convertAndSend(new byte[0]))
71+
.withMessage("For send with defaults, an 'exchange' (and optional 'key') or 'queue' must be provided");
72+
}
73+
6174
@Test
6275
void defaultExchangeAndRoutingKey() {
6376
this.rabbitAmqpTemplate.setExchange("e1");

spring-rabbitmq-client/src/test/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTestBase.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
2222

2323
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
24+
import org.springframework.beans.factory.annotation.Autowired;
2425
import org.springframework.context.annotation.Bean;
2526
import org.springframework.context.annotation.Configuration;
2627
import org.springframework.test.annotation.DirtiesContext;
@@ -37,6 +38,18 @@
3738
@DirtiesContext
3839
abstract class RabbitAmqpTestBase extends AbstractTestContainerTests {
3940

41+
@Autowired
42+
protected Environment environment;
43+
44+
@Autowired
45+
protected Connection connection;
46+
47+
@Autowired
48+
protected RabbitAmqpAdmin admin;
49+
50+
@Autowired
51+
protected RabbitAmqpTemplate template;
52+
4053
@Configuration
4154
public static class AmqpCommonConfig {
4255

0 commit comments

Comments
 (0)