1515
1616package com .rabbitmq .perf ;
1717
18- import com .rabbitmq .client .AMQP ;
1918import com .rabbitmq .client .AMQP .BasicProperties ;
2019import com .rabbitmq .client .Channel ;
2120import com .rabbitmq .client .DefaultConsumer ;
3130import java .util .Map ;
3231import java .util .concurrent .CountDownLatch ;
3332import java .util .concurrent .TimeUnit ;
33+ import java .util .function .BiFunction ;
3434
3535public class Consumer extends ProducerConsumerBase implements Runnable {
3636
@@ -47,7 +47,7 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
4747 private final CountDownLatch latch = new CountDownLatch (1 );
4848 private final Map <String , String > consumerTagBranchMap = Collections .synchronizedMap (new HashMap <String , String >());
4949 private final ConsumerLatency consumerLatency ;
50- private final TimestampExtractor timestampExtractor ;
50+ private final BiFunction < BasicProperties , byte [], Long > timestampExtractor ;
5151 private final TimestampProvider timestampProvider ;
5252
5353 public Consumer (Channel channel , String id ,
@@ -77,21 +77,20 @@ public Consumer(Channel channel, String id,
7777 }
7878
7979 if (timestampProvider .isTimestampInHeader ()) {
80- this .timestampExtractor = new TimestampExtractor () {
81- @ Override
82- public long extract (BasicProperties properties , byte [] body ) throws IOException {
80+ this .timestampExtractor = (properties , body ) -> {
8381 Object timestamp = properties .getHeaders ().get (Producer .TIMESTAMP_HEADER );
8482 return timestamp == null ? Long .MAX_VALUE : (Long ) timestamp ;
85- }
8683 };
8784 } else {
88- this .timestampExtractor = new TimestampExtractor () {
89- @ Override
90- public long extract (BasicProperties properties , byte [] body ) throws IOException {
91- DataInputStream d = new DataInputStream (new ByteArrayInputStream (body ));
85+ this .timestampExtractor = (properties , body ) -> {
86+ DataInputStream d = new DataInputStream (new ByteArrayInputStream (body ));
87+ try {
9288 d .readInt ();
9389 return d .readLong ();
90+ } catch (IOException e ) {
91+ throw new RuntimeException ("Error while extracting timestamp from body" );
9492 }
93+
9594 };
9695 }
9796 }
@@ -135,8 +134,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
135134 msgCount ++;
136135
137136 if (msgLimit == 0 || msgCount <= msgLimit ) {
138- long msg_ts = timestampExtractor .extract (properties , body );
139- long now_ts = timestampProvider .getCurrentTime ();
137+ long messageTimestamp = timestampExtractor .apply (properties , body );
138+ long nowTimestamp = timestampProvider .getCurrentTime ();
140139
141140 if (!autoAck ) {
142141 if (multiAckEvery == 0 ) {
@@ -150,10 +149,10 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
150149 channel .txCommit ();
151150 }
152151
153- now = System .currentTimeMillis ();
154-
155- long diff_time = timestampProvider .getDifference (now_ts , msg_ts );
152+ long diff_time = timestampProvider .getDifference (nowTimestamp , messageTimestamp );
156153 stats .handleRecv (id .equals (envelope .getRoutingKey ()) ? diff_time : 0L );
154+
155+ now = System .currentTimeMillis ();
157156 if (rateLimit > 0.0f ) {
158157 delay (now );
159158 }
@@ -231,9 +230,4 @@ public void simulateLatency() {
231230 }
232231 }
233232
234- private interface TimestampExtractor {
235-
236- long extract (BasicProperties properties , byte [] body ) throws IOException ;
237-
238- }
239233}
0 commit comments