Skip to content

Commit 9c3d302

Browse files
authored
Waiting for a batch to be processed before accepting more messages (#371)
* Waiting for a batch to be processed before accepting more messages * Release prepare + dep update * Fixing type issue
1 parent c36b3aa commit 9c3d302

File tree

3 files changed

+15
-13
lines changed

3 files changed

+15
-13
lines changed

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -206,17 +206,9 @@ export abstract class AbstractKafkaConsumer<
206206
}
207207

208208
if (this.options.batchProcessingEnabled && this.messageBatchStream) {
209-
this.messageBatchStream.on('data', async (messageBatch) =>
210-
this.consume(messageBatch.topic, messageBatch.messages),
211-
)
212-
this.messageBatchStream.on('error', (error) => this.handlerError(error))
209+
this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error))
213210
} else {
214-
// biome-ignore lint/style/noNonNullAssertion: consumerStream is always created
215-
const stream = this.consumerStream!
216-
217-
// we are not waiting for the stream to complete
218-
// because init() must return promised void
219-
this.handleSyncStream(stream).catch((error) => this.handlerError(error))
211+
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
220212
}
221213

222214
this.consumerStream.on('error', (error) => this.handlerError(error))
@@ -232,6 +224,16 @@ export abstract class AbstractKafkaConsumer<
232224
)
233225
}
234226
}
227+
private async handleSyncStreamBatch(
228+
stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
229+
): Promise<void> {
230+
for await (const messageBatch of stream) {
231+
await this.consume(
232+
messageBatch.topic,
233+
messageBatch.messages as DeserializedMessage<SupportedMessageValues<TopicsConfig>>,
234+
)
235+
}
236+
}
235237

236238
async close(): Promise<void> {
237239
if (!this.consumerStream && !this.messageBatchStream) {

packages/kafka/lib/utils/KafkaMessageBatchStream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export type MessageBatch<TMessage> = { topic: string; partition: number; message
1414

1515
export interface KafkaMessageBatchStream<TMessage extends MessageWithTopicAndPartition>
1616
extends Duplex {
17-
// biome-ignore lint/suspicious/noExplicitAny: compatible with Duplex definition
17+
// biome-ignore lint/suspicious/noExplicitAny: compatible with Duplex definition
1818
on(event: string | symbol, listener: (...args: any[]) => void): this
1919
on(event: 'data', listener: (chunk: MessageBatch<TMessage>) => void): this
2020

packages/kafka/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.8.0",
3+
"version": "0.8.1",
44
"engines": {
55
"node": ">= 22.14.0"
66
},
@@ -53,7 +53,7 @@
5353
"dependencies": {
5454
"@lokalise/node-core": "^14.2.0",
5555
"@lokalise/universal-ts-utils": "^4.5.1",
56-
"@platformatic/kafka": "^1.21.0"
56+
"@platformatic/kafka": "^1.22.0"
5757
},
5858
"peerDependencies": {
5959
"@message-queue-toolkit/core": ">=23.0.0",

0 commit comments

Comments
 (0)