Skip to content

Commit 5e3df28

Browse files
committed
Merge branch '1.x.x-stable'
2 parents 68e4045 + 404846d commit 5e3df28

File tree

3 files changed

+75
-13
lines changed

3 files changed

+75
-13
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515

1616
package com.rabbitmq.perf;
1717

18+
import com.rabbitmq.client.AMQP;
1819
import com.rabbitmq.client.AMQP.BasicProperties;
1920
import com.rabbitmq.client.Channel;
2021
import com.rabbitmq.client.DefaultConsumer;
2122
import com.rabbitmq.client.Envelope;
2223
import com.rabbitmq.client.ShutdownSignalException;
2324

25+
import java.io.ByteArrayInputStream;
26+
import java.io.DataInputStream;
2427
import java.io.IOException;
2528
import java.util.Collections;
2629
import java.util.HashMap;
@@ -44,11 +47,12 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
4447
private final CountDownLatch latch = new CountDownLatch(1);
4548
private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap<String, String>());
4649
private final ConsumerLatency consumerLatency;
50+
private final TimestampExtractor timestampExtractor;
4751

4852
public Consumer(Channel channel, String id,
4953
List<String> queueNames, int txSize, boolean autoAck,
50-
int multiAckEvery, Stats stats, float rateLimit, int msgLimit, int timeLimit,
51-
int consumerLatencyInMicroSeconds) {
54+
int multiAckEvery, Stats stats, float rateLimit, int msgLimit, final int timeLimit,
55+
int consumerLatencyInMicroSeconds, boolean extractTimestampFromHeader) {
5256

5357
this.channel = channel;
5458
this.id = id;
@@ -67,6 +71,24 @@ public Consumer(Channel channel, String id,
6771
} else {
6872
this.consumerLatency = new BusyWaitConsumerLatency(consumerLatencyInMicroSeconds * 1000);
6973
}
74+
if (extractTimestampFromHeader) {
75+
this.timestampExtractor = new TimestampExtractor() {
76+
@Override
77+
public long extract(BasicProperties properties, byte[] body) throws IOException {
78+
Object timestamp = properties.getHeaders().get(Producer.TIMESTAMP_HEADER);
79+
return timestamp == null ? Long.MAX_VALUE : (Long) timestamp;
80+
}
81+
};
82+
} else {
83+
this.timestampExtractor = new TimestampExtractor() {
84+
@Override
85+
public long extract(BasicProperties properties, byte[] body) throws IOException {
86+
DataInputStream d = new DataInputStream(new ByteArrayInputStream(body));
87+
d.readInt();
88+
return d.readLong();
89+
}
90+
};
91+
}
7092
}
7193

7294
public void run() {
@@ -108,13 +130,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
108130
msgCount++;
109131

110132
if (msgLimit == 0 || msgCount <= msgLimit) {
111-
long msgNano;
112-
Object timestamp = properties.getHeaders().get("timestamp");
113-
if (timestamp == null) {
114-
msgNano = Long.MAX_VALUE;
115-
} else {
116-
msgNano = (Long) timestamp;
117-
}
133+
long msgNano = timestampExtractor.extract(properties, body);
134+
118135

119136
long nano = System.nanoTime();
120137

@@ -209,4 +226,10 @@ public void simulateLatency() {
209226
while(System.nanoTime() - start < delay);
210227
}
211228
}
229+
230+
private interface TimestampExtractor {
231+
232+
long extract(BasicProperties properties, byte[] body) throws IOException;
233+
234+
}
212235
}

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,16 +260,20 @@ public Producer createProducer(Connection connection, Stats stats, String id) th
260260
channel.exchangeDeclare(exchangeName, exchangeType);
261261
}
262262
MessageBodySource messageBodySource = null;
263+
boolean timestampInHeader;
263264
if (bodyFiles.size() > 0) {
264265
messageBodySource = new LocalFilesMessageBodySource(bodyFiles, bodyContentType);
266+
timestampInHeader = true;
265267
} else {
266268
messageBodySource = new TimeSequenceMessageBodySource(minMsgSize);
269+
timestampInHeader = false;
267270
}
268271
final Producer producer = new Producer(channel, exchangeName, id,
269272
randomRoutingKey, flags, producerTxSize,
270273
producerRateLimit, producerMsgCount,
271274
timeLimit,
272-
confirm, confirmTimeout, messageBodySource, stats);
275+
confirm, confirmTimeout, messageBodySource,
276+
timestampInHeader, stats);
273277
channel.addReturnListener(producer);
274278
channel.addConfirmListener(producer);
275279
return producer;
@@ -281,9 +285,18 @@ public Consumer createConsumer(Connection connection, Stats stats, String id) th
281285
List<String> generatedQueueNames = configureQueues(connection, id);
282286
if (consumerPrefetch > 0) channel.basicQos(consumerPrefetch);
283287
if (channelPrefetch > 0) channel.basicQos(channelPrefetch, true);
288+
289+
boolean timestampInHeader;
290+
if (bodyFiles.size() > 0) {
291+
timestampInHeader = true;
292+
} else {
293+
timestampInHeader = false;
294+
}
295+
284296
return new Consumer(channel, id, generatedQueueNames,
285297
consumerTxSize, autoAck, multiAckEvery,
286-
stats, consumerRateLimit, consumerMsgCount, timeLimit, consumerLatencyInMicroseconds);
298+
stats, consumerRateLimit, consumerMsgCount, timeLimit,
299+
consumerLatencyInMicroseconds, timestampInHeader);
287300
}
288301

289302
public boolean shouldConfigureQueues() {

src/main/java/com/rabbitmq/perf/Producer.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
public class Producer extends ProducerConsumerBase implements Runnable, ReturnListener,
3333
ConfirmListener
3434
{
35+
36+
public static final String TIMESTAMP_HEADER = "timestamp";
3537
private final Channel channel;
3638
private final String exchangeName;
3739
private final String id;
@@ -46,6 +48,7 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
4648

4749
private final MessageBodySource messageBodySource;
4850

51+
private final PropertiesBuilderProcessor propertiesBuilderProcessor;
4952
private Semaphore confirmPool;
5053
private int confirmTimeout;
5154
private final SortedSet<Long> unconfirmedSet =
@@ -54,7 +57,8 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
5457
public Producer(Channel channel, String exchangeName, String id, boolean randomRoutingKey,
5558
List<?> flags, int txSize,
5659
float rateLimit, int msgLimit, int timeLimit,
57-
long confirm, int confirmTimeout, MessageBodySource messageBodySource, Stats stats)
60+
long confirm, int confirmTimeout, MessageBodySource messageBodySource,
61+
boolean timestampInHeader, Stats stats)
5862
throws IOException {
5963

6064
this.channel = channel;
@@ -68,6 +72,22 @@ public Producer(Channel channel, String exchangeName, String id, boolean randomR
6872
this.msgLimit = msgLimit;
6973
this.timeLimit = 1000L * timeLimit;
7074
this.messageBodySource = messageBodySource;
75+
if (timestampInHeader) {
76+
this.propertiesBuilderProcessor = new PropertiesBuilderProcessor() {
77+
@Override
78+
public AMQP.BasicProperties.Builder process(AMQP.BasicProperties.Builder builder) {
79+
builder.headers(Collections.<String, Object>singletonMap(TIMESTAMP_HEADER, System.nanoTime()));
80+
return builder;
81+
}
82+
};
83+
} else {
84+
this.propertiesBuilderProcessor = new PropertiesBuilderProcessor() {
85+
@Override
86+
public AMQP.BasicProperties.Builder process(AMQP.BasicProperties.Builder builder) {
87+
return builder;
88+
}
89+
};
90+
}
7191
if (confirm > 0) {
7292
this.confirmPool = new Semaphore((int)confirm);
7393
this.confirmTimeout = confirmTimeout;
@@ -173,7 +193,7 @@ private void publish(MessageBodySource.MessageBodyAndContentType messageBodyAndC
173193
propertiesBuilder.contentType(messageBodyAndContentType.getContentType());
174194
}
175195

176-
propertiesBuilder.headers(Collections.<String, Object>singletonMap("timestamp", System.nanoTime()));
196+
propertiesBuilder = this.propertiesBuilderProcessor.process(propertiesBuilder);
177197

178198
unconfirmedSet.add(channel.getNextPublishSeqNo());
179199
channel.basicPublish(exchangeName, randomRoutingKey ? UUID.randomUUID().toString() : id,
@@ -182,4 +202,10 @@ private void publish(MessageBodySource.MessageBodyAndContentType messageBodyAndC
182202
messageBodyAndContentType.getBody());
183203
}
184204

205+
private interface PropertiesBuilderProcessor {
206+
207+
AMQP.BasicProperties.Builder process(AMQP.BasicProperties.Builder builder);
208+
209+
}
210+
185211
}

0 commit comments

Comments
 (0)