Skip to content

Commit 818ce7b

Browse files
committed
Add tests for consumer.pause() and consumer.resume()
1 parent 85b1341 commit 818ce7b

File tree

4 files changed

+759
-30
lines changed

4 files changed

+759
-30
lines changed

MIGRATION.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,11 +289,13 @@
289289
* The `partitionsConsumedConcurrently` property is not supported (YET).
290290
* The `eachBatch` method is not supported.
291291
* `commitOffsets` does not (YET) support sending metadata for topic partitions being committed.
292-
* `paused()` is not (YET) supported.
292+
* `paused()` is supported without any changes.
293293
* Custom partition assignors are not supported.
294294
* Changes to `seek`:
295-
* The restriction to call seek only after `run` is removed.
295+
* The restriction to call seek only after `run` is removed. It can be called any time.
296296
* Rather than the `autoCommit` property of `run` deciding if the offset is committed, the librdkafka property `enable.auto.commit` of the consumer config is used.
297+
* `pause` and `resume` MUST be called after the consumer group is joined. In practice, this means they can be called whenever `consumer.assignment()` has a non-zero size, or within the `eachMessage`
298+
callback.
297299

298300
### Admin Client
299301

lib/kafkajs/_consumer.js

Lines changed: 86 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ class Consumer {
6767
*/
6868
#pendingSeeks = new Map();
6969

70+
/**
71+
* Stores the list of paused partitions, as a set of JSON.stringify'd TopicPartition objects.
72+
* @type {Set<string>}
73+
*/
74+
#pausedPartitions = new Set();
75+
7076
/**
7177
* @constructor
7278
* @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig
@@ -300,7 +306,7 @@ class Consumer {
300306
headers
301307
},
302308
heartbeat: async () => { /* no op */ },
303-
pause: () => this.pause([{ topic: message.topic, partitions: [message.partition] }]),
309+
pause: this.pause.bind(this, [{ topic: message.topic, partitions: [message.partition] }]),
304310
};
305311
}
306312

@@ -443,33 +449,56 @@ class Consumer {
443449

444450
while (this.#state === ConsumerState.CONNECTED) {
445451
const m = await this.#consumeSingle();
446-
if (m) {
447-
/* TODO: add partitionsConsumedConcurrently-based concurrency here.
448-
* If we maintain a map of topic partitions to promises, and a counter,
449-
* we can probably achieve it with the correct guarantees of ordering
450-
* though to maximize performance, we need to consume only from partitions for which
451-
* an eachMessage call is not already going.
452-
* It's risky to consume, and then store the message in something like an
453-
* array/list until it can be processed, because librdkafka marks it as
454-
* 'stored'... but anyway - we can implement something like this.
455-
*/
456-
if (this.#checkPendingSeeks) {
457-
const invalidateMessage = await this.#seekInternal({ topic: m.topic, partition: m.partition });
458-
if (invalidateMessage) {
459-
/* Don't pass this message on to the user if this topic partition was seeked to. */
460-
continue;
461-
}
452+
if (!m) {
453+
continue;
454+
}
455+
456+
/* TODO: add partitionsConsumedConcurrently-based concurrency here.
457+
* If we maintain a map of topic partitions to promises, and a counter,
458+
* we can probably achieve it with the correct guarantees of ordering
459+
* though to maximize performance, we need to consume only from partitions for which
460+
* an eachMessage call is not already going.
461+
* It's risky to consume, and then store the message in something like an
462+
* array/list until it can be processed, because librdkafka marks it as
463+
* 'stored'... but anyway - we can implement something like this.
464+
*/
465+
466+
/* Make pending seeks 'concrete'. */
467+
if (this.#checkPendingSeeks) {
468+
const invalidateMessage = await this.#seekInternal({ topic: m.topic, partition: m.partition });
469+
if (invalidateMessage) {
470+
/* Don't pass this message on to the user if this topic partition was seeked to. */
471+
continue;
462472
}
473+
}
463474

475+
try {
464476
await config.eachMessage(
465477
this.#createPayload(m)
466478
)
467-
/* TODO: another check we need to do here is to see how kafkaJS is handling
468-
* commits. Are they committing after a message is _processed_?
469-
* In that case we need to turn off librdkafka's auto-commit, and commit
470-
* inside this function.
471-
*/
479+
} catch (e) {
480+
/* It's not only possible, but expected that an error will be thrown by eachMessage.
481+
* This is especially true since the pattern of pause() followed by throwing an error
482+
* is encouraged. To meet the API contract, we seek one offset backward at this point (which
483+
* means seeking to the message offset). */
484+
this.seek({
485+
topic: m.topic,
486+
partition: m.partition,
487+
offset: m.offset,
488+
});
472489
}
490+
491+
/* Force an immediate seek here. It's possible that there are no more messages to be passed to the user,
492+
* but the user seeked in the call to eachMessage, or else we encountered the error catch block.
493+
* In that case, the results of that seek will never be reflected unless we do this. */
494+
if (this.#checkPendingSeeks)
495+
await this.#seekInternal();
496+
497+
/* TODO: another check we need to do here is to see how kafkaJS is handling
498+
* commits. Are they committing after a message is _processed_?
499+
* In that case we need to turn off librdkafka's auto-commit, and commit
500+
* inside this function.
501+
*/
473502
}
474503
}
475504

@@ -646,13 +675,18 @@ class Consumer {
646675
* all partitions for the given topic. If topic partition(s) are already paused
647676
* this method has no effect.
648677
* @param {{topic: string, partitions?: number[]}[]} topics
678+
* @returns {Function} a function that can be called to resume the given topic partitions.
649679
*/
650680
pause(topics) {
651681
if (this.#state !== ConsumerState.CONNECTED) {
652682
throw new error.KafkaJSError('Pause can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
653683
}
654684

655685
for (let topic of topics) {
686+
if (typeof topic.topic !== 'string') {
687+
throw new error.KafkaJSError('Topic must be a string.', { code: error.ErrorCodes.ERR__INVALID_ARG });
688+
}
689+
656690
if (!topic.partitions) {
657691
topic.partitions = this.#getAllAssignedPartition(topic.topic);
658692
}
@@ -662,12 +696,31 @@ class Consumer {
662696
if (topics.length === 0) {
663697
return;
664698
}
665-
666699
this.#internalClient.pause(topics);
700+
701+
topics.map(JSON.stringify).forEach(topicPartition => this.#pausedPartitions.add(topicPartition));
702+
703+
return () => this.resume(topics);
667704
}
668705

706+
/**
707+
* Returns the list of paused topic partitions.
708+
* @returns {{topic: string, partitions: number[]}[]} a list of paused topic partitions.
709+
*/
669710
paused() {
670-
notImplemented();
711+
const topicToPartitions = Array
712+
.from(this.#pausedPartitions.values())
713+
.map(JSON.parse)
714+
.reduce(
715+
(acc, { topic, partition }) => {
716+
if (!acc[topic]) {
717+
acc[topic] = [];
718+
}
719+
acc[topic].push(partition);
720+
return acc;
721+
},
722+
{});
723+
return Array.from(Object.entries(topicToPartitions), ([topic, partitions]) => ({ topic, partitions }));
671724
}
672725

673726

@@ -683,13 +736,22 @@ class Consumer {
683736
}
684737

685738
for (let topic of topics) {
739+
if (typeof topic.topic !== 'string') {
740+
throw new error.KafkaJSError('Topic must be a string.', { code: error.ErrorCodes.ERR__INVALID_ARG });
741+
}
742+
686743
if (!topic.partitions) {
687744
topic.partitions = this.#getAllAssignedPartition(topic.topic);
688745
}
689746
}
690747

691748
topics = this.#flattenTopicPartitions(topics);
749+
if (topics.length === 0) {
750+
return;
751+
}
692752
this.#internalClient.resume(topics);
753+
754+
topics.map(JSON.stringify).forEach(topicPartition => this.#pausedPartitions.delete(topicPartition));
693755
}
694756

695757
on(/* eventName, listener */) {

0 commit comments

Comments
 (0)