Skip to content

Commit 433fdf4

Browse files
authored
Kafka manual group join (#311)
* Adding manual group join * Release prepare + lint fix
1 parent d9f2518 commit 433fdf4

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ export abstract class AbstractKafkaConsumer<
112112

113113
try {
114114
const { handlers, ...consumeOptions } = this.options // Handlers cannot be passed to consume method
115+
116+
// https://github.com/platformatic/kafka/blob/main/docs/consumer.md#my-consumer-is-not-receiving-any-message-when-the-application-restarts
117+
await this.consumer.joinGroup({
118+
sessionTimeout: consumeOptions.sessionTimeout,
119+
rebalanceTimeout: consumeOptions.rebalanceTimeout,
120+
heartbeatInterval: consumeOptions.heartbeatInterval,
121+
})
115122
this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics })
116123
} catch (error) {
117124
throw new InternalError({

packages/kafka/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.3.0",
3+
"version": "0.3.1",
44
"engines": {
55
"node": ">= 22.14.0"
66
},

0 commit comments

Comments
 (0)