@@ -60,8 +60,22 @@ function genericProduceToTopic(producer, topic, messages) {
6060
6161async function runConsumer ( consumer , topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , actionOnMessages ) {
6262 const handlers = installHandlers ( totalMessageCnt === - 1 ) ;
63- await consumer . connect ( ) ;
64- await consumer . subscribe ( { topic } ) ;
63+ while ( true ) {
64+ try {
65+ await consumer . connect ( ) ;
66+ break ;
67+ } catch ( e ) {
68+ console . error ( `Error connecting consumer: ${ e } ` ) ;
69+ }
70+ }
71+ while ( true ) {
72+ try {
73+ await consumer . subscribe ( { topic } ) ;
74+ break ;
75+ } catch ( e ) {
76+ console . error ( `Error subscribing consumer: ${ e } ` ) ;
77+ }
78+ }
6579
6680 let messagesReceived = 0 ;
6781 let messagesMeasured = 0 ;
@@ -73,6 +87,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
7387 let startTime ;
7488 let rate ;
7589 let consumptionStopped = false ;
90+ let lastMessageReceivedAt ;
7691 const skippedMessages = warmupMessages ;
7792 const decoder = new TextDecoder ( 'utf-8' ) ;
7893
@@ -111,11 +126,16 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
111126 if ( consumptionStopped )
112127 return ;
113128 consumptionStopped = true ;
114- let durationNanos = Number ( hrtime . bigint ( ) - startTime ) ;
129+ const now = lastMessageReceivedAt || hrtime . bigint ( ) ;
130+ let durationNanos = Number ( now - startTime ) ;
115131 durationSeconds = durationNanos / 1e9 ;
116132 rate = ( totalMessageSize / durationNanos ) * 1e9 / ( 1024 * 1024 ) ; /* MB/s */
117133 console . log ( `Recvd ${ messagesMeasured } messages in ${ durationSeconds } seconds, ${ totalMessageSize } bytes; rate is ${ rate } MB/s` ) ;
118- consumer . pause ( [ { topic } ] ) ;
134+ try {
135+ consumer . pause ( [ { topic } ] ) ;
136+ } catch ( e ) {
137+ console . error ( `Error pausing consumer: ${ e } ` ) ;
138+ }
119139 }
120140
121141 console . log ( "Starting consumer." ) ;
@@ -153,6 +173,8 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
153173 consumeMethod = {
154174 partitionsConsumedConcurrently,
155175 eachBatch : async ( { batch } ) => {
176+ if ( ! batch . messages )
177+ return ;
156178 const messagesBeforeBatch = messagesReceived ;
157179 const topic = batch . topic ;
158180 const partition = batch . partition ;
@@ -170,6 +192,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
170192 messages = messages . slice ( messages . length - messagesMeasured ) ;
171193 }
172194 const now = Date . now ( ) ;
195+ lastMessageReceivedAt = hrtime . bigint ( ) ;
173196 messagesBase = messagesMeasured - messages . length ;
174197 let i = 1 ;
175198 for ( const message of messages ) {
@@ -187,7 +210,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
187210
188211 if ( actionOnMessages ) {
189212 await actionOnMessages ( batch . messages ) ;
190- if ( messagesMeasured > 0 && messages . length > 0 ) {
213+ if ( messagesMeasured > 0 && messages && messages . length > 0 ) {
191214 let i = 1 ;
192215 const now = Date . now ( ) ;
193216 for ( const message of messages ) {
0 commit comments