3535import java .nio .charset .StandardCharsets ;
3636import java .util .concurrent .*;
3737import java .util .concurrent .atomic .AtomicBoolean ;
38+ import java .util .concurrent .atomic .AtomicReference ;
3839import org .slf4j .Logger ;
3940import org .slf4j .LoggerFactory ;
4041
@@ -88,6 +89,43 @@ public static void main(String[] args) throws IOException {
8889 management .queue ().name (q ).type (QUORUM ).declare ();
8990 management .binding ().sourceExchange (e ).destinationQueue (q ).key (rk ).bind ();
9091
92+ java .util .function .Consumer <Message > recordMessage =
93+ msg -> {
94+ try {
95+ long time = readLong (msg .body ());
96+ metrics .latency (System .currentTimeMillis () - time , TimeUnit .MILLISECONDS );
97+ } catch (Exception ex ) {
98+ // not able to read the body, maybe not a message from the
99+ // tool
100+ }
101+ };
102+
103+ int initialCredits = 1000 ;
104+ Consumer .MessageHandler handler ;
105+ int disposeEvery = 1 ;
106+
107+ if (disposeEvery <= 1 ) {
108+ handler =
109+ (context , message ) -> {
110+ recordMessage .accept (message );
111+ context .accept ();
112+ };
113+ } else {
114+ AtomicReference <Consumer .BatchContext > batch = new AtomicReference <>();
115+ handler =
116+ (context , message ) -> {
117+ recordMessage .accept (message );
118+ if (batch .get () == null ) {
119+ batch .set (context .batch ());
120+ }
121+ batch .get ().add (context );
122+ if (batch .get ().size () == disposeEvery ) {
123+ batch .get ().accept ();
124+ batch .set (null );
125+ }
126+ };
127+ }
128+
91129 connection
92130 .consumerBuilder ()
93131 .listeners (
@@ -97,18 +135,8 @@ public static void main(String[] args) throws IOException {
97135 }
98136 })
99137 .queue (q )
100- .initialCredits (1000 )
101- .messageHandler (
102- (context , message ) -> {
103- context .accept ();
104- try {
105- long time = readLong (message .body ());
106- metrics .latency (System .currentTimeMillis () - time , TimeUnit .MILLISECONDS );
107- } catch (Exception ex ) {
108- // not able to read the body, maybe not a message from the
109- // tool
110- }
111- })
138+ .initialCredits (initialCredits )
139+ .messageHandler (handler )
112140 .build ();
113141
114142 executorService .submit (
0 commit comments