Skip to content

Commit fa49bbd

Browse files
committed
Seek implementation
1 parent c1bb835 commit fa49bbd

File tree

3 files changed

+47
-20
lines changed

3 files changed

+47
-20
lines changed

examples/kafkajs/consumer.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ async function consumerStart() {
3636
})
3737

3838
if (++batch % 100 == 0) {
39+
await consumer.seek({
40+
topic,
41+
partition,
42+
offset: -2
43+
});
3944
await consumer.commitOffsets();
4045
batch = 0;
4146
}

lib/kafkajs/_consumer.js

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
const LibrdKafkaError = require('../error');
12
const RdKafka = require('../rdkafka');
23
const { kafkaJSToRdKafkaConfig } = require('./_common');
34

@@ -29,7 +30,7 @@ class Consumer {
2930
async #finalizedConfig() {
3031
const config = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
3132
if (this.#kJSConfig.groupId != null) {
32-
config["group.id"] = this.#kJSConfig.groupId;
33+
config['group.id'] = this.#kJSConfig.groupId;
3334
}
3435
config['offset_commit_cb'] = true;
3536
return config;
@@ -43,20 +44,20 @@ class Consumer {
4344
this.#state = ConsumerState.CONNECTED;
4445

4546
// Resolve the promise.
46-
this.#connectPromiseFunc["resolve"]();
47+
this.#connectPromiseFunc['resolve']();
4748
}
4849

4950
#errorCb(args) {
5051
console.log('error', args);
5152
if (this.#state === ConsumerState.CONNECTING) {
52-
this.#connectPromiseFunc["reject"](args);
53+
this.#connectPromiseFunc['reject'](args);
5354
} else {
5455
// do nothing for now.
5556
}
5657
}
5758

5859
#notImplemented() {
59-
throw new Error("Not implemented");
60+
throw new Error('Not implemented');
6061
}
6162

6263
#createPayload(message) {
@@ -66,7 +67,7 @@ class Consumer {
6667
}
6768

6869
let timestamp = message.timestamp ? new Date(message.timestamp).toISOString()
69-
: "";
70+
: '';
7071

7172
var headers = undefined;
7273
if (message.headers) {
@@ -113,9 +114,17 @@ class Consumer {
113114
});
114115
}
115116

117+
#topicPartitionOffsetToRdKafka(tpo) {
118+
return {
119+
topic: tpo.topic,
120+
partition: tpo.partition,
121+
offset: Number(tpo.offset),
122+
}
123+
}
124+
116125
async connect() {
117126
if (this.#state !== ConsumerState.INIT) {
118-
return Promise.reject("Connect has already been called elsewhere.");
127+
return Promise.reject('Connect has already been called elsewhere.');
119128
}
120129

121130
this.#state = ConsumerState.CONNECTING;
@@ -126,9 +135,9 @@ class Consumer {
126135

127136
return new Promise((resolve, reject) => {
128137
this.#connectPromiseFunc = {resolve, reject};
129-
console.log("Connecting....");
138+
console.log('Connecting....');
130139
this.#internalClient.connect();
131-
console.log("connect() called");
140+
console.log('connect() called');
132141
});
133142
}
134143

@@ -142,7 +151,7 @@ class Consumer {
142151

143152
async run(config) {
144153
if (this.#state !== ConsumerState.CONNECTED) {
145-
throw new Error("Run must be called in state CONNECTED.");
154+
throw new Error('Run must be called in state CONNECTED.');
146155
}
147156

148157
while (this.#state === ConsumerState.CONNECTED) {
@@ -156,21 +165,34 @@ class Consumer {
156165
}
157166

158167
commitOffsets(topicPartitions = null) {
159-
if (topicPartitions == null) {
160-
this.#internalClient.commitSync();
161-
} else {
162-
const topicPartitions = topicPartitions.map((tpo) => ({
163-
topic: tpo.topic,
164-
partition: tpo.partition,
165-
offset: Number(tpo.offset),
166-
}))
167-
this.#internalClient.commitSync(topicPartitions);
168+
try {
169+
if (topicPartitions == null) {
170+
this.#internalClient.commitSync();
171+
} else {
172+
const topicPartitions = topicPartitions.map(
173+
this.#topicPartitionOffsetToRdKafka);
174+
this.#internalClient.commitSync(topicPartitions);
175+
}
176+
} catch (e) {
177+
if (!e.code || e.code != LibrdKafkaError.codes.ERR__NO_OFFSET) {
178+
throw e;
179+
}
168180
}
169181
return Promise.resolve()
170182
}
171183

172184
seek(topicPartitionOffset) {
173-
this.#notImplemented();
185+
return new Promise((resolve, reject) => {
186+
const rdKafkaTopicPartitionOffset =
187+
this.#topicPartitionOffsetToRdKafka(topicPartitionOffset);
188+
this.#internalClient.seek(rdKafkaTopicPartitionOffset, 0, (err) => {
189+
if (err) {
190+
reject(new Error(`Seek error code ${err.code}`));
191+
} else {
192+
resolve();
193+
}
194+
});
195+
}).catch(console.error); // Default handler
174196
}
175197

176198
async describeGroup() {

types/kafkajs.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ export type Consumer = {
413413
stop(): Promise<void>
414414
run(config?: ConsumerRunConfig): Promise<void>
415415
commitOffsets(topicPartitions: Array<TopicPartitionOffsetAndMetadata>): Promise<void>
416-
seek(topicPartitionOffset: TopicPartitionOffset): void
416+
seek(topicPartitionOffset: TopicPartitionOffset): Promise<void>
417417
describeGroup(): Promise<GroupDescription>
418418
pause(topics: Array<{ topic: string; partitions?: number[] }>): void
419419
paused(): TopicPartitions[]

0 commit comments

Comments
 (0)