Skip to content

Commit 581de31

Browse files
Merge pull request #24 from rabbitmq/rabbitmq-perf-test-23
Provide a way to load message payload from disk
2 parents 396fe62 + 70db754 commit 581de31

File tree

7 files changed

+309
-46
lines changed

7 files changed

+309
-46
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.perf;
17+
18+
import java.io.*;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
22+
/**
23+
*
24+
*/
25+
public class LocalFilesMessageBodySource implements MessageBodySource {
26+
27+
private final List<byte[]> bodies;
28+
29+
private final String contentType;
30+
31+
public LocalFilesMessageBodySource(List<String> filesNames, String contentType) throws IOException {
32+
bodies = new ArrayList<byte[]>(filesNames.size());
33+
for (String fileName : filesNames) {
34+
File file = new File(fileName.trim());
35+
if (!file.exists() || file.isDirectory()) {
36+
throw new IllegalArgumentException(fileName + " isn't a valid body file.");
37+
}
38+
BufferedInputStream inputStream = null;
39+
try {
40+
inputStream = new BufferedInputStream(new FileInputStream(file));
41+
byte [] body = new byte[(int) file.length()];
42+
inputStream.read(body, 0, body.length);
43+
bodies.add(body);
44+
} finally {
45+
if (inputStream != null) {
46+
inputStream.close();
47+
}
48+
}
49+
50+
}
51+
this.contentType = contentType;
52+
}
53+
54+
public LocalFilesMessageBodySource(List<String> filesNames) throws IOException {
55+
this(filesNames, null);
56+
}
57+
58+
@Override
59+
public MessageBodyAndContentType create(int sequenceNumber) throws IOException {
60+
return new MessageBodyAndContentType(
61+
bodies.get(sequenceNumber % bodies.size()), contentType
62+
);
63+
}
64+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.perf;
17+
18+
import java.io.IOException;
19+
20+
/**
21+
* Sources produce message bodies and content type
22+
* used by publishers.
23+
*/
24+
public interface MessageBodySource {
25+
26+
MessageBodyAndContentType create(int sequenceNumber) throws IOException;
27+
28+
class MessageBodyAndContentType {
29+
private final byte [] body;
30+
private final String contentType;
31+
32+
public MessageBodyAndContentType(byte[] body, String contentType) {
33+
this.body = body;
34+
this.contentType = contentType;
35+
}
36+
37+
public byte[] getBody() {
38+
return body;
39+
}
40+
41+
public String getContentType() {
42+
return contentType;
43+
}
44+
}
45+
46+
}

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ public class MulticastParams {
5454
private boolean autoAck = true;
5555
private boolean autoDelete = false;
5656

57+
private List<String> bodyFiles = new ArrayList<String>();
58+
private String bodyContentType = null;
59+
5760
private boolean predeclared;
5861

5962
public void setExchangeType(String exchangeType) {
@@ -201,18 +204,36 @@ public boolean getRandomRoutingKey() {
201204
return randomRoutingKey;
202205
}
203206

207+
public void setBodyFiles(List<String> bodyFiles) {
208+
if (bodyFiles == null) {
209+
this.bodyFiles = new ArrayList<String>();
210+
} else {
211+
this.bodyFiles = new ArrayList<String>(bodyFiles);
212+
}
213+
}
214+
215+
public void setBodyContentType(String bodyContentType) {
216+
this.bodyContentType = bodyContentType;
217+
}
218+
204219
public Producer createProducer(Connection connection, Stats stats, String id) throws IOException {
205220
Channel channel = connection.createChannel();
206221
if (producerTxSize > 0) channel.txSelect();
207222
if (confirm >= 0) channel.confirmSelect();
208223
if (!predeclared || !exchangeExists(connection, exchangeName)) {
209224
channel.exchangeDeclare(exchangeName, exchangeType);
210225
}
226+
MessageBodySource messageBodySource = null;
227+
if (bodyFiles.size() > 0) {
228+
messageBodySource = new LocalFilesMessageBodySource(bodyFiles, bodyContentType);
229+
} else {
230+
messageBodySource = new TimeSequenceMessageBodySource(minMsgSize);
231+
}
211232
final Producer producer = new Producer(channel, exchangeName, id,
212233
randomRoutingKey, flags, producerTxSize,
213234
producerRateLimit, producerMsgCount,
214-
minMsgSize, timeLimit,
215-
confirm, stats);
235+
timeLimit,
236+
confirm, messageBodySource, stats);
216237
channel.addReturnListener(producer);
217238
channel.addConfirmListener(producer);
218239
return producer;

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import com.rabbitmq.client.ConnectionFactory;
3333

34+
import static java.util.Arrays.asList;
3435

3536
public class PerfTest {
3637

@@ -73,6 +74,8 @@ public static void main(String[] args) {
7374
List<?> flags = lstArg(cmd, 'f');
7475
int frameMax = intArg(cmd, 'M', 0);
7576
int heartbeat = intArg(cmd, 'b', 0);
77+
String bodyFiles = strArg(cmd, 'B', null);
78+
String bodyContentType = strArg(cmd, 'T', null);
7679
boolean predeclared = cmd.hasOption('p');
7780

7881
String uri = strArg(cmd, 'h', "amqp://localhost");
@@ -83,7 +86,7 @@ public static void main(String[] args) {
8386
for(int i = 0; i< urisArray.length; i++) {
8487
urisArray[i] = urisArray[i].trim();
8588
}
86-
uris = Arrays.asList(urisArray);
89+
uris = asList(urisArray);
8790
} else {
8891
uris = Collections.singletonList(uri);
8992
}
@@ -124,11 +127,13 @@ public static void main(String[] args) {
124127
p.setProducerChannelCount( producerChannelCount);
125128
p.setProducerMsgCount( producerMsgCount);
126129
p.setProducerTxSize( producerTxSize);
127-
p.setQueueNames( Arrays.asList(queueNames.split(",")));
130+
p.setQueueNames( asList(queueNames.split(",")));
128131
p.setRoutingKey( routingKey);
129132
p.setRandomRoutingKey( randomRoutingKey);
130133
p.setProducerRateLimit( producerRateLimit);
131134
p.setTimeLimit( timeLimit);
135+
p.setBodyFiles( bodyFiles == null ? null : asList(bodyFiles.split(",")));
136+
p.setBodyContentType( bodyContentType);
132137

133138
MulticastSet set = new MulticastSet(stats, factory, p, testID, uris);
134139
set.run(true);
@@ -185,6 +190,9 @@ private static Options getOptions() {
185190
options.addOption(new Option("M", "framemax", true, "frame max"));
186191
options.addOption(new Option("b", "heartbeat", true, "heartbeat interval"));
187192
options.addOption(new Option("p", "predeclared", false,"allow use of predeclared objects"));
193+
options.addOption(new Option("B", "body", true, "comma-separated list of files to use in message bodies"));
194+
options.addOption(new Option("T", "bodyContenType", true, "body content-type"));
195+
188196
return options;
189197
}
190198

@@ -205,7 +213,7 @@ private static List<?> lstArg(CommandLine cmd, char opt) {
205213
if (vals == null) {
206214
vals = new String[] {};
207215
}
208-
return Arrays.asList(vals);
216+
return asList(vals);
209217
}
210218

211219
private static class PrintlnStats extends Stats {

src/main/java/com/rabbitmq/perf/Producer.java

Lines changed: 28 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@
1818
import com.rabbitmq.client.AMQP;
1919
import com.rabbitmq.client.Channel;
2020
import com.rabbitmq.client.ConfirmListener;
21-
import com.rabbitmq.client.MessageProperties;
2221
import com.rabbitmq.client.ReturnListener;
2322

24-
import java.io.ByteArrayOutputStream;
25-
import java.io.DataOutputStream;
2623
import java.io.IOException;
2724
import java.util.Collections;
2825
import java.util.List;
@@ -47,30 +44,30 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
4744

4845
private final Stats stats;
4946

50-
private final byte[] message;
47+
private final MessageBodySource messageBodySource;
5148

5249
private Semaphore confirmPool;
5350
private final SortedSet<Long> unconfirmedSet =
5451
Collections.synchronizedSortedSet(new TreeSet<Long>());
5552

5653
public Producer(Channel channel, String exchangeName, String id, boolean randomRoutingKey,
5754
List<?> flags, int txSize,
58-
float rateLimit, int msgLimit, int minMsgSize, int timeLimit,
59-
long confirm, Stats stats)
55+
float rateLimit, int msgLimit, int timeLimit,
56+
long confirm, MessageBodySource messageBodySource, Stats stats)
6057
throws IOException {
6158

62-
this.channel = channel;
63-
this.exchangeName = exchangeName;
64-
this.id = id;
65-
this.randomRoutingKey = randomRoutingKey;
66-
this.mandatory = flags.contains("mandatory");
67-
this.immediate = flags.contains("immediate");
68-
this.persistent = flags.contains("persistent");
69-
this.txSize = txSize;
70-
this.rateLimit = rateLimit;
71-
this.msgLimit = msgLimit;
72-
this.timeLimit = 1000L * timeLimit;
73-
this.message = new byte[minMsgSize];
59+
this.channel = channel;
60+
this.exchangeName = exchangeName;
61+
this.id = id;
62+
this.randomRoutingKey = randomRoutingKey;
63+
this.mandatory = flags.contains("mandatory");
64+
this.immediate = flags.contains("immediate");
65+
this.persistent = flags.contains("persistent");
66+
this.txSize = txSize;
67+
this.rateLimit = rateLimit;
68+
this.msgLimit = msgLimit;
69+
this.timeLimit = 1000L * timeLimit;
70+
this.messageBodySource = messageBodySource;
7471
if (confirm > 0) {
7572
this.confirmPool = new Semaphore((int)confirm);
7673
}
@@ -136,7 +133,7 @@ public void run() {
136133
if (confirmPool != null) {
137134
confirmPool.acquire();
138135
}
139-
publish(createMessage(totalMsgCount));
136+
publish(messageBodySource.create(totalMsgCount));
140137
totalMsgCount++;
141138
msgCount++;
142139

@@ -154,33 +151,23 @@ public void run() {
154151
}
155152
}
156153

157-
private void publish(byte[] msg)
154+
private void publish(MessageBodySource.MessageBodyAndContentType messageBodyAndContentType)
158155
throws IOException {
159156

157+
AMQP.BasicProperties.Builder propertiesBuilder = new AMQP.BasicProperties.Builder();
158+
if (persistent) {
159+
propertiesBuilder.deliveryMode(2);
160+
}
161+
162+
if (messageBodyAndContentType.getContentType() != null) {
163+
propertiesBuilder.contentType(messageBodyAndContentType.getContentType());
164+
}
165+
160166
unconfirmedSet.add(channel.getNextPublishSeqNo());
161167
channel.basicPublish(exchangeName, randomRoutingKey ? UUID.randomUUID().toString() : id,
162168
mandatory, immediate,
163-
persistent ? MessageProperties.MINIMAL_PERSISTENT_BASIC : MessageProperties.MINIMAL_BASIC,
164-
msg);
165-
}
166-
167-
private byte[] createMessage(int sequenceNumber)
168-
throws IOException {
169-
170-
ByteArrayOutputStream acc = new ByteArrayOutputStream();
171-
DataOutputStream d = new DataOutputStream(acc);
172-
long nano = System.nanoTime();
173-
d.writeInt(sequenceNumber);
174-
d.writeLong(nano);
175-
d.flush();
176-
acc.flush();
177-
byte[] m = acc.toByteArray();
178-
if (m.length <= message.length) {
179-
System.arraycopy(m, 0, message, 0, m.length);
180-
return message;
181-
} else {
182-
return m;
183-
}
169+
propertiesBuilder.build(),
170+
messageBodyAndContentType.getBody());
184171
}
185172

186173
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.perf;
17+
18+
import java.io.ByteArrayOutputStream;
19+
import java.io.DataOutputStream;
20+
import java.io.IOException;
21+
22+
/**
23+
*
24+
*/
25+
public class TimeSequenceMessageBodySource implements MessageBodySource {
26+
27+
private final byte[] message;
28+
29+
public TimeSequenceMessageBodySource(int minMsgSize) {
30+
this.message = new byte[minMsgSize];
31+
}
32+
33+
@Override
34+
public MessageBodyAndContentType create(int sequenceNumber) throws IOException {
35+
ByteArrayOutputStream acc = new ByteArrayOutputStream();
36+
DataOutputStream d = new DataOutputStream(acc);
37+
long nano = System.nanoTime();
38+
d.writeInt(sequenceNumber);
39+
d.writeLong(nano);
40+
d.flush();
41+
acc.flush();
42+
byte[] m = acc.toByteArray();
43+
if (m.length <= message.length) {
44+
System.arraycopy(m, 0, message, 0, m.length);
45+
return new MessageBodyAndContentType(message, null);
46+
} else {
47+
return new MessageBodyAndContentType(m, null);
48+
}
49+
}
50+
}

0 commit comments

Comments
 (0)