diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 2e44b2ca..0e5764af 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -206,17 +206,9 @@ export abstract class AbstractKafkaConsumer< } if (this.options.batchProcessingEnabled && this.messageBatchStream) { - this.messageBatchStream.on('data', async (messageBatch) => - this.consume(messageBatch.topic, messageBatch.messages), - ) - this.messageBatchStream.on('error', (error) => this.handlerError(error)) + this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error)) } else { - // biome-ignore lint/style/noNonNullAssertion: consumerStream is always created - const stream = this.consumerStream! - - // we are not waiting for the stream to complete - // because init() must return promised void - this.handleSyncStream(stream).catch((error) => this.handlerError(error)) + this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) } this.consumerStream.on('error', (error) => this.handlerError(error)) @@ -232,6 +224,16 @@ export abstract class AbstractKafkaConsumer< ) } } + private async handleSyncStreamBatch( + stream: KafkaMessageBatchStream>>, + ): Promise { + for await (const messageBatch of stream) { + await this.consume( + messageBatch.topic, + messageBatch.messages as DeserializedMessage>, + ) + } + } async close(): Promise { if (!this.consumerStream && !this.messageBatchStream) { diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 54178c0a..5ae29df8 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -14,7 +14,7 @@ export type MessageBatch = { topic: string; partition: number; message export interface KafkaMessageBatchStream extends Duplex { - // biome-ignore lint/suspicious/noExplicitAny: compatible with Duplex definition + // biome-ignore lint/suspicious/noExplicitAny: compatible with Duplex definition on(event: string | symbol, listener: (...args: any[]) => void): this on(event: 'data', listener: (chunk: MessageBatch) => void): this diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 753fe344..cf7bd405 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.8.0", + "version": "0.8.1", "engines": { "node": ">= 22.14.0" }, @@ -53,7 +53,7 @@ "dependencies": { "@lokalise/node-core": "^14.2.0", "@lokalise/universal-ts-utils": "^4.5.1", - "@platformatic/kafka": "^1.21.0" + "@platformatic/kafka": "^1.22.0" }, "peerDependencies": { "@message-queue-toolkit/core": ">=23.0.0",