Skip to content

Commit 85b1341

Browse files
committed
Add tests for consumer.seek()
1 parent d1c30f4 commit 85b1341

File tree

4 files changed

+522
-21
lines changed

4 files changed

+522
-21
lines changed

MIGRATION.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,9 @@
291291
* `commitOffsets` does not (YET) support sending metadata for topic partitions being committed.
292292
* `paused()` is not (YET) supported.
293293
* Custom partition assignors are not supported.
294+
* Changes to `seek`:
295+
* The restriction to call seek only after `run` is removed.
296+
* Rather than the `autoCommit` property of `run` deciding if the offset is committed, the librdkafka property `enable.auto.commit` of the consumer config is used.
294297

295298
### Admin Client
296299

lib/kafkajs/_consumer.js

Lines changed: 155 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,19 @@ class Consumer {
5454
*/
5555
#state = ConsumerState.INIT;
5656

57+
/**
58+
* Denotes if there are any new pending seeks we need to check.
59+
* @type {boolean}
60+
*/
61+
#checkPendingSeeks = false;
62+
63+
/**
64+
* Contains a mapping of topic+partition to an offset that the user wants to seek to.
65+
* The keys are of the type "<topic>|<partition>".
66+
* @type {Map<string, number>}
67+
*/
68+
#pendingSeeks = new Map();
69+
5770
/**
5871
* @constructor
5972
* @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig
@@ -99,13 +112,32 @@ class Consumer {
99112
}
100113

101114
call
102-
.finally(() => {
115+
.finally(async () => {
103116
// Emit the event
104117
this.#internalClient.emit('rebalance', err, assignment);
105118

106119
try {
107120
if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
121+
122+
if (this.#checkPendingSeeks)
123+
assignment = this.#assignAsPerSeekedOffsets(assignment);
124+
108125
this.#internalClient.assign(assignment);
126+
127+
if (this.#checkPendingSeeks) {
128+
const offsetsToCommit = assignment
129+
.filter((topicPartition) => topicPartition.offset !== undefined)
130+
.map((topicPartition) => ({
131+
topic: topicPartition.topic,
132+
partition: topicPartition.partition,
133+
offset: String(topicPartition.offset),
134+
}));
135+
136+
if (offsetsToCommit.length !== 0 && this.#rdKafkaConfig.globalConfig['enable.auto.commit']) {
137+
await this.#commitOffsetsUntilNoStateErr(offsetsToCommit);
138+
}
139+
}
140+
109141
} else {
110142
this.#internalClient.unassign();
111143
}
@@ -191,9 +223,13 @@ class Consumer {
191223
}
192224

193225
globalConfig['offset_commit_cb'] = true;
194-
if (this.#kJSConfig.rebalanceListener) {
195-
globalConfig['rebalance_cb'] = this.#rebalanceCallback.bind(this);
226+
227+
if (!Object.hasOwn(this.#kJSConfig, 'rebalanceListener')) {
228+
/* We might want to do certain things to maintain internal state in rebalance listener, so we need to set it to an empty object. */
229+
this.#kJSConfig.rebalanceListener = {};
196230
}
231+
globalConfig['rebalance_cb'] = this.#rebalanceCallback.bind(this);
232+
197233
return { globalConfig, topicConfig };
198234
}
199235

@@ -417,6 +453,14 @@ class Consumer {
417453
* array/list until it can be processed, because librdkafka marks it as
418454
* 'stored'... but anyway - we can implement something like this.
419455
*/
456+
if (this.#checkPendingSeeks) {
457+
const invalidateMessage = await this.#seekInternal({ topic: m.topic, partition: m.partition });
458+
if (invalidateMessage) {
459+
/* Don't pass this message on to the user if this topic partition was seeked to. */
460+
continue;
461+
}
462+
}
463+
420464
await config.eachMessage(
421465
this.#createPayload(m)
422466
)
@@ -429,6 +473,17 @@ class Consumer {
429473
}
430474
}
431475

476+
async #commitOffsetsUntilNoStateErr(offsetsToCommit) {
477+
let err = { code: error.ErrorCodes.ERR_NO_ERROR };
478+
do {
479+
try {
480+
await this.commitOffsets(offsetsToCommit);
481+
} catch (e) {
482+
err = e;
483+
}
484+
} while (err.code && err.code === error.ErrorCodes.ERR__STATE);
485+
}
486+
432487
/**
433488
* Commit offsets for the given topic partitions. If topic partitions are not specified, commits all offsets.
434489
* @param {import("../../types/kafkajs").TopicPartitionOffsetAndMetadata[]?} topicPartitions
@@ -443,9 +498,9 @@ class Consumer {
443498
if (topicPartitions === null) {
444499
this.#internalClient.commitSync();
445500
} else {
446-
const topicPartitions = topicPartitions.map(
501+
const topicPartitionsRdKafka = topicPartitions.map(
447502
topicPartitionOffsetToRdKafka);
448-
this.#internalClient.commitSync(topicPartitions);
503+
this.#internalClient.commitSync(topicPartitionsRdKafka);
449504
}
450505
} catch (e) {
451506
if (!e.code || e.code !== error.ErrorCodes.ERR__NO_OFFSET) {
@@ -454,27 +509,109 @@ class Consumer {
454509
}
455510
}
456511

512+
/**
513+
* Apply pending seeks to topic partitions we have just obtained as a result of a rebalance.
514+
* @param {{topic: string, partition: number}[]} assignment The list of topic partitions to check for pending seeks.
515+
* @returns {{topic: string, partition: number, offset: number}[]} the new assignment with the offsets seeked to, which can be passed to assign().
516+
*/
517+
#assignAsPerSeekedOffsets(assignment) {
518+
const offsetsToCommit = [];
519+
520+
for (let i = 0; i < assignment.length; i++) {
521+
const topicPartition = assignment[i];
522+
const key = `${topicPartition.topic}|${topicPartition.partition}`;
523+
if (!this.#pendingSeeks.has(key))
524+
continue;
525+
526+
const offset = this.#pendingSeeks.get(key);
527+
this.#pendingSeeks.delete(key);
528+
529+
assignment[i].offset = offset;
530+
531+
offsetsToCommit.push({
532+
topic: topicPartition.topic,
533+
partition: topicPartition.partition,
534+
offset: String(offset),
535+
});
536+
}
537+
return assignment;
538+
}
539+
540+
/**
541+
* This method processes any pending seeks on partitions that are assigned to this consumer.
542+
* @param {{topic: string, partition: number}} messageTopicPartition If this method was triggered by a message, pass the topic partition of the message, else it's optional.
543+
* @returns whether the message that triggered this should be invalidated (if any).
544+
*/
545+
async #seekInternal(messageTopicPartition) {
546+
this.#checkPendingSeeks = false;
547+
548+
const assignment = this.assignment();
549+
const offsetsToCommit = [];
550+
let invalidateMessage = false;
551+
552+
for (const topicPartition of assignment) {
553+
const key = `${topicPartition.topic}|${topicPartition.partition}`;
554+
if (!this.#pendingSeeks.has(key))
555+
continue;
556+
557+
const offset = this.#pendingSeeks.get(key);
558+
this.#pendingSeeks.delete(key);
559+
560+
const topicPartitionOffset = {
561+
topic: topicPartition.topic,
562+
partition: topicPartition.partition,
563+
offset
564+
};
565+
566+
/* It's assumed that topicPartition is already assigned, and thus can be seeked to and committed to.
567+
* Errors are logged to detect bugs in the internal code. */
568+
this.#internalClient.seek(topicPartitionOffset, 0, err => err ? console.error(err) : null);
569+
offsetsToCommit.push({
570+
topic: topicPartition.topic,
571+
partition: topicPartition.partition,
572+
offset: String(offset),
573+
});
574+
575+
/* If we're seeking the same topic partition as in the message that triggers it, invalidate
576+
* the message. */
577+
if (messageTopicPartition && topicPartition.topic === messageTopicPartition.topic && topicPartition.partition === messageTopicPartition.partition) {
578+
invalidateMessage = true;
579+
}
580+
}
581+
582+
if (offsetsToCommit.length !== 0 && this.#rdKafkaConfig.globalConfig['enable.auto.commit']) {
583+
await this.#commitOffsetsUntilNoStateErr(offsetsToCommit);
584+
}
585+
586+
return invalidateMessage;
587+
}
588+
457589
/**
458590
* Seek to the given offset for the topic partition.
591+
* This method is completely asynchronous, and does not wait for the seek to complete.
592+
* In case any partitions that are seeked to, are not a part of the current assignment, they are stored internally.
593+
* If at any time, the consumer is assigned the partition, the seek will be performed.
594+
* Depending on the value of the librdkafka property 'enable.auto.commit', the consumer will commit the offset seeked to.
459595
* @param {import("../../types/kafkajs").TopicPartitionOffset} topicPartitionOffset
460-
* @returns {Promise<void>} a promise that resolves when the consumer has seeked.
461596
*/
462597
seek(topicPartitionOffset) {
463598
if (this.#state !== ConsumerState.CONNECTED) {
464599
throw new error.KafkaJSError('Seek can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
465600
}
466601

467-
return new Promise((resolve, reject) => {
468-
const rdKafkaTopicPartitionOffset =
469-
topicPartitionOffsetToRdKafka(topicPartitionOffset);
470-
this.#internalClient.seek(rdKafkaTopicPartitionOffset, 0, (err) => {
471-
if (err) {
472-
reject(createKafkaJsErrorFromLibRdKafkaError(err));
473-
} else {
474-
resolve();
475-
}
476-
});
477-
});
602+
const rdKafkaTopicPartitionOffset =
603+
topicPartitionOffsetToRdKafka(topicPartitionOffset);
604+
605+
if (typeof rdKafkaTopicPartitionOffset.topic !== 'string') {
606+
throw new error.KafkaJSError('Topic must be a string.', { code: error.ErrorCodes.ERR__INVALID_ARG });
607+
}
608+
609+
if (isNaN(rdKafkaTopicPartitionOffset.offset) || (rdKafkaTopicPartitionOffset.offset < 0 && rdKafkaTopicPartitionOffset.offset !== -2 && rdKafkaTopicPartitionOffset.offset !== -3)) {
610+
throw new error.KafkaJSError('Offset must be >= 0, or a special value.', { code: error.ErrorCodes.ERR__INVALID_ARG });
611+
}
612+
613+
this.#checkPendingSeeks = true;
614+
this.#pendingSeeks.set(`${rdKafkaTopicPartitionOffset.topic}|${rdKafkaTopicPartitionOffset.partition}`, rdKafkaTopicPartitionOffset.offset);
478615
}
479616

480617
async describeGroup() {
@@ -483,7 +620,7 @@ class Consumer {
483620

484621
/**
485622
* Find the assigned topic partitions for the consumer.
486-
* @returns {import("../../types").TopicPartition[]} the current assignment.
623+
* @returns {import("../../types/kafkajs").TopicPartition[]} the current assignment.
487624
*/
488625
assignment() {
489626
if (this.#state !== ConsumerState.CONNECTED) {

0 commit comments

Comments
 (0)