Skip to content

Commit 6e49c46

Browse files
committed
Add some checks for kafkaJS compability mode
1 parent ea487f9 commit 6e49c46

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

lib/kafkajs/_consumer.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ class Consumer {
343343
try {
344344
if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
345345

346-
if (this.#checkPendingSeeks)
346+
if (this.#checkPendingSeeks && this.#kafkaJSCompatibilityMode)
347347
assignment = this.#assignAsPerSeekedOffsets(assignment);
348348

349349
this.#internalClient.assign(assignment);
@@ -1099,9 +1099,10 @@ class Consumer {
10991099
this.#checkPendingSeeks = true;
11001100
this.#pendingSeeks.set(`${rdKafkaTopicPartitionOffset.topic}|${rdKafkaTopicPartitionOffset.partition}`, rdKafkaTopicPartitionOffset.offset);
11011101

1102-
/* Immediately realize the seek if we're not in compatibility mode. */
1102+
/* Immediately realize the seek if we're not in compatibility mode. And clear pending seeks.
1103+
* We don't need them for rebalance. */
11031104
if (!this.#kafkaJSCompatibilityMode) {
1104-
return this.#seekInternal();
1105+
return this.#seekInternal().then(() => this.#pendingSeeks.clear());
11051106
}
11061107
}
11071108

0 commit comments

Comments
 (0)