Skip to content

Commit 4e3f65a

Browse files
committed
Support priority flag
Priority for sent messages can be specified with the -flag option, e.g.: --flag priority=10 If other flags need to be supported, use --flag several times, e.g.: --flag priority=10 --flag persistent Fixes #62
1 parent 21f41a2 commit 4e3f65a

File tree

4 files changed

+169
-9
lines changed

4 files changed

+169
-9
lines changed

pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@
167167
<scope>test</scope>
168168
</dependency>
169169

170-
171170
<dependency>
172171
<groupId>ch.qos.logback</groupId>
173172
<artifactId>logback-classic</artifactId>

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ private static boolean boolArg(CommandLine cmd, String opt, boolean def) {
364364
}
365365

366366
private static List<?> lstArg(CommandLine cmd, char opt) {
367-
String[] vals = cmd.getOptionValues('f');
367+
String[] vals = cmd.getOptionValues(opt);
368368
if (vals == null) {
369369
vals = new String[] {};
370370
}

src/main/java/com/rabbitmq/perf/Producer.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import java.util.concurrent.Semaphore;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.concurrent.atomic.AtomicBoolean;
32-
import java.util.function.UnaryOperator;
32+
import java.util.function.Function;
3333

3434
public class Producer extends ProducerConsumerBase implements Runnable, ReturnListener,
3535
ConfirmListener
@@ -49,7 +49,7 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
4949

5050
private final MessageBodySource messageBodySource;
5151

52-
private final UnaryOperator<AMQP.BasicProperties.Builder> propertiesBuilderProcessor;
52+
private final Function<AMQP.BasicProperties.Builder, AMQP.BasicProperties.Builder> propertiesBuilderProcessor;
5353
private Semaphore confirmPool;
5454
private int confirmTimeout;
5555
private final SortedSet<Long> unconfirmedSet =
@@ -70,24 +70,36 @@ public Producer(Channel channel, String exchangeName, String id, boolean randomR
7070
this.randomRoutingKey = randomRoutingKey;
7171
this.mandatory = flags.contains("mandatory");
7272
this.persistent = flags.contains("persistent");
73+
74+
Function<AMQP.BasicProperties.Builder, AMQP.BasicProperties.Builder> builderProcessor = Function.identity();
75+
for (Object flag : flags) {
76+
if (flag != null && flag.toString().startsWith("priority=")) {
77+
final Integer priority = Integer.valueOf(flag.toString().substring(
78+
flag.toString().indexOf("=") + 1
79+
));
80+
builderProcessor = builderProcessor.andThen(builder -> {
81+
builder.priority(priority);
82+
return builder;
83+
});
84+
}
85+
}
7386
this.txSize = txSize;
7487
this.rateLimit = rateLimit;
7588
this.msgLimit = msgLimit;
7689
this.messageBodySource = messageBodySource;
7790
if (tsp.isTimestampInHeader()) {
78-
this.propertiesBuilderProcessor = builder -> {
79-
builder.headers(Collections.<String, Object>singletonMap(TIMESTAMP_HEADER, tsp.getCurrentTime()));
91+
builderProcessor = builderProcessor.andThen(builder -> {
92+
builder.headers(Collections.singletonMap(TIMESTAMP_HEADER, tsp.getCurrentTime()));
8093
return builder;
81-
};
82-
} else {
83-
this.propertiesBuilderProcessor = UnaryOperator.identity();
94+
});
8495
}
8596
if (confirm > 0) {
8697
this.confirmPool = new Semaphore((int)confirm);
8798
this.confirmTimeout = confirmTimeout;
8899
}
89100
this.stats = stats;
90101
this.completionHandler = completionHandler;
102+
this.propertiesBuilderProcessor = builderProcessor;
91103
}
92104

93105
public void handleReturn(int replyCode,
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.perf;
17+
18+
import com.rabbitmq.client.AMQP.BasicProperties;
19+
import com.rabbitmq.client.Channel;
20+
import org.junit.Before;
21+
import org.junit.Test;
22+
import org.mockito.ArgumentCaptor;
23+
import org.mockito.Captor;
24+
import org.mockito.Mock;
25+
import org.mockito.MockitoAnnotations;
26+
27+
import static java.util.Arrays.asList;
28+
import static org.hamcrest.Matchers.is;
29+
import static org.hamcrest.Matchers.nullValue;
30+
import static org.junit.Assert.assertThat;
31+
import static org.mockito.ArgumentMatchers.any;
32+
import static org.mockito.ArgumentMatchers.anyString;
33+
import static org.mockito.ArgumentMatchers.eq;
34+
import static org.mockito.Mockito.verify;
35+
36+
public class ProducerTest {
37+
38+
@Mock
39+
Channel channel;
40+
41+
@Captor
42+
private ArgumentCaptor<BasicProperties> propertiesCaptor;
43+
44+
@Before
45+
public void init() {
46+
MockitoAnnotations.initMocks(this);
47+
}
48+
49+
@Test
50+
public void flagNone() throws Exception {
51+
flagProducer().run();
52+
53+
verify(channel).basicPublish(anyString(), anyString(),
54+
eq(false), eq(false), propertiesCaptor.capture(),
55+
any(byte[].class)
56+
);
57+
58+
assertThat(props().getDeliveryMode(), nullValue());
59+
assertThat(props().getPriority(), nullValue());
60+
}
61+
62+
@Test
63+
public void flagPersistent() throws Exception {
64+
flagProducer("persistent").run();
65+
66+
verify(channel).basicPublish(anyString(), anyString(),
67+
eq(false), eq(false), propertiesCaptor.capture(),
68+
any(byte[].class)
69+
);
70+
71+
assertThat(props().getDeliveryMode(), is(2));
72+
assertThat(props().getPriority(), nullValue());
73+
}
74+
75+
@Test
76+
public void flagMandatory() throws Exception {
77+
flagProducer("mandatory").run();
78+
79+
verify(channel).basicPublish(anyString(), anyString(),
80+
eq(true), eq(false), propertiesCaptor.capture(),
81+
any(byte[].class)
82+
);
83+
84+
assertThat(props().getDeliveryMode(), nullValue());
85+
assertThat(props().getPriority(), nullValue());
86+
}
87+
88+
@Test
89+
public void flagPriority() throws Exception {
90+
flagProducer("priority=10").run();
91+
92+
verify(channel).basicPublish(anyString(), anyString(),
93+
eq(false), eq(false), propertiesCaptor.capture(),
94+
any(byte[].class)
95+
);
96+
97+
assertThat(props().getDeliveryMode(), nullValue());
98+
assertThat(props().getPriority(), is(10));
99+
}
100+
101+
@Test
102+
public void flagPersistentMandatoryPriority() throws Exception {
103+
flagProducer("persistent", "mandatory", "priority=10").run();
104+
105+
verify(channel).basicPublish(anyString(), anyString(),
106+
eq(true), eq(false), propertiesCaptor.capture(),
107+
any(byte[].class)
108+
);
109+
110+
assertThat(props().getDeliveryMode(), is(2));
111+
assertThat(props().getPriority(), is(10));
112+
}
113+
114+
Producer flagProducer(String... flags) {
115+
return new Producer(
116+
channel, "exchange", "id", false,
117+
asList(flags),
118+
0, 0.0f, 1,
119+
-1, 30,
120+
new TimeSequenceMessageBodySource(new TimestampProvider(false, false), 1000),
121+
new TimestampProvider(false, false),
122+
stats(),
123+
new MulticastSet.CompletionHandler() {
124+
125+
@Override
126+
public void waitForCompletion() {
127+
}
128+
129+
@Override
130+
public void countDown() {
131+
}
132+
}
133+
);
134+
}
135+
136+
BasicProperties props() {
137+
return propertiesCaptor.getValue();
138+
}
139+
140+
private Stats stats() {
141+
return new Stats(0) {
142+
143+
@Override
144+
protected void report(long now) {
145+
146+
}
147+
};
148+
}
149+
}

0 commit comments

Comments
 (0)