1+ import ConfluentRDKafka , { KafkaJS as ConfluentKafka } from '@confluentinc/kafka-javascript'
12import RDKafka from '@platformatic/rdkafka'
23import { printResults , Tracker , type Result } from 'cronometro'
34import { Kafka as KafkaJS , logLevel } from 'kafkajs'
45import { randomUUID } from 'node:crypto'
56import { Consumer , MessagesStreamModes , PromiseWithResolvers } from '../src/index.ts'
67import { brokers , topic } from './utils/definitions.ts'
78
8- const iterations = 10000
9+ const iterations = 100000
10+ const maxBytes = 2048
911
1012function rdkafkaEvented ( ) : Promise < Result > {
1113 const { promise, resolve, reject } = PromiseWithResolvers < Result > ( )
@@ -18,7 +20,7 @@ function rdkafkaEvented (): Promise<Result> {
1820 'metadata.broker.list' : brokers . join ( ',' ) ,
1921 'enable.auto.commit' : false ,
2022 'fetch.min.bytes' : 1 ,
21- 'fetch.message.max.bytes' : 200 ,
23+ 'fetch.message.max.bytes' : maxBytes ,
2224 'fetch.wait.max.ms' : 10
2325 } ,
2426 { 'auto.offset.reset' : 'earliest' }
@@ -78,7 +80,106 @@ function rdkafkaStream (): Promise<Result> {
7880 'metadata.broker.list' : brokers . join ( ',' ) ,
7981 'enable.auto.commit' : false ,
8082 'fetch.min.bytes' : 1 ,
81- 'fetch.message.max.bytes' : 200 ,
83+ 'fetch.message.max.bytes' : maxBytes ,
84+ 'fetch.wait.max.ms' : 10
85+ } ,
86+ { 'auto.offset.reset' : 'earliest' } ,
87+ { topics : [ topic ] , waitInterval : 0 , highWaterMark : 1024 , objectMode : true }
88+ )
89+
90+ let i = 0
91+ let last = process . hrtime . bigint ( )
92+ stream . on ( 'data' , ( ) => {
93+ i ++
94+ tracker . track ( last )
95+ last = process . hrtime . bigint ( )
96+
97+ if ( i === iterations ) {
98+ stream . removeAllListeners ( 'data' )
99+ stream . pause ( )
100+
101+ stream . destroy ( )
102+ resolve ( tracker . results )
103+ }
104+ } )
105+
106+ stream . on ( 'error' , reject )
107+
108+ return promise
109+ }
110+
111+ function confluentRdkafkaEvented ( ) : Promise < Result > {
112+ const { promise, resolve, reject } = PromiseWithResolvers < Result > ( )
113+ const tracker = new Tracker ( )
114+
115+ const consumer = new ConfluentRDKafka . KafkaConsumer (
116+ {
117+ 'client.id' : 'benchmarks' ,
118+ 'group.id' : randomUUID ( ) ,
119+ 'metadata.broker.list' : brokers . join ( ',' ) ,
120+ 'enable.auto.commit' : false ,
121+ 'fetch.min.bytes' : 1 ,
122+ 'fetch.message.max.bytes' : maxBytes ,
123+ 'fetch.wait.max.ms' : 10
124+ } ,
125+ { 'auto.offset.reset' : 'earliest' }
126+ )
127+
128+ let i = 0
129+ let last = process . hrtime . bigint ( )
130+ consumer . on ( 'data' , ( ) => {
131+ i ++
132+ tracker . track ( last )
133+ last = process . hrtime . bigint ( )
134+
135+ if ( i === iterations ) {
136+ consumer . removeAllListeners ( 'data' )
137+ consumer . pause ( [
138+ {
139+ topic,
140+ partition : 0
141+ } ,
142+ {
143+ topic,
144+ partition : 1
145+ } ,
146+ {
147+ topic,
148+ partition : 2
149+ }
150+ ] )
151+
152+ setTimeout ( ( ) => {
153+ consumer . disconnect ( )
154+ resolve ( tracker . results )
155+ } , 100 )
156+ }
157+ } )
158+
159+ consumer . on ( 'ready' , ( ) => {
160+ consumer . subscribe ( [ topic ] )
161+ consumer . consume ( )
162+ } )
163+
164+ consumer . on ( 'event.error' , reject )
165+
166+ consumer . connect ( )
167+
168+ return promise
169+ }
170+
171+ function confluentRdkafkaStream ( ) : Promise < Result > {
172+ const { promise, resolve, reject } = PromiseWithResolvers < Result > ( )
173+ const tracker = new Tracker ( )
174+
175+ const stream = ConfluentRDKafka . KafkaConsumer . createReadStream (
176+ {
177+ 'client.id' : 'benchmarks' ,
178+ 'group.id' : randomUUID ( ) ,
179+ 'metadata.broker.list' : brokers . join ( ',' ) ,
180+ 'enable.auto.commit' : false ,
181+ 'fetch.min.bytes' : 1 ,
182+ 'fetch.message.max.bytes' : maxBytes ,
82183 'fetch.wait.max.ms' : 10
83184 } ,
84185 { 'auto.offset.reset' : 'earliest' } ,
@@ -111,7 +212,7 @@ async function kafkajs (): Promise<Result> {
111212 const tracker = new Tracker ( )
112213
113214 const client = new KafkaJS ( { clientId : 'benchmarks' , brokers, logLevel : logLevel . ERROR } )
114- const consumer = client . consumer ( { groupId : randomUUID ( ) , maxWaitTimeInMs : 10 , maxBytes : 200 } )
215+ const consumer = client . consumer ( { groupId : randomUUID ( ) , maxWaitTimeInMs : 10 , maxBytes } )
115216
116217 await consumer . connect ( )
117218 await consumer . subscribe ( { topics : [ topic ] , fromBeginning : true } )
@@ -123,13 +224,54 @@ async function kafkajs (): Promise<Result> {
123224 await consumer . run ( {
124225 autoCommit : false ,
125226 partitionsConsumedConcurrently : 1 ,
126- async eachMessage ( { pause } ) {
227+ async eachMessage ( ) {
228+ i ++
229+ tracker . track ( last )
230+ last = process . hrtime . bigint ( )
231+
232+ if ( i === iterations ) {
233+ consumer . disconnect ( )
234+ resolve ( tracker . results )
235+ }
236+ }
237+ } )
238+
239+ return promise
240+ }
241+
242+ async function confluentKafkaJS ( ) : Promise < Result > {
243+ const { promise, resolve } = PromiseWithResolvers < Result > ( )
244+ const tracker = new Tracker ( )
245+
246+ const client = new ConfluentKafka . Kafka ( {
247+ kafkaJS : {
248+ clientId : 'benchmarks' ,
249+ brokers
250+ }
251+ } )
252+ const consumer = client . consumer ( {
253+ kafkaJS : {
254+ groupId : randomUUID ( ) ,
255+ maxWaitTimeInMs : 10 ,
256+ maxBytes,
257+ fromBeginning : true ,
258+ autoCommit : false
259+ }
260+ } )
261+
262+ await consumer . connect ( )
263+ await consumer . subscribe ( { topics : [ topic ] } )
264+
265+ let i = 0
266+ let last = process . hrtime . bigint ( )
267+ await consumer . run ( {
268+ partitionsConsumedConcurrently : 1 ,
269+ async eachMessage ( ) {
127270 i ++
128271 tracker . track ( last )
129272 last = process . hrtime . bigint ( )
130273
131274 if ( i === iterations ) {
132- pause ( )
133275 consumer . disconnect ( )
134276 resolve ( tracker . results )
135277 }
@@ -148,7 +290,7 @@ async function platformaticKafka (): Promise<Result> {
148290 groupId : randomUUID ( ) ,
149291 bootstrapBrokers : brokers ,
150292 minBytes : 1 ,
151- maxBytes : 200 ,
293+ maxBytes,
152294 maxWaitTime : 10 ,
153295 autocommit : false
154296 } )
@@ -163,8 +305,10 @@ async function platformaticKafka (): Promise<Result> {
163305 last = process . hrtime . bigint ( )
164306
165307 if ( i === iterations ) {
166- consumer . close ( true , ( ) => {
167- resolve ( tracker . results )
308+ process . nextTick ( ( ) => {
309+ consumer . close ( true , ( ) => {
310+ resolve ( tracker . results )
311+ } )
168312 } )
169313 }
170314 } )
@@ -177,7 +321,10 @@ async function platformaticKafka (): Promise<Result> {
177321const results = {
178322 'node-rdkafka (evented)' : await rdkafkaEvented ( ) ,
179323 'node-rdkafka (stream)' : await rdkafkaStream ( ) ,
180- kafkajs : await kafkajs ( ) ,
324+ '@confluentinc/kafka-javascript (node-rdkafka, evented)' : await confluentRdkafkaEvented ( ) ,
325+ '@confluentinc/kafka-javascript (node-rdkafka, stream)' : await confluentRdkafkaStream ( ) ,
326+ KafkaJS : await kafkajs ( ) ,
327+ '@confluentinc/kafka-javascript (KafkaJS)' : await confluentKafkaJS ( ) ,
181328 '@platformatic/kafka' : await platformaticKafka ( )
182329}
183330
0 commit comments