Skip to content

Commit 28d2253

Browse files
committed
Allow cache to dispense multiple messages at once
1 parent a728b4a commit 28d2253

File tree

4 files changed

+2483
-22
lines changed

4 files changed

+2483
-22
lines changed

LICENSE.heap-js

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
Code from https://github.com/ignlg/heap-js/ | Commit Hash: a3bb403 | dist/heap-js.es5.js
2+
3+
Code used in lib/kafkajs/_heap.js
4+
5+
----
6+
7+
8+
BSD 3-Clause License
9+
10+
Copyright (c) 2017, Ignacio Lago
11+
All rights reserved.
12+
13+
Redistribution and use in source and binary forms, with or without
14+
modification, are permitted provided that the following conditions are met:
15+
16+
* Redistributions of source code must retain the above copyright notice, this
17+
list of conditions and the following disclaimer.
18+
19+
* Redistributions in binary form must reproduce the above copyright notice,
20+
this list of conditions and the following disclaimer in the documentation
21+
and/or other materials provided with the distribution.
22+
23+
* Neither the name of the copyright holder nor the names of its
24+
contributors may be used to endorse or promote products derived from
25+
this software without specific prior written permission.
26+
27+
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
28+
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29+
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
30+
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
31+
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
32+
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
33+
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
34+
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
35+
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
36+
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

lib/kafkajs/_consumer.js

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ class Consumer {
353353

354354
if (Object.hasOwn(kjsConfig, 'rebalanceTimeout')) {
355355
/* In librdkafka, we use the max poll interval as the rebalance timeout as well. */
356-
rdKafkaConfig['max.poll.interval.ms'] = kjsConfig.rebalanceTimeout;
356+
rdKafkaConfig['max.poll.interval.ms'] = +kjsConfig.rebalanceTimeout;
357357
} else {
358358
rdKafkaConfig['max.poll.interval.ms'] = 300000; /* librdkafka default */
359359
}
@@ -627,15 +627,24 @@ class Consumer {
627627

628628
/**
629629
* Consumes a single message from the internal consumer.
630+
* @param {number} savedIndex - the index of the message in the cache to return.
630631
* @returns {Promise<import("../..").Message>} a promise that resolves to a single message.
631632
* @note this method caches messages as well, but returns only a single message.
632633
*/
633-
async #consumeSingleCached() {
634-
const msg = this.#messageCache.next();
634+
async #consumeSingleCached(savedIndex) {
635+
const msg = this.#messageCache.next(savedIndex);
635636
if (msg) {
636637
return msg;
637638
}
638639

640+
// TODO: Add this block for concurrency
641+
// if (!msg) {
642+
// // it's possible that we get msg = null, but that's because partitionConcurrency
643+
// // exceeds the number of partitions containing messages. So in this case,
644+
// // we should not call for new fetches, rather, try to focus on what we have left.
645+
// return null;
646+
// }
647+
639648
return new Promise((resolve, reject) => {
640649
this.#internalClient.consume(this.#messageCache.maxSize, (err, messages) => {
641650
if (err) {
@@ -712,7 +721,8 @@ class Consumer {
712721
}
713722

714723
const rdKafkaConfig = this.#config();
715-
this.#messageCache = new MessageCache(Math.floor(rdKafkaConfig['max.poll.interval.ms'] * 0.8));
724+
const maxPollInterval = rdKafkaConfig['max.poll.interval.ms'] ?? 300000;
725+
this.#messageCache = new MessageCache(Math.floor(maxPollInterval * 0.8), 1);
716726

717727
this.#state = ConsumerState.CONNECTING;
718728
this.#internalClient = new RdKafka.KafkaConsumer(rdKafkaConfig);
@@ -830,6 +840,7 @@ class Consumer {
830840
/* Internal polling loop.
831841
* It accepts the same config object that `run` accepts, but config.eachMessage must be set. */
832842
async #runInternalEachMessage(config) {
843+
let savedIdx = -1;
833844
while (this.#state === ConsumerState.CONNECTED) {
834845

835846
/* We need to acquire a lock here, because we need to ensure that we don't
@@ -840,26 +851,34 @@ class Consumer {
840851
/* Invalidate the message cache if needed */
841852
const locallyStale = this.#messageCache.popLocallyStale();
842853
if (this.#messageCache.isStale()) { /* global staleness */
854+
// TODO: await all concurrent promises for eachMessage here.
843855
await this.#clearCacheAndResetPositions();
844856
await this.#lock.release();
845857
continue;
846858
} else if (locallyStale.length !== 0) { /* local staleness */
859+
// TODO: is it correct to await some concurrent promises for eachMessage here?
860+
// to be safe we can do it, but I don't think we really need to do that for
861+
// correctness.
847862
await this.#clearCacheAndResetPositions(locallyStale);
848863
await this.#lock.release();
849864
continue;
850865
}
851866

852-
const m = await this.#consumeSingleCached().catch(e => {
867+
const m = await this.#consumeSingleCached(savedIdx).catch(e => {
853868
/* Since this error cannot be exposed to the user in the current situation, just log and retry.
854869
* This is due to restartOnFailure being set to always true. */
855870
if (this.#logger)
856871
this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${e} : ${e.stack}`);
857872
});
858873

859874
if (!m) {
875+
// await all concurrency related promises right here if this is null, if any such promise exists.
876+
// see note in consumeSingleCached
877+
savedIdx = -1;
860878
await this.#lock.release();
861879
continue;
862880
}
881+
savedIdx = m.index;
863882

864883
/* TODO: add partitionsConsumedConcurrently-based concurrency here.
865884
* If we maintain a map of topic partitions to promises, and a counter,
@@ -944,6 +963,7 @@ class Consumer {
944963
/* Internal polling loop.
945964
* It accepts the same config object that `run` accepts, but config.eachBatch must be set. */
946965
async #runInternalEachBatch(config) {
966+
let savedIdx = -1;
947967
while (this.#state === ConsumerState.CONNECTED) {
948968

949969
/* We need to acquire a lock here, because we need to ensure that we don't
@@ -963,17 +983,19 @@ class Consumer {
963983
continue;
964984
}
965985

966-
const m = await this.#consumeSingleCached().catch(e => {
986+
const m = await this.#consumeSingleCached(savedIdx).catch(e => {
967987
/* Since this error cannot be exposed to the user in the current situation, just log and retry.
968988
* This is due to restartOnFailure being set to always true. */
969989
if (this.#logger)
970990
this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${JSON.stringify(e)}`);
971991
});
972992

973993
if (!m) {
994+
savedIdx = -1;
974995
await this.#lock.release();
975996
continue;
976997
}
998+
savedIdx = m.index;
977999

9781000
/* TODO: add partitionsConsumedConcurrently-based concurrency here.
9791001
* If we maintain a map of topic partitions to promises, and a counter,

lib/kafkajs/_consumer_cache.js

Lines changed: 66 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ const { hrtime } = require('process');
22
const {
33
partitionKey,
44
} = require('./_common');
5+
const { Heap } = require('./_heap');
56

67
/**
78
* A PerPartitionMessageCache is a cache for messages for a single partition.
@@ -62,13 +63,13 @@ class PerPartitionMessageCache {
6263
*/
6364
class MessageCache {
6465

65-
constructor(expiryDurationMs) {
66+
constructor(expiryDurationMs, maxConcurrency) {
6667
/* Per partition cache list containing non-empty PPCs */
6768
this.ppcList = [];
6869
/* Map of topic+partition to PerPartitionMessageCache. */
6970
this.tpToPpc = new Map();
7071
/* Index of the current PPC in the ppcList. */
71-
this.currentPpc = 0;
72+
this.currentPpcTODO_remove_this = 0;
7273
/* Maximum size of the cache. (Capacity) */
7374
this.maxSize = 1;
7475
/* Number of times the size has been increased in a row, used for accounting for maxSize. */
@@ -81,6 +82,15 @@ class MessageCache {
8182
this.expiryDurationMs = expiryDurationMs;
8283
/* A list of caches which have been marked stale since the last call to popLocallyStale or addMessages. */
8384
this.locallyStaleCaches = [];
85+
/* Max allowed concurrency */
86+
this.maxConcurrency = maxConcurrency;
87+
/* Contains a list of indices of ppcList from which we are allowed to consume. */
88+
this.indices = new Heap();
89+
/* Largest ppc index we are allowed to consume from (inclusive). */
90+
this.maxIndicesIndex = 0;
91+
/* Contains a list of indices of ppcList from which we have sent a message returned through next, but
92+
* the user has not returned the index back to us via next(idx) */
93+
this.pendingIndices = new Set();
8494
}
8595

8696
/**
@@ -226,7 +236,6 @@ class MessageCache {
226236
* run out of messages. We need to clear them, else #add() will not add
227237
* them back to the ppcList since they're not empty. */
228238
this.ppcList.forEach(cache => cache.clear());
229-
this.currentPpc = 0;
230239
this.ppcList = [];
231240

232241
if (this.locallyStaleCaches.length !== 0 && this.locallyStaleCaches.some(tp => {
@@ -245,34 +254,72 @@ class MessageCache {
245254

246255
// TODO: add ppcList sort step.
247256
// Rationale: ideally it's best to consume in the ascending order of timestamps.
257+
258+
/* Reset the indices and pendingIndices because ppcList is being created newly. */
259+
this.indices.clear();
260+
if (this.pendingIndices.size > 0) console.error('addMessages: pendingIndices = ', this.pendingIndices, console.trace());
261+
this.pendingIndices.clear();
262+
this.maxIndicesIndex = Math.min(this.maxConcurrency, this.ppcList.length - 1);
263+
for (let i = 0; i <= this.maxIndicesIndex; i++) {
264+
this.indices.push(i);
265+
}
248266
}
249267

250268
/**
251269
* Returns the next element in the cache, or null if none exists.
252270
*
253271
* If the current PPC is exhausted, it moves to the next PPC.
254272
* If all PPCs are exhausted, it returns null.
273+
* @param {number} idx - after a consumer has consumed a message, it must return the index back to us via this parameter.
274+
* otherwise, no messages from that topic partition will be consumed.
275+
* @returns {Object} - the next message in the cache, or null if none exists. An `index` field is added to the message.
255276
* @warning Does not check for global staleness. That is left up to the user.
256277
* Skips locally stale messages.
278+
* The topicPartition, if provided, MUST be one such that the user has fetched
279+
* the message from the same topicPartition earlier.
280+
* @note Whenever making changes to this function, ensure that you benchmark perf.
257281
*/
258-
next() {
259-
if (this.currentPpc >= this.ppcList.length) {
260-
return null;
282+
next(idx = -1) {
283+
let index = idx;
284+
if (!this.pendingIndices.has(index)) {
285+
/* The user is behaving well by returning the index to us, but in the meanwhile, it's possible
286+
* that we ran out of messages and fetched a new batch. So we just discard what the user is
287+
* returning to us. */
288+
index = -1;
289+
} else {
290+
this.pendingIndices.delete(index);
261291
}
262292

263-
let next = null;
264-
while (next === null && this.currentPpc < this.ppcList.length) {
265-
if (this.ppcList[this.currentPpc].isStale()) {
266-
this.currentPpc++;
293+
if (index === -1) {
294+
if (this.indices.size() === 0)
295+
return null;
296+
index = this.indices.pop(); // index cannot be undefined here since indices.size > 0
297+
}
298+
299+
while (true) {
300+
const next = this.ppcList[index].next();
301+
if (this.ppcList[index].isStale() || next === null) {
302+
/* If the current PPC is stale or empty, then we move on to the next one.
303+
* It is equally valid to choose any PPC available within this.indices, or else
304+
* move on to the next PPC (maxIndicesIndex + 1) if available.
305+
* We prefer the second option a bit more since we don't have to do a heap operation. */
306+
const toAdd = this.maxIndicesIndex + 1;
307+
if (toAdd < this.ppcList.length) {
308+
this.maxIndicesIndex = toAdd;
309+
index = toAdd;
310+
} else if (!this.indices.isEmpty()) {
311+
index = this.indices.pop()
312+
} else {
313+
break; // nothing left.
314+
}
267315
continue;
268316
}
269317

270-
next = this.ppcList[this.currentPpc].next();
271-
if (next !== null)
272-
break;
273-
this.currentPpc++;
318+
this.pendingIndices.add(index);
319+
next.index = index;
320+
return next;
274321
}
275-
return next; // Caller is responsible for triggering fetch logic here if next == null.
322+
return null; // Caller is responsible for triggering fetch logic here if next == null.
276323
}
277324

278325
/**
@@ -286,12 +333,15 @@ class MessageCache {
286333
cache.clear();
287334
}
288335
this.ppcList = [];
289-
this.currentPpc = 0;
290336
this.maxSize = 1;
291337
this.increaseCount = 0;
292338
this.stale = false;
293339
this.cachedTime = hrtime();
294340
this.locallyStaleCaches = [];
341+
this.indices.clear();
342+
// if (this.pendingIndices.size > 0) console.log('clear: pendingIndices = ', this.pendingIndices, console.);
343+
this.pendingIndices.clear();
344+
this.currentIndex = 0;
295345
}
296346
}
297347

0 commit comments

Comments
 (0)