Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit 8819069

Browse files
committed
Polish queue specification
Closes #44
1 parent fe3b494 commit 8819069

File tree

6 files changed

+175
-112
lines changed

6 files changed

+175
-112
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ ext {
4444
mockitoVersion = '2.23.4'
4545
junitPlatformVersion = '1.3.2'
4646
junitJupiterVersion = '5.3.2'
47+
assertjVersion = '3.11.1'
4748

4849
javadocLinks = ["http://docs.oracle.com/javase/7/docs/api/",
4950
"http://docs.oracle.com/javaee/6/api/",
@@ -127,6 +128,7 @@ configure(allprojects) { project ->
127128

128129
testCompile "io.projectreactor:reactor-test:$reactorCoreVersion"
129130
testCompile "org.mockito:mockito-core:$mockitoVersion"
131+
testCompile "org.assertj:assertj-core:${assertjVersion}"
130132
testRuntime "org.slf4j:jcl-over-slf4j:$slf4jVersion"
131133
testRuntime "org.slf4j:slf4j-api:$slf4jVersion"
132134
testRuntime "ch.qos.logback:logback-classic:$logbackVersion"

src/docs/asciidoc/api-guide.adoc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,18 @@ include::{test-examples}/ApiGuideSender.java[tag=resource-declaration]
112112
Note the `Sender#declare*` methods return their respective AMQP results
113113
wrapped into a `Mono`.
114114

115+
[NOTE]
116+
====
117+
For queue creation, note that if a queue specification has a
118+
null name, the queue to be created will have a server-generated name
119+
and will be non-durable, exclusive, and auto-delete. If you want
120+
a queue to have a server-generated name but other parameters,
121+
specify an empty name `""` and set the parameters accordingly on
122+
the `QueueSpecification` instance. For more information about queues,
123+
see the https://www.rabbitmq.com/queues.html[official documentation].
124+
====
125+
126+
115127
One can also use the `ResourcesSpecification` factory class
116128
with a static import to reduce boilerplate code. Combined with
117129
`Mono` chaining and `Sender#declare` shortcuts, it allows for condensed syntax:

src/docs/asciidoc/new.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,5 @@
1919
* Handle error signal of `connectionMono` subscription to enable proper error handling
2020
* Use Reactor 3.2.3.RELEASE
2121
* Use Java client 5.5.1
22+
23+

src/main/java/reactor/rabbitmq/QueueSpecification.java

Lines changed: 62 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017 Pivotal Software Inc, All Rights Reserved.
2+
* Copyright (c) 2017-2019 Pivotal Software Inc, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,59 +20,19 @@
2020

2121
/**
2222
* Fluent API to specify the creation of a queue.
23+
* <p>
24+
* If a queue specification has a null name, the queue to be created
25+
* will have a server-generated name and will be non-durable, exclusive, and
26+
* auto-delete. To have more control over the properties of a queue with
27+
* a server-generated name, specify a non-null, empty string name, <code>""</code>.
2328
*/
2429
public class QueueSpecification {
2530

26-
static class NullNameQueueSpecification extends QueueSpecification {
27-
28-
NullNameQueueSpecification() {
29-
this.name = null;
30-
this.durable = false;
31-
this.exclusive = true;
32-
this.autoDelete = true;
33-
}
34-
35-
@Override
36-
public QueueSpecification name(String name) {
37-
if (name == null) {
38-
return this;
39-
}
40-
return QueueSpecification.queue(name)
41-
.durable(durable)
42-
.exclusive(exclusive)
43-
.autoDelete(autoDelete);
44-
}
45-
46-
@Override
47-
public QueueSpecification durable(boolean durable) {
48-
if (this.durable != durable) {
49-
throw new IllegalArgumentException("once a queue has null name, durable is always false");
50-
}
51-
return this;
52-
}
53-
54-
@Override
55-
public QueueSpecification exclusive(boolean exclusive) {
56-
if (this.exclusive != exclusive) {
57-
throw new IllegalArgumentException("once a queue has null name, exclusive is always true");
58-
}
59-
return this;
60-
}
61-
62-
@Override
63-
public QueueSpecification autoDelete(boolean autoDelete) {
64-
if (this.autoDelete != autoDelete) {
65-
throw new IllegalArgumentException("once a queue has null name, autoDelete is always true");
66-
}
67-
return this;
68-
}
69-
}
70-
71-
String name;
72-
boolean durable = false;
73-
boolean exclusive = false;
74-
boolean autoDelete = false;
75-
Map<String, Object> arguments;
31+
protected String name;
32+
protected boolean durable = false;
33+
protected boolean exclusive = false;
34+
protected boolean autoDelete = false;
35+
protected Map<String, Object> arguments;
7636

7737
public static QueueSpecification queue() {
7838
return new NullNameQueueSpecification();
@@ -84,7 +44,7 @@ public static QueueSpecification queue(String name) {
8444

8545
public QueueSpecification name(String queue) {
8646
if (queue == null) {
87-
return new NullNameQueueSpecification();
47+
return new NullNameQueueSpecification().arguments(this.arguments);
8848
}
8949

9050
this.name = queue;
@@ -130,4 +90,54 @@ public boolean isAutoDelete() {
13090
public Map<String, Object> getArguments() {
13191
return arguments;
13292
}
93+
94+
/**
95+
* Internal class to handle queues with a null name.
96+
* Those queues always have a server-generated name are non-durable,
97+
* exclusive, and auto-delete.
98+
*/
99+
private static class NullNameQueueSpecification extends QueueSpecification {
100+
101+
NullNameQueueSpecification() {
102+
this.name = null;
103+
this.durable = false;
104+
this.exclusive = true;
105+
this.autoDelete = true;
106+
}
107+
108+
@Override
109+
public QueueSpecification name(String name) {
110+
if (name == null) {
111+
return this;
112+
}
113+
return QueueSpecification.queue(name)
114+
.durable(durable)
115+
.exclusive(exclusive)
116+
.autoDelete(autoDelete);
117+
}
118+
119+
@Override
120+
public QueueSpecification durable(boolean durable) {
121+
if (this.durable != durable) {
122+
throw new IllegalArgumentException("Once a queue has a null name, durable is always false");
123+
}
124+
return this;
125+
}
126+
127+
@Override
128+
public QueueSpecification exclusive(boolean exclusive) {
129+
if (this.exclusive != exclusive) {
130+
throw new IllegalArgumentException("Once a queue has a null name, exclusive is always true");
131+
}
132+
return this;
133+
}
134+
135+
@Override
136+
public QueueSpecification autoDelete(boolean autoDelete) {
137+
if (this.autoDelete != autoDelete) {
138+
throw new IllegalArgumentException("Once a queue has a null name, autoDelete is always true");
139+
}
140+
return this;
141+
}
142+
}
133143
}

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,7 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929
import reactor.core.CoreSubscriber;
30-
import reactor.core.publisher.Flux;
31-
import reactor.core.publisher.FluxOperator;
32-
import reactor.core.publisher.Mono;
33-
import reactor.core.publisher.SignalType;
34-
import reactor.core.publisher.Operators;
30+
import reactor.core.publisher.*;
3531
import reactor.core.scheduler.Scheduler;
3632
import reactor.core.scheduler.Schedulers;
3733

@@ -188,18 +184,50 @@ public RpcClient rpcClient(String exchange, String routingKey, Supplier<String>
188184
return new RpcClient(connectionMono.map(CHANNEL_CREATION_FUNCTION).cache(), exchange, routingKey, correlationIdProvider);
189185
}
190186

187+
/**
188+
* Declare a queue following the specification.
189+
*
190+
* @param specification the specification of the queue
191+
* @return a mono wrapping the result of the declaration
192+
* @see QueueSpecification
193+
*/
191194
public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification specification) {
192195
return this.declareQueue(specification, null);
193196
}
194197

198+
/**
199+
* Declare a queue following the specification and the resource management options.
200+
*
201+
* @param specification the specification of the queue
202+
* @param options options for resource management
203+
* @return a mono wrapping the result of the declaration
204+
* @see QueueSpecification
205+
* @see ResourceManagementOptions
206+
*/
195207
public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification specification, ResourceManagementOptions options) {
196208
return this.declareQueue(specification, options);
197209
}
198210

211+
/**
212+
* Declare a queue following the specification.
213+
*
214+
* @param specification the specification of the queue
215+
* @return a mono wrapping the result of the declaration
216+
* @see QueueSpecification
217+
*/
199218
public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification specification) {
200219
return this.declareQueue(specification, null);
201220
}
202221

222+
/**
223+
* Declare a queue following the specification and the resource management options.
224+
*
225+
* @param specification the specification of the queue
226+
* @param options options for resource management
227+
* @return a mono wrapping the result of the declaration
228+
* @see QueueSpecification
229+
* @see ResourceManagementOptions
230+
*/
203231
public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification specification, ResourceManagementOptions options) {
204232
Mono<? extends Channel> channelMono = getChannelMonoForResourceManagement(options);
205233

0 commit comments

Comments
 (0)