-const { Kafka, CompressionTypes, ErrorCodes } = require('../../').KafkaJS;
+const { Kafka, ErrorCodes } = require('../../').KafkaJS;
 const { randomBytes } = require('crypto');
 const { hrtime } = require('process');

+module.exports = {
+    runProducer,
+    runConsumer,
+    runConsumeTransformProduce,
+};
+
 async function runProducer(brokers, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression) {
     let totalMessagesSent = 0;
     let totalBytesSent = 0;
@@ -61,7 +67,6 @@ async function runProducer(brokers, topic, batchSize, warmupMessages, totalMessa
         }
         await Promise.all(promises);
     }
-    console.log({ messagesDispatched, totalMessageCnt })
     let elapsed = hrtime(startTime);
     let durationNanos = elapsed[0] * 1e9 + elapsed[1];
     let rate = (totalBytesSent / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
@@ -81,7 +86,7 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
         'group.id': 'test-group' + Math.random(),
         'enable.auto.commit': false,
         'auto.offset.reset': 'earliest',
-    });
+    });
     await consumer.connect();
     await consumer.subscribe({ topic });

@@ -101,8 +106,8 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
                 rate = (totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
                 console.log(`Recvd ${messagesReceived} messages, ${totalMessageSize} bytes; rate is ${rate} MB/s`);
                 consumer.pause([{ topic }]);
-                // } else if (messagesReceived % 100 == 0) {
-                //     console.log(`Recvd ${messagesReceived} messages, ${totalMessageSize} bytes`);
+                // } else if (messagesReceived % 100 == 0) {
+                //     console.log(`Recvd ${messagesReceived} messages, ${totalMessageSize} bytes`);
             }
         }
     });
@@ -135,15 +140,77 @@ async function runConsumer(brokers, topic, totalMessageCnt) {
     return rate;
 }

-const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
-const topic = process.env.KAFKA_TOPIC || 'test-topic';
-const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
-const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
-const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
-const compression = process.env.COMPRESSION || CompressionTypes.NONE;
-const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
-
-runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression).then(async (producerRate) => {
-    const consumerRate = await runConsumer(brokers, topic, messageCount);
-    console.log(producerRate, consumerRate);
-});
+async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, totalMessageCnt) {
+    const kafka = new Kafka({
+        'client.id': 'kafka-test-performance',
+        'metadata.broker.list': brokers,
+    });
+
+    const producer = kafka.producer({
+        /* We want things to be flushed immediately as we'll be awaiting this. */
+        'linger.ms': 0
+    });
+    await producer.connect();
+
+    const consumer = kafka.consumer({
+        'group.id': 'test-group' + Math.random(),
+        'enable.auto.commit': false,
+        'auto.offset.reset': 'earliest',
+    });
+    await consumer.connect();
+    await consumer.subscribe({ topic: consumeTopic });
+
+    let messagesReceived = 0;
+    let totalMessageSize = 0;
+    let startTime;
+    let rate;
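+    /* Forward each consumed message to produceTopic and await the send, so the measured
+     * rate covers both the consume and the produce leg. The first message only pauses the
+     * consumer; the timed run is started explicitly further below. */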
+    consumer.run({
+        eachMessage: async ({ topic, partition, message }) => {
+            await producer.send({
+                topic: produceTopic,
+                messages: [{ value: message.value }],
+            });
+            messagesReceived++;
+            totalMessageSize += message.value.length;
+            if (messagesReceived === 1) {
+                consumer.pause([{ topic }]);
+            } else if (messagesReceived === totalMessageCnt) {
+                let elapsed = hrtime(startTime);
+                let durationNanos = elapsed[0] * 1e9 + elapsed[1];
+                rate = (totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
+                console.log(`Recvd, transformed and sent ${messagesReceived} messages, ${totalMessageSize} bytes; rate is ${rate} MB/s`);
+                consumer.pause([{ topic }]);
+                // } else if (messagesReceived % 1 == 0) {
+                //     console.log(`Recvd ${messagesReceived} messages, ${totalMessageSize} bytes`);
+            }
+        }
+    });
+
+    // Wait until the first message is received
+    await new Promise((resolve) => {
+        let interval = setInterval(() => {
+            if (messagesReceived > 0) {
+                clearInterval(interval);
+                resolve();
+            }
+        }, 100);
+    });
+
+    console.log("Starting consume-transform-produce.");
+
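+    /* Only the byte counter is reset here; messagesReceived intentionally keeps counting,
+     * so the run ends once totalMessageCnt messages (including the first one) have been handled. */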
+    totalMessageSize = 0;
+    startTime = hrtime();
+    consumer.resume([{ topic: consumeTopic }]);
+    await new Promise((resolve) => {
+        let interval = setInterval(() => {
+            if (messagesReceived >= totalMessageCnt) {
+                clearInterval(interval);
+                resolve();
+            }
+        }, 1000);
+    });
+
+    await consumer.disconnect();
+    await producer.disconnect();
+    return rate;
+}
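
With the inline runner above removed, these exports now have to be driven from a separate script. That runner is not part of this hunk; the sketch below is a hypothetical driver modeled on the deleted code (the module path, the `KAFKA_TOPIC2` output topic, and passing `process.env.COMPRESSION` straight through to `runProducer` are assumptions).

```js
// Hypothetical driver for the exported benchmarks; names and defaults mirror the removed inline runner.
const {
    runProducer,
    runConsumer,
    runConsumeTransformProduce,
} = require('./performance-primitives'); // assumed path to the module changed in this diff

const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
const topic = process.env.KAFKA_TOPIC || 'test-topic';
const topic2 = process.env.KAFKA_TOPIC2 || 'test-topic2'; // target topic for consume-transform-produce
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
const compression = process.env.COMPRESSION; // interpretation is left to runProducer (not shown in this hunk)
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);

(async () => {
    // Produce first so the consume and consume-transform-produce runs have messages to read.
    const producerRate = await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
    const consumerRate = await runConsumer(brokers, topic, messageCount);
    const ctpRate = await runConsumeTransformProduce(brokers, topic, topic2, messageCount);
    console.log({ producerRate, consumerRate, ctpRate });
})();
```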