Skip to content

Commit b1b45d4

Browse files
committed
Add --message-properties option
Some examples: --message-properties deliveryMode=2,timestamp=2007-12-03T10:15:30+01:00 --message-properties priority=10,header1=value1,header2=value2 Non-property keys are used as headers. Supported keys: contentType, contentEncoding, headers, deliveryMode, priority, correlationId, replyTo, expiration, messageId, timestamp, type, userId, appId, clusterId. [#153995213] Fixes #62
1 parent 4e3f65a commit b1b45d4

File tree

5 files changed

+328
-39
lines changed

5 files changed

+328
-39
lines changed

src/main/java/com/rabbitmq/perf/LocalFilesMessageBodySource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class LocalFilesMessageBodySource implements MessageBodySource {
2929
private final String contentType;
3030

3131
public LocalFilesMessageBodySource(List<String> filesNames, String contentType) throws IOException {
32-
bodies = new ArrayList<byte[]>(filesNames.size());
32+
bodies = new ArrayList<>(filesNames.size());
3333
for (String fileName : filesNames) {
3434
File file = new File(fileName.trim());
3535
if (!file.exists() || file.isDirectory()) {
@@ -56,7 +56,7 @@ public LocalFilesMessageBodySource(List<String> filesNames) throws IOException {
5656
}
5757

5858
@Override
59-
public MessageBodyAndContentType create(int sequenceNumber) throws IOException {
59+
public MessageBodyAndContentType create(int sequenceNumber) {
6060
return new MessageBodyAndContentType(
6161
bodies.get(sequenceNumber % bodies.size()), contentType
6262
);

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public class MulticastParams {
7474
private int queueSequenceFrom = -1;
7575
private int queueSequenceTo = -1;
7676

77+
private Map<String, Object> messageProperties = null;
78+
7779
private TopologyHandler topologyHandler;
7880

7981
private int heartbeatSenderThreads = -1;
@@ -211,6 +213,10 @@ public void setConsumerLatencyInMicroseconds(int consumerLatencyInMicroseconds)
211213
this.consumerLatencyInMicroseconds = consumerLatencyInMicroseconds;
212214
}
213215

216+
public void setMessageProperties(Map<String, Object> messageProperties) {
217+
this.messageProperties = messageProperties;
218+
}
219+
214220
public int getConsumerCount() {
215221
return consumerCount;
216222
}
@@ -331,7 +337,7 @@ public Producer createProducer(Connection connection, Stats stats, MulticastSet.
331337
randomRoutingKey, flags, producerTxSize,
332338
producerRateLimit, producerMsgCount,
333339
confirm, confirmTimeout, messageBodySource,
334-
tsp, stats, completionHandler);
340+
tsp, stats, messageProperties, completionHandler);
335341
channel.addReturnListener(producer);
336342
channel.addConfirmListener(producer);
337343
this.topologyHandler.next();

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
106106
String queueArgs = strArg(cmd, "qa", null);
107107
int consumerLatencyInMicroseconds = intArg(cmd, 'L', 0);
108108
int heartbeatSenderThreads = intArg(cmd, "hst", -1);
109+
String messageProperties = strArg(cmd, "mp", null);
109110

110111
String uri = strArg(cmd, 'h', "amqp://localhost");
111112
String urisParameter = strArg(cmd, 'H', null);
@@ -202,12 +203,13 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
202203
p.setUseMillis( useMillis);
203204
p.setBodyFiles( bodyFiles == null ? null : asList(bodyFiles.split(",")));
204205
p.setBodyContentType( bodyContentType);
205-
p.setQueueArguments(queueArguments(queueArgs));
206+
p.setQueueArguments(convertKeyValuePairs(queueArgs));
206207
p.setConsumerLatencyInMicroseconds(consumerLatencyInMicroseconds);
207208
p.setQueuePattern(queuePattern);
208209
p.setQueueSequenceFrom(from);
209210
p.setQueueSequenceTo(to);
210211
p.setHeartbeatSenderThreads(heartbeatSenderThreads);
212+
p.setMessageProperties(convertKeyValuePairs(messageProperties));
211213

212214
MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p);
213215

@@ -336,6 +338,7 @@ private static Options getOptions() {
336338
options.addOption(new Option("F", "queue-pattern-from", true, "sequence start for queue pattern (included)"));
337339
options.addOption(new Option("T", "queue-pattern-to", true, "sequence end for queue pattern (included)"));
338340
options.addOption(new Option("hst", "heartbeat-sender-threads", true, "number of threads for producers and consumers heartbeat senders"));
341+
options.addOption(new Option("mp", "message-properties", true, "message properties as key/pair values, separated by commas"));
339342
return options;
340343
}
341344

@@ -371,20 +374,20 @@ private static List<?> lstArg(CommandLine cmd, char opt) {
371374
return asList(vals);
372375
}
373376

374-
private static Map<String, Object> queueArguments(String arg) {
377+
private static Map<String, Object> convertKeyValuePairs(String arg) {
375378
if (arg == null || arg.trim().isEmpty()) {
376379
return null;
377380
}
378-
Map<String, Object> queueArguments = new HashMap<>();
381+
Map<String, Object> properties = new HashMap<>();
379382
for (String entry : arg.split(",")) {
380383
String [] keyValue = entry.split("=");
381384
try {
382-
queueArguments.put(keyValue[0], Long.parseLong(keyValue[1]));
385+
properties.put(keyValue[0], Long.parseLong(keyValue[1]));
383386
} catch(NumberFormatException e) {
384-
queueArguments.put(keyValue[0], keyValue[1]);
387+
properties.put(keyValue[0], keyValue[1]);
385388
}
386389
}
387-
return queueArguments;
390+
return properties;
388391
}
389392

390393
private static String getExchangeName(CommandLine cmd, String def) {

src/main/java/com/rabbitmq/perf/Producer.java

Lines changed: 115 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,22 @@
2121
import com.rabbitmq.client.ReturnListener;
2222

2323
import java.io.IOException;
24+
import java.time.OffsetDateTime;
25+
import java.util.Arrays;
26+
import java.util.Collection;
2427
import java.util.Collections;
28+
import java.util.Date;
29+
import java.util.HashMap;
2530
import java.util.List;
31+
import java.util.Map;
2632
import java.util.SortedSet;
2733
import java.util.TreeSet;
2834
import java.util.UUID;
2935
import java.util.concurrent.Semaphore;
3036
import java.util.concurrent.TimeUnit;
3137
import java.util.concurrent.atomic.AtomicBoolean;
3238
import java.util.function.Function;
39+
import static java.util.stream.Collectors.toMap;
3340

3441
public class Producer extends ProducerConsumerBase implements Runnable, ReturnListener,
3542
ConfirmListener
@@ -63,7 +70,8 @@ public Producer(Channel channel, String exchangeName, String id, boolean randomR
6370
float rateLimit, int msgLimit,
6471
long confirm, int confirmTimeout,
6572
MessageBodySource messageBodySource,
66-
TimestampProvider tsp, Stats stats, MulticastSet.CompletionHandler completionHandler) {
73+
TimestampProvider tsp, Stats stats, Map<String, Object> messageProperties,
74+
MulticastSet.CompletionHandler completionHandler) {
6775
this.channel = channel;
6876
this.exchangeName = exchangeName;
6977
this.id = id;
@@ -72,26 +80,15 @@ public Producer(Channel channel, String exchangeName, String id, boolean randomR
7280
this.persistent = flags.contains("persistent");
7381

7482
Function<AMQP.BasicProperties.Builder, AMQP.BasicProperties.Builder> builderProcessor = Function.identity();
75-
for (Object flag : flags) {
76-
if (flag != null && flag.toString().startsWith("priority=")) {
77-
final Integer priority = Integer.valueOf(flag.toString().substring(
78-
flag.toString().indexOf("=") + 1
79-
));
80-
builderProcessor = builderProcessor.andThen(builder -> {
81-
builder.priority(priority);
82-
return builder;
83-
});
84-
}
85-
}
8683
this.txSize = txSize;
8784
this.rateLimit = rateLimit;
8885
this.msgLimit = msgLimit;
8986
this.messageBodySource = messageBodySource;
9087
if (tsp.isTimestampInHeader()) {
91-
builderProcessor = builderProcessor.andThen(builder -> {
92-
builder.headers(Collections.singletonMap(TIMESTAMP_HEADER, tsp.getCurrentTime()));
93-
return builder;
94-
});
88+
builderProcessor = builderProcessor.andThen(builder -> builder.headers(Collections.singletonMap(TIMESTAMP_HEADER, tsp.getCurrentTime())));
89+
}
90+
if (messageProperties != null && !messageProperties.isEmpty()) {
91+
builderProcessor = builderProcessorWithMessageProperties(messageProperties, builderProcessor);
9592
}
9693
if (confirm > 0) {
9794
this.confirmPool = new Semaphore((int)confirm);
@@ -102,6 +99,108 @@ public Producer(Channel channel, String exchangeName, String id, boolean randomR
10299
this.propertiesBuilderProcessor = builderProcessor;
103100
}
104101

102+
private Function<AMQP.BasicProperties.Builder, AMQP.BasicProperties.Builder> builderProcessorWithMessageProperties(
103+
Map<String, Object> messageProperties,
104+
Function<AMQP.BasicProperties.Builder, AMQP.BasicProperties.Builder> builderProcessor) {
105+
if (messageProperties.containsKey("contentType")) {
106+
String value = messageProperties.get("contentType").toString();
107+
builderProcessor = builderProcessor.andThen(builder -> builder.contentType(value));
108+
}
109+
if (messageProperties.containsKey("contentEncoding")) {
110+
String value = messageProperties.get("contentEncoding").toString();
111+
builderProcessor = builderProcessor.andThen(builder -> builder.contentEncoding(value));
112+
}
113+
if (messageProperties.containsKey("deliveryMode")) {
114+
Integer value = ((Number) messageProperties.get("deliveryMode")).intValue();
115+
builderProcessor = builderProcessor.andThen(builder -> builder.deliveryMode(value));
116+
}
117+
if (messageProperties.containsKey("priority")) {
118+
Integer value = ((Number) messageProperties.get("priority")).intValue();
119+
builderProcessor = builderProcessor.andThen(builder -> builder.priority(value));
120+
}
121+
if (messageProperties.containsKey("correlationId")) {
122+
String value = messageProperties.get("correlationId").toString();
123+
builderProcessor = builderProcessor.andThen(builder -> builder.correlationId(value));
124+
}
125+
if (messageProperties.containsKey("replyTo")) {
126+
String value = messageProperties.get("replyTo").toString();
127+
builderProcessor = builderProcessor.andThen(builder -> builder.replyTo(value));
128+
}
129+
if (messageProperties.containsKey("expiration")) {
130+
String value = messageProperties.get("expiration").toString();
131+
builderProcessor = builderProcessor.andThen(builder -> builder.expiration(value));
132+
}
133+
if (messageProperties.containsKey("messageId")) {
134+
String value = messageProperties.get("messageId").toString();
135+
builderProcessor = builderProcessor.andThen(builder -> builder.messageId(value));
136+
}
137+
if (messageProperties.containsKey("timestamp")) {
138+
String value = messageProperties.get("timestamp").toString();
139+
Date timestamp = Date.from(OffsetDateTime.parse(value).toInstant());
140+
builderProcessor = builderProcessor.andThen(builder -> builder.timestamp(timestamp));
141+
}
142+
if (messageProperties.containsKey("type")) {
143+
String value = messageProperties.get("type").toString();
144+
builderProcessor = builderProcessor.andThen(builder -> builder.type(value));
145+
}
146+
if (messageProperties.containsKey("userId")) {
147+
String value = messageProperties.get("userId").toString();
148+
builderProcessor = builderProcessor.andThen(builder -> builder.userId(value));
149+
}
150+
if (messageProperties.containsKey("appId")) {
151+
String value = messageProperties.get("appId").toString();
152+
builderProcessor = builderProcessor.andThen(builder -> builder.appId(value));
153+
}
154+
if (messageProperties.containsKey("clusterId")) {
155+
String value = messageProperties.get("clusterId").toString();
156+
builderProcessor = builderProcessor.andThen(builder -> builder.clusterId(value));
157+
}
158+
159+
final Map<String, Object> headers = messageProperties.entrySet().stream()
160+
.filter(entry -> !isPropertyKey(entry.getKey()))
161+
.collect(toMap(e -> e.getKey(), e -> e.getValue()));
162+
163+
if (!headers.isEmpty()) {
164+
builderProcessor = builderProcessor.andThen(builder -> {
165+
// we merge if there are already some headers
166+
AMQP.BasicProperties properties = builder.build();
167+
Map<String, Object> existingHeaders = properties.getHeaders();
168+
if (existingHeaders != null && !existingHeaders.isEmpty()) {
169+
Map<String, Object> newHeaders = new HashMap<>();
170+
newHeaders.putAll(existingHeaders);
171+
newHeaders.putAll(headers);
172+
builder = builder.headers(newHeaders);
173+
} else {
174+
builder = builder.headers(headers);
175+
}
176+
return builder;
177+
});
178+
}
179+
180+
return builderProcessor;
181+
}
182+
183+
private static final Collection<String> MESSAGE_PROPERTIES_KEYS = Arrays.asList(
184+
"contentType",
185+
"contentEncoding",
186+
"headers",
187+
"deliveryMode",
188+
"priority",
189+
"correlationId",
190+
"replyTo",
191+
"expiration",
192+
"messageId",
193+
"timestamp",
194+
"type",
195+
"userId",
196+
"appId",
197+
"clusterId"
198+
);
199+
200+
private boolean isPropertyKey(String key) {
201+
return MESSAGE_PROPERTIES_KEYS.contains(key);
202+
}
203+
105204
public void handleReturn(int replyCode,
106205
String replyText,
107206
String exchange,

0 commit comments

Comments
 (0)