Skip to content

Commit caaa980

Browse files
committed
Calculate latency based on header
The latency to receive a message was based on a value in the message payload. This latency is then used to conditionally display the metrics on received messages. This worked well as long as the payload contained the time when the message was sent (the default) but it doesn't work anymore when specifying a given payload. Feature #23 introduced content based on a file, so the latency wasn't correctly calculated. Latency is now calculated based on an header value, which is always sent, whatever the MessageBodySource is used. Fixes #50
1 parent 89b9ca9 commit caaa980

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import com.rabbitmq.client.Envelope;
2222
import com.rabbitmq.client.ShutdownSignalException;
2323

24-
import java.io.ByteArrayInputStream;
25-
import java.io.DataInputStream;
2624
import java.io.IOException;
2725
import java.util.Collections;
2826
import java.util.HashMap;
@@ -110,9 +108,14 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
110108
msgCount++;
111109

112110
if (msgLimit == 0 || msgCount <= msgLimit) {
113-
DataInputStream d = new DataInputStream(new ByteArrayInputStream(body));
114-
d.readInt();
115-
long msgNano = d.readLong();
111+
long msgNano;
112+
Object timestamp = properties.getHeaders().get("timestamp");
113+
if (timestamp == null) {
114+
msgNano = Long.MAX_VALUE;
115+
} else {
116+
msgNano = (Long) timestamp;
117+
}
118+
116119
long nano = System.nanoTime();
117120

118121
if (!autoAck) {

src/main/java/com/rabbitmq/perf/Producer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ private void publish(MessageBodySource.MessageBodyAndContentType messageBodyAndC
161161
propertiesBuilder.contentType(messageBodyAndContentType.getContentType());
162162
}
163163

164+
propertiesBuilder.headers(Collections.<String, Object>singletonMap("timestamp", System.nanoTime()));
165+
164166
unconfirmedSet.add(channel.getNextPublishSeqNo());
165167
channel.basicPublish(exchangeName, randomRoutingKey ? UUID.randomUUID().toString() : id,
166168
mandatory, false,

0 commit comments

Comments
 (0)