Skip to content

Commit c1bb835

Browse files
committed
commitOffsets implementation
1 parent b2f451d commit c1bb835

File tree

2 files changed

+29
-6
lines changed

2 files changed

+29
-6
lines changed

examples/kafkajs/consumer.js

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ async function consumerStart() {
77
ssl: true,
88
sasl: {
99
mechanism: 'plain',
10-
username: '<fill>',
11-
password: '<fill>',
10+
},
11+
rdKafka: {
12+
'enable.auto.commit': false
1213
}
1314
});
1415

@@ -23,6 +24,7 @@ async function consumerStart() {
2324
]
2425
})
2526

27+
var batch = 0;
2628
consumer.run({
2729
eachMessage: async ({ topic, partition, message }) => {
2830
console.log({
@@ -32,13 +34,21 @@ async function consumerStart() {
3234
key: message.key?.toString(),
3335
value: message.value.toString(),
3436
})
37+
38+
if (++batch % 100 == 0) {
39+
await consumer.commitOffsets();
40+
batch = 0;
41+
}
3542
},
3643
});
3744

38-
const disconnect = () =>
45+
const disconnect = () => {
46+
process.off('SIGINT', disconnect);
47+
process.off('SIGTERM', disconnect);
3948
consumer.disconnect().then(() => {
4049
console.log("Disconnected successfully");
4150
});
51+
}
4252
process.on('SIGINT', disconnect);
4353
process.on('SIGTERM', disconnect);
4454
}

lib/kafkajs/_consumer.js

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class Consumer {
3131
if (this.#kJSConfig.groupId != null) {
3232
config["group.id"] = this.#kJSConfig.groupId;
3333
}
34+
config['offset_commit_cb'] = true;
3435
return config;
3536
}
3637

@@ -101,8 +102,10 @@ class Consumer {
101102
async #consumeSingle() {
102103
return new Promise((resolve, reject) => {
103104
this.#internalClient.consume(1, function(err, messages) {
104-
if (err)
105+
if (err) {
105106
reject(`Consume error code ${err.code}`);
107+
return;
108+
}
106109

107110
const message = messages[0];
108111
resolve(message);
@@ -152,8 +155,18 @@ class Consumer {
152155
}
153156
}
154157

155-
async commitOffsets(topicPartitions) {
156-
this.#notImplemented();
158+
commitOffsets(topicPartitions = null) {
159+
if (topicPartitions == null) {
160+
this.#internalClient.commitSync();
161+
} else {
162+
const topicPartitions = topicPartitions.map((tpo) => ({
163+
topic: tpo.topic,
164+
partition: tpo.partition,
165+
offset: Number(tpo.offset),
166+
}))
167+
this.#internalClient.commitSync(topicPartitions);
168+
}
169+
return Promise.resolve()
157170
}
158171

159172
seek(topicPartitionOffset) {

0 commit comments

Comments
 (0)