1515
1616package com .rabbitmq .perf ;
1717
18+ import com .rabbitmq .client .AMQP ;
1819import com .rabbitmq .client .AMQP .BasicProperties ;
1920import com .rabbitmq .client .Channel ;
2021import com .rabbitmq .client .DefaultConsumer ;
2122import com .rabbitmq .client .Envelope ;
2223import com .rabbitmq .client .ShutdownSignalException ;
2324
25+ import java .io .ByteArrayInputStream ;
26+ import java .io .DataInputStream ;
2427import java .io .IOException ;
2528import java .util .Collections ;
2629import java .util .HashMap ;
@@ -44,11 +47,12 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
4447 private final CountDownLatch latch = new CountDownLatch (1 );
4548 private final Map <String , String > consumerTagBranchMap = Collections .synchronizedMap (new HashMap <String , String >());
4649 private final ConsumerLatency consumerLatency ;
50+ private final TimestampExtractor timestampExtractor ;
4751
4852 public Consumer (Channel channel , String id ,
4953 List <String > queueNames , int txSize , boolean autoAck ,
50- int multiAckEvery , Stats stats , float rateLimit , int msgLimit , int timeLimit ,
51- int consumerLatencyInMicroSeconds ) {
54+ int multiAckEvery , Stats stats , float rateLimit , int msgLimit , final int timeLimit ,
55+ int consumerLatencyInMicroSeconds , boolean extractTimestampFromHeader ) {
5256
5357 this .channel = channel ;
5458 this .id = id ;
@@ -67,6 +71,24 @@ public Consumer(Channel channel, String id,
6771 } else {
6872 this .consumerLatency = new BusyWaitConsumerLatency (consumerLatencyInMicroSeconds * 1000 );
6973 }
74+ if (extractTimestampFromHeader ) {
75+ this .timestampExtractor = new TimestampExtractor () {
76+ @ Override
77+ public long extract (BasicProperties properties , byte [] body ) throws IOException {
78+ Object timestamp = properties .getHeaders ().get (Producer .TIMESTAMP_HEADER );
79+ return timestamp == null ? Long .MAX_VALUE : (Long ) timestamp ;
80+ }
81+ };
82+ } else {
83+ this .timestampExtractor = new TimestampExtractor () {
84+ @ Override
85+ public long extract (BasicProperties properties , byte [] body ) throws IOException {
86+ DataInputStream d = new DataInputStream (new ByteArrayInputStream (body ));
87+ d .readInt ();
88+ return d .readLong ();
89+ }
90+ };
91+ }
7092 }
7193
7294 public void run () {
@@ -108,13 +130,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
108130 msgCount ++;
109131
110132 if (msgLimit == 0 || msgCount <= msgLimit ) {
111- long msgNano ;
112- Object timestamp = properties .getHeaders ().get ("timestamp" );
113- if (timestamp == null ) {
114- msgNano = Long .MAX_VALUE ;
115- } else {
116- msgNano = (Long ) timestamp ;
117- }
133+ long msgNano = timestampExtractor .extract (properties , body );
134+
118135
119136 long nano = System .nanoTime ();
120137
@@ -209,4 +226,10 @@ public void simulateLatency() {
209226 while (System .nanoTime () - start < delay );
210227 }
211228 }
229+
230+ private interface TimestampExtractor {
231+
232+ long extract (BasicProperties properties , byte [] body ) throws IOException ;
233+
234+ }
212235}
0 commit comments