Skip to content

Commit 20a1321

Browse files
committed
rebalanceListener
1 parent fa49bbd commit 20a1321

File tree

2 files changed

+58
-5
lines changed

2 files changed

+58
-5
lines changed

examples/kafkajs/consumer.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,28 @@ const { Kafka } = require('../..').KafkaJS
22
//const { Kafka } = require('kafkajs')
33

44
async function consumerStart() {
5+
let consumer;
56
const kafka = new Kafka({
67
brokers: ['pkc-8w6ry7.us-west-2.aws.devel.cpdev.cloud:9092'],
78
ssl: true,
89
sasl: {
910
mechanism: 'plain',
1011
},
12+
rebalanceListener: {
13+
onPartitionsAssigned: async (assignment) => {
14+
console.log(`Assigned partitions ${JSON.stringify(assignment)}`);
15+
},
16+
onPartitionsRevoked: async (assignment) => {
17+
console.log(`Revoked partitions ${JSON.stringify(assignment)}`);
18+
await consumer.commitOffsets();
19+
}
20+
},
1121
rdKafka: {
1222
'enable.auto.commit': false
1323
}
1424
});
1525

16-
const consumer = kafka.consumer({ groupId: 'test-group' });
26+
consumer = kafka.consumer({ groupId: 'test-group' });
1727

1828
await consumer.connect();
1929
console.log("Connected successfully");
@@ -50,7 +60,7 @@ async function consumerStart() {
5060
const disconnect = () => {
5161
process.off('SIGINT', disconnect);
5262
process.off('SIGTERM', disconnect);
53-
consumer.disconnect().then(() => {
63+
consumer.disconnect().finally(() => {
5464
console.log("Disconnected successfully");
5565
});
5666
}

lib/kafkajs/_consumer.js

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,54 @@ class Consumer {
2929

3030
async #finalizedConfig() {
3131
const config = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
32-
if (this.#kJSConfig.groupId != null) {
32+
if (this.#kJSConfig.groupId) {
3333
config['group.id'] = this.#kJSConfig.groupId;
3434
}
3535
config['offset_commit_cb'] = true;
36+
if (this.#kJSConfig.rebalanceListener) {
37+
config['rebalance_cb'] = (err, assignment) => {
38+
// Create the librdkafka error
39+
err = LibrdKafkaError.create(err);
40+
41+
let call;
42+
switch(err.code) {
43+
case LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS:
44+
call = (this.#kJSConfig.rebalanceListener.onPartitionsAssigned ?
45+
this.#kJSConfig.rebalanceListener.onPartitionsAssigned(assignment) :
46+
Promise.resolve()).catch(console.error);
47+
break;
48+
case LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS:
49+
call = (this.#kJSConfig.rebalanceListener.onPartitionsRevoked ?
50+
this.#kJSConfig.rebalanceListener.onPartitionsRevoked(assignment) :
51+
Promise.resolve()).catch(console.error);
52+
break;
53+
default:
54+
call = Promise.reject().catch(() => {
55+
console.error(`Unexpected rebalanceListener error code ${err.code}`);
56+
});
57+
break;
58+
}
59+
60+
call
61+
.finally(() => {
62+
// Emit the event
63+
this.#internalClient.emit('rebalance', err, assignment);
64+
65+
try {
66+
if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
67+
this.#internalClient.assign(assignment);
68+
} else {
69+
this.#internalClient.unassign();
70+
}
71+
} catch (e) {
72+
// Ignore exceptions if we are not connected
73+
if (this.#internalClient.isConnected()) {
74+
this.#internalClient.emit('rebalance.error', e);
75+
}
76+
}
77+
});
78+
};
79+
}
3680
return config;
3781
}
3882

@@ -164,7 +208,7 @@ class Consumer {
164208
}
165209
}
166210

167-
commitOffsets(topicPartitions = null) {
211+
async commitOffsets(topicPartitions = null) {
168212
try {
169213
if (topicPartitions == null) {
170214
this.#internalClient.commitSync();
@@ -178,7 +222,6 @@ class Consumer {
178222
throw e;
179223
}
180224
}
181-
return Promise.resolve()
182225
}
183226

184227
seek(topicPartitionOffset) {

0 commit comments

Comments
 (0)