@@ -120,51 +120,45 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
120120 'group.id' : 'test-group' + Math . random ( ) ,
121121 'enable.auto.commit' : false ,
122122 'auto.offset.reset' : 'earliest' ,
123+ 'fetch.queue.backoff.ms' : '100' ,
123124 } ) ;
124125 await consumer . connect ( ) ;
125126 await consumer . subscribe ( { topic } ) ;
126127
127128 let messagesReceived = 0 ;
129+ let messagesMeasured = 0 ;
128130 let totalMessageSize = 0 ;
129131 let startTime ;
130132 let rate ;
133+ const skippedMessages = 100 ;
134+
135+ console . log ( "Starting consumer." ) ;
136+
131137 consumer . run ( {
132138 eachMessage : async ( { topic, partition, message } ) => {
133139 messagesReceived ++ ;
134- totalMessageSize += message . value . length ;
135- if ( messagesReceived === 1 ) {
136- consumer . pause ( [ { topic } ] ) ;
137- } else if ( messagesReceived === 2 ) {
138- startTime = hrtime ( ) ;
139- } else if ( messagesReceived === totalMessageCnt ) {
140- let elapsed = hrtime ( startTime ) ;
141- let durationNanos = elapsed [ 0 ] * 1e9 + elapsed [ 1 ] ;
142- rate = ( totalMessageSize / durationNanos ) * 1e9 / ( 1024 * 1024 ) ; /* MB/s */
143- console . log ( `Recvd ${ messagesReceived } messages, ${ totalMessageSize } bytes; rate is ${ rate } MB/s` ) ;
144- consumer . pause ( [ { topic } ] ) ;
145- // } else if (messagesReceived % 100 == 0) {
146- // console.log(`Recvd ${messagesReceived} messages, ${totalMessageSize} bytes`);
147- }
148- }
149- } ) ;
140+
141+ if ( messagesReceived >= skippedMessages ) {
142+ messagesMeasured ++ ;
143+ totalMessageSize += message . value . length ;
150144
151- // Wait until the first message is received
152- await new Promise ( ( resolve ) => {
153- let interval = setInterval ( ( ) => {
154- if ( messagesReceived > 0 ) {
155- clearInterval ( interval ) ;
156- resolve ( ) ;
145+ if ( messagesReceived === skippedMessages ) {
146+ startTime = hrtime ( ) ;
147+ } else if ( messagesMeasured === totalMessageCnt ) {
148+ let elapsed = hrtime ( startTime ) ;
149+ let durationNanos = elapsed [ 0 ] * 1e9 + elapsed [ 1 ] ;
150+ rate = ( totalMessageSize / durationNanos ) * 1e9 / ( 1024 * 1024 ) ; /* MB/s */
151+ console . log ( `Recvd ${ messagesMeasured } messages, ${ totalMessageSize } bytes; rate is ${ rate } MB/s` ) ;
152+ consumer . pause ( [ { topic } ] ) ;
153+ }
157154 }
158- } , 100 ) ;
155+ }
159156 } ) ;
160157
161- console . log ( "Starting consumer." )
162-
163158 totalMessageSize = 0 ;
164- consumer . resume ( [ { topic } ] ) ;
165159 await new Promise ( ( resolve ) => {
166160 let interval = setInterval ( ( ) => {
167- if ( messagesReceived >= totalMessageCnt ) {
161+ if ( messagesMeasured >= totalMessageCnt ) {
168162 clearInterval ( interval ) ;
169163 resolve ( ) ;
170164 }
@@ -175,7 +169,8 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
175169 return rate ;
176170}
177171
178- async function runConsumeTransformProduce ( brokers , consumeTopic , produceTopic , totalMessageCnt , messageProcessTimeMs , ctpConcurrency ) {
172+ async function runConsumeTransformProduce ( brokers , consumeTopic , produceTopic , warmupMessages , totalMessageCnt , messageProcessTimeMs , ctpConcurrency ) {
173+ console . log ( "here" ) ;
179174 const kafka = new Kafka ( {
180175 'client.id' : 'kafka-test-performance' ,
181176 'metadata.broker.list' : brokers ,
@@ -205,55 +200,55 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, t
205200 await consumer . subscribe ( { topic : consumeTopic } ) ;
206201
207202 let messagesReceived = 0 ;
203+ let messagesMeasured = 0 ;
208204 let totalMessageSize = 0 ;
209205 let startTime ;
210206 let rate ;
207+ const skippedMessages = warmupMessages ;
208+
209+ console . log ( "Starting consume-transform-produce." ) ;
210+
211211 consumer . run ( {
212212 partitionsConsumedConcurrently : ctpConcurrency ,
213213 eachMessage : async ( { topic, partition, message } ) => {
214- /* Simulate message processing for messageProcessTimeMs */
215- if ( messageProcessTimeMs > 0 ) {
216- await new Promise ( ( resolve ) => setTimeout ( resolve , messageProcessTimeMs ) ) ;
217- }
218- await producer . send ( {
219- topic : produceTopic ,
220- messages : [ { value : message . value } ] ,
221- } )
222214 messagesReceived ++ ;
223- totalMessageSize += message . value . length ;
224- if ( messagesReceived === 1 ) {
225- consumer . pause ( [ { topic } ] ) ;
226- } else if ( messagesReceived === 2 ) {
227- startTime = hrtime ( ) ;
228- } else if ( messagesReceived === totalMessageCnt ) {
229- let elapsed = hrtime ( startTime ) ;
230- let durationNanos = elapsed [ 0 ] * 1e9 + elapsed [ 1 ] ;
231- rate = ( totalMessageSize / durationNanos ) * 1e9 / ( 1024 * 1024 ) ; /* MB/s */
232- console . log ( `Recvd, transformed and sent ${ messagesReceived } messages, ${ totalMessageSize } bytes; rate is ${ rate } MB/s` ) ;
233- consumer . pause ( [ { topic } ] ) ;
234- // } else if (messagesReceived % 1 == 0) {
235- // console.log(`Recvd ${messagesReceived} messages, ${totalMessageSize} bytes`);
236- }
237- }
238- } ) ;
239215
240- // Wait until the first message is received
241- await new Promise ( ( resolve ) => {
242- let interval = setInterval ( ( ) => {
243- if ( messagesReceived > 0 ) {
244- clearInterval ( interval ) ;
245- resolve ( ) ;
216+ if ( messagesReceived >= skippedMessages ) {
217+ messagesMeasured ++ ;
218+ totalMessageSize += message . value . length ;
219+
220+ if ( messagesReceived === skippedMessages )
221+ startTime = hrtime ( ) ;
222+
223+ /* Simulate message processing for messageProcessTimeMs */
224+ if ( messageProcessTimeMs > 0 ) {
225+ await new Promise ( ( resolve ) => setTimeout ( resolve , messageProcessTimeMs ) ) ;
226+ }
227+ await producer . send ( {
228+ topic : produceTopic ,
229+ messages : [ { value : message . value } ] ,
230+ } )
231+
232+ if ( messagesMeasured === totalMessageCnt ) {
233+ let elapsed = hrtime ( startTime ) ;
234+ let durationNanos = elapsed [ 0 ] * 1e9 + elapsed [ 1 ] ;
235+ rate = ( totalMessageSize / durationNanos ) * 1e9 / ( 1024 * 1024 ) ; /* MB/s */
236+ console . log ( `Recvd, transformed and sent ${ messagesMeasured } messages, ${ totalMessageSize } bytes; rate is ${ rate } MB/s` ) ;
237+ consumer . pause ( [ { topic } ] ) ;
238+ }
239+ } else {
240+ await producer . send ( {
241+ topic : produceTopic ,
242+ messages : [ { value : message . value } ] ,
243+ } )
246244 }
247- } , 100 ) ;
245+ }
248246 } ) ;
249247
250- console . log ( "Starting consume-transform-produce." )
251-
252248 totalMessageSize = 0 ;
253- consumer . resume ( [ { topic : consumeTopic } ] ) ;
254249 await new Promise ( ( resolve ) => {
255250 let interval = setInterval ( ( ) => {
256- if ( messagesReceived >= totalMessageCnt ) {
251+ if ( messagesMeasured >= totalMessageCnt ) {
257252 clearInterval ( interval ) ;
258253 resolve ( ) ;
259254 }
0 commit comments