Commit 2f4bd76

Add naive batching (without resolution handling)
1 parent 9a2f9c4 commit 2f4bd76

File tree: 1 file changed (+111 -50 lines)

lib/kafkajs/_consumer.js

Lines changed: 111 additions & 50 deletions
@@ -617,52 +617,60 @@ class Consumer {
   }
 
   /**
-   * Converts a message returned by node-rdkafka into a message that can be used by the eachBatch callback.
-   * @param {import("../..").Message} message
+   * Converts a list of messages returned by node-rdkafka into a message that can be used by the eachBatch callback.
+   * @param {import("../..").Message[]} messages - must not be empty. Must contain messages from the same topic and partition.
    * @returns {import("../../types/kafkajs").EachBatchPayload}
-   * @note Unlike the KafkaJS consumer, a batch here is for API compatibility only. It is always a single message.
    */
-  #createBatchPayload(message) {
-    let key = message.key;
-    if (typeof key === 'string') {
-      key = Buffer.from(key);
-    }
+  #createBatchPayload(messages) {
+    const topic = messages[0].topic;
+    const partition = messages[0].partition;
+
+    const messagesConverted = [];
+    for (let i = 0; i < messages.length; i++) {
+      const message = messages[i];
+      let key = message.key;
+      if (typeof key === 'string') {
+        key = Buffer.from(key);
+      }
 
-    let timestamp = message.timestamp ? String(message.timestamp) : '';
+      let timestamp = message.timestamp ? String(message.timestamp) : '';
 
-    let headers;
-    if (message.headers) {
-      headers = {}
-      for (const [key, value] of Object.entries(message.headers)) {
-        if (!Object.hasOwn(headers, key)) {
-          headers[key] = value;
-        } else if (headers[key].constructor === Array) {
-          headers[key].push(value);
-        } else {
-          headers[key] = [headers[key], value];
+      let headers;
+      if (message.headers) {
+        headers = {}
+        for (const [key, value] of Object.entries(message.headers)) {
+          if (!Object.hasOwn(headers, key)) {
+            headers[key] = value;
+          } else if (headers[key].constructor === Array) {
+            headers[key].push(value);
+          } else {
+            headers[key] = [headers[key], value];
+          }
         }
       }
-    }
 
-    const messageConverted = {
-      key,
-      value: message.value,
-      timestamp,
-      attributes: 0,
-      offset: String(message.offset),
-      size: message.size,
-      leaderEpoch: message.leaderEpoch,
-      headers
-    };
+      const messageConverted = {
+        key,
+        value: message.value,
+        timestamp,
+        attributes: 0,
+        offset: String(message.offset),
+        size: message.size,
+        leaderEpoch: message.leaderEpoch,
+        headers
+      };
+
+      messagesConverted.push(messageConverted);
+    }
 
     const batch = {
-      topic: message.topic,
-      partition: message.partition,
+      topic,
+      partition,
       highWatermark: '-1001', // Invalid - we don't fetch it
-      messages: [messageConverted],
+      messages: messagesConverted,
       isEmpty: () => false,
-      firstOffset: () => messageConverted.offset,
-      lastOffset: () => messageConverted.offset,
+      firstOffset: () => messagesConverted[0].offset,
+      lastOffset: () => messagesConverted[messagesConverted.length - 1].offset,
       offsetLag: () => notImplemented(),
       offsetLagLow: () => notImplemented(),
     };
@@ -672,7 +680,7 @@ class Consumer {
       _messageResolved: false,
       resolveOffset: () => { returnPayload._messageResolved = true; },
       heartbeat: async () => { /* no op */ },
-      pause: this.pause.bind(this, [{ topic: message.topic, partitions: [message.partition] }]),
+      pause: this.pause.bind(this, [{ topic, partitions: [partition] }]),
       commitOffsetsIfNecessary: async () => { /* no op */ },
       uncommittedOffsets: () => notImplemented(),
       isRunning: () => this.#running,
@@ -685,7 +693,7 @@ class Consumer {
   /**
    * Consumes a single message from the internal consumer.
    * @param {number} savedIndex - the index of the message in the cache to return.
-   * @returns {Promise<import("../..").Message>} a promise that resolves to a single message.
+   * @returns {Promise<import("../..").Message | null>} a promise that resolves to a single message or null.
    * @note this method caches messages as well, but returns only a single message.
    */
   async #consumeSingleCached(savedIndex) {
@@ -726,6 +734,52 @@ class Consumer {
     });
   }
 
+  /**
+   * Consumes up to `size` messages from the internal consumer.
+   * @param {number} savedIndex - the index of the message in the cache to return.
+   * @param {number} size - the number of messages to fetch.
+   * @returns {Promise<import("../..").Message[] | null>} a promise that resolves to a list of messages or null.
+   * @note this method caches messages as well.
+   * @sa #consumeSingleCached
+   */
+  async #consumeCachedN(savedIndex, size) {
+    const msgs = this.#messageCache.nextN(savedIndex, size);
+    if (msgs) {
+      return msgs;
+    }
+
+    /* It's possible that we get msgs = null, but that's because partitionConcurrency
+     * exceeds the number of partitions containing messages. So in this case,
+     * we should not call for new fetches, rather, try to focus on what we have left.
+     */
+    if (!msgs && this.#messageCache.pendingSize() !== 0) {
+      return null;
+    }
+
+    if (this.#fetchInProgress) {
+      return null;
+    }
+
+    this.#fetchInProgress = true;
+    return new Promise((resolve, reject) => {
+      this.#internalClient.consume(this.#messageCache.maxSize, (err, messages) => {
+        this.#fetchInProgress = false;
+        if (err) {
+          reject(createKafkaJsErrorFromLibRdKafkaError(err));
+          return;
+        }
+        this.#messageCache.addMessages(messages);
+        const msgsList = this.#messageCache.nextN(-1, size);
+        if (messages.length === this.#messageCache.maxSize) {
+          this.#messageCache.increaseMaxSize();
+        } else {
+          this.#messageCache.decreaseMaxSize(messages.length);
+        }
+        resolve(msgsList);
+      });
+    });
+  }
+
   /**
    * Consumes n messages from the internal consumer.
    * @returns {Promise<import("../..").Message[]>} a promise that resolves to a list of messages.
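
#consumeCachedN relies on a per-consumer message cache whose implementation is outside this diff. The sketch below is a simplified, hypothetical model of the contract the method assumes: nextN returns up to `size` cached messages or null, addMessages stores a fetched batch, pendingSize reports what is left, and maxSize grows when a fetch comes back full and shrinks otherwise. The real MessageCache also does per-partition bookkeeping and staleness tracking, which is omitted here.

    // Hypothetical, simplified model of the cache contract (illustration only;
    // names and growth policy are assumptions, not the repo's MessageCache).
    class SketchMessageCache {
      constructor() {
        this.maxSize = 1;   // how many messages to request from librdkafka per fetch
        this.messages = []; // flat FIFO of cached messages
      }

      addMessages(messages) { this.messages.push(...messages); }

      /* Return up to `size` messages, or null if nothing is cached. */
      nextN(_savedIndex, size) {
        if (this.messages.length === 0) return null;
        return this.messages.splice(0, size);
      }

      pendingSize() { return this.messages.length; }

      increaseMaxSize() { this.maxSize = Math.min(this.maxSize * 2, 1024); }

      decreaseMaxSize(lastFetchSize) { this.maxSize = Math.max(1, lastFetchSize); }
    }
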
@@ -964,15 +1018,15 @@ class Consumer {
   }
 
   /**
-   * Processes a batch message (a single message as of now).
+   * Processes a batch of messages.
    *
-   * @param m Message as obtained from #consumeSingleCached.
+   * @param ms Messages as obtained from #consumeCachedN.
    * @param config Config as passed to run().
    * @returns {Promise<number>} the cache index of the message that was processed.
    */
-  async #batchProcessor(m, config) {
+  async #batchProcessor(ms, config) {
     let eachMessageProcessed = false;
-    const payload = this.#createBatchPayload(m);
+    const payload = this.#createBatchPayload(ms);
     try {
       await config.eachBatch(payload);
       if (config.eachBatchAutoResolve) {
@@ -1005,23 +1059,25 @@ class Consumer {
     }
 
     /* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
+    /* TODO: currently we're seeking to just the first offset. Fix this to take care of messages we are resolving. */
     if (!eachMessageProcessed) {
       await this.seek({
-        topic: m.topic,
-        partition: m.partition,
-        offset: m.offset,
+        topic: ms[0].topic,
+        partition: ms[0].partition,
+        offset: ms[0].offset,
       });
     }
 
     /* Store the offsets we need to store, or at least record them for cache invalidation reasons. */
+    /* TODO: currently we just store the last offset of the batch. Fix it to store the last resolved one + 1. */
     if (eachMessageProcessed) {
       try {
         if (!this.#userManagedStores) {
           this.#internalClient.offsetsStore([{
-            topic: m.topic, partition: m.partition, offset: Number(m.offset) + 1, leaderEpoch: m.leaderEpoch
+            topic: ms[ms.length - 1].topic, partition: ms[ms.length - 1].partition, offset: Number(ms[ms.length - 1].offset) + 1, leaderEpoch: ms[ms.length - 1].leaderEpoch
           }]);
         }
-        this.#lastConsumedOffsets.set(partitionKey(m), Number(m.offset) + 1);
+        this.#lastConsumedOffsets.set(partitionKey(ms[ms.length - 1]), Number(ms[ms.length - 1].offset) + 1);
       } catch (e) {
         /* Not much we can do, except log the error. */
         if (this.#logger)
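
The TODO above calls for storing the last offset the user actually resolved, plus one, instead of the last offset of the batch. A hedged sketch of that calculation, using a hypothetical helper that is not part of this commit:

    // Hypothetical helper illustrating the fix the TODO describes: store
    // (last resolved offset + 1), or nothing if no message was resolved.
    function offsetToStore(messages, lastResolvedOffset /* string or null */) {
      if (lastResolvedOffset === null)
        return null; // nothing resolved: seek back instead of storing
      return {
        topic: messages[0].topic,
        partition: messages[0].partition,
        offset: Number(lastResolvedOffset) + 1,
      };
    }
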
@@ -1035,7 +1091,7 @@ class Consumer {
     if (this.#checkPendingSeeks)
       await this.#seekInternal();
 
-    return m.index;
+    return ms.index;
   }
 
   /**
@@ -1050,7 +1106,7 @@ class Consumer {
    *
    * Worker termination acts as a async barrier.
    */
-  async #worker(config, perMessageProcessor, id) {
+  async #worker(config, perMessageProcessor, fetcher, id) {
     let nextIdx = -1;
     while (!this.#workerTerminationScheduled) {
       /* Invalidate the message cache if needed */
@@ -1066,7 +1122,7 @@ class Consumer {
         continue;
       }
 
-      const m = await this.#consumeSingleCached(nextIdx).catch(e => {
+      const m = await fetcher(nextIdx).catch(e => {
         /* Since this error cannot be exposed to the user in the current situation, just log and retry.
          * This is due to restartOnFailure being set to always true. */
         if (this.#logger)
@@ -1097,13 +1153,18 @@ class Consumer {
   async #runInternal(config) {
     this.#concurrency = config.partitionsConsumedConcurrently;
     const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor;
+    /* TODO: make this dynamic, based on max batch size / size of last message seen. */
+    const maxBatchSize = 30;
+    const fetcher = config.eachMessage
+      ? (savedIdx) => this.#consumeSingleCached(savedIdx)
+      : (savedIdx) => this.#consumeCachedN(savedIdx, maxBatchSize);
     this.#workers = [];
     while (!(await acquireOrLog(this.#lock, this.#logger)));
 
     while (!this.#disconnectStarted) {
       this.#workerTerminationScheduled = false;
       const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount));
-      this.#workers = Array(workersToSpawn).fill().map((_, i) => this.#worker(config, perMessageProcessor.bind(this), i));
+      this.#workers = Array(workersToSpawn).fill().map((_, i) => this.#worker(config, perMessageProcessor.bind(this), fetcher.bind(this), i));
       await Promise.all(this.#workers);
 
       /* One of the possible reasons for the workers to end is that the cache is globally stale.
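
Putting the pieces together: the worker loop now takes a fetcher so that the eachMessage and eachBatch paths share the same scheduling code. A simplified model of that composition (illustration only, not the actual #worker, which also handles cache invalidation, pending seeks and pauses):

    // Simplified model of how fetcher and processor compose in a worker.
    async function workerLoop(fetcher, processor, shouldStop) {
      let nextIdx = -1;
      while (!shouldStop()) {
        const unit = await fetcher(nextIdx); // a Message, a Message[], or null
        if (!unit) {
          nextIdx = -1; // nothing available right now; start from a fresh fetch
          continue;
        }
        nextIdx = await processor(unit); // processor returns the cache index to reuse
      }
    }
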
