
Commit 9d3130f

Add per-partition cache with global expiry

1 parent 16d9547 commit 9d3130f

3 files changed: +194 -53 lines changed

lib/kafkajs/_common.js

Lines changed: 10 additions & 0 deletions

@@ -689,6 +689,15 @@ async function acquireOrLog(lock, logger) {
   return false;
 }
 
+/**
+ * Creates a key for maps from a topicPartition object.
+ * @param {{topic: string, partition: number}} topicPartition Any object which can be treated as a topic partition.
+ * @returns {string} The created key.
+ */
+function partitionKey(topicPartition) {
+  return topicPartition.topic + '|' + topicPartition.partition;
+}
+
 module.exports = {
   kafkaJSToRdKafkaConfig,
   topicPartitionOffsetToRdKafka,
@@ -707,4 +716,5 @@ module.exports = {
   checkIfKafkaJsKeysPresent,
   Lock,
   acquireOrLog,
+  partitionKey,
 };
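The key is just the topic and partition joined with a pipe, so any object carrying `topic` and `partition` fields (an assignment entry, a topic-partition-offset, or a full message) maps to the same key. Since legal Kafka topic names are restricted to alphanumerics, `.`, `_`, and `-`, the pipe separator cannot collide with a topic name. A minimal illustration (the sample objects and require path are hypothetical):

```js
const { partitionKey } = require('./lib/kafkajs/_common');

partitionKey({ topic: 'orders', partition: 3 });               // => 'orders|3'
partitionKey({ topic: 'orders', partition: 3, offset: '42' }); // => 'orders|3' (extra fields are ignored)

// Distinct objects for the same topic partition hit the same Map entry:
const offsets = new Map();
offsets.set(partitionKey({ topic: 'orders', partition: 3 }), 42);
offsets.get(partitionKey({ topic: 'orders', partition: 3 })); // => 42
```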

lib/kafkajs/_consumer.js

Lines changed: 176 additions & 49 deletions

@@ -15,7 +15,8 @@ const {
   checkAllowedKeys,
   logLevel,
   Lock,
-  acquireOrLog
+  acquireOrLog,
+  partitionKey,
 } = require('./_common');
 const { Buffer } = require('buffer');
 const { hrtime } = require('process');
@@ -36,47 +37,129 @@ const PartitionAssigners = Object.freeze({
 
 
 /**
- * MessageCache represents a cache of messages that have been consumed,
- * but not yet passed to the user.
- * It has a dynamic capacity, increased or decreased based on requirement.
+ * A PerPartitionMessageCache is a cache for messages for a single partition.
  */
-class MessageCache {
+class PerPartitionMessageCache {
   /* The cache is a list of messages. */
   cache = [];
-  /* The maximum size of the cache. Set to 1 initially. */
-  maxSize = 1;
   /* Index of next element to be fetched in the cache. */
-  currentIndex = this.maxSize;
+  currentIndex = 0;
   /* Whether the cache is stale. */
   stale = false;
-  /* Number of times the cache has been requested to be increased in size. */
-  increaseCount = 0;
-  /* Last cached time */
-  cachedTime = hrtime();
-  /* Expiry duration for this cache */
-  expiryDurationMs = 500;
 
-  constructor(expiryDurationMs) {
-    this.expiryDurationMs = expiryDurationMs;
+  /**
+   * Returns the number of total elements in the cache.
+   */
+  size() {
+    return this.cache.length;
   }
 
   /**
    * Clears the cache.
    */
   clear() {
     this.cache = [];
-    this.maxSize = 1;
-    this.currentIndex = this.maxSize;
+    this.currentIndex = 0;
     this.stale = false;
+  }
+
+  /**
+   * Adds a message to the cache.
+   */
+  add(message) {
+    this.cache.push(message);
+  }
+
+  /**
+   * Returns whether the cache is stale.
+   */
+  isStale() {
+    return this.stale;
+  }
+
+  /**
+   * @returns The next element in the cache or null if none exists.
+   * @warning Does not check for staleness.
+   */
+  next() {
+    return this.currentIndex < this.cache.length ? this.cache[this.currentIndex++] : null;
+  }
+}
+
+
+/**
+ * MessageCache defines a dynamically sized cache for messages.
+ * Internally, it uses PerPartitionMessageCache to store messages for each partition.
+ * The capacity is increased or decreased according to whether the last fetch of messages
+ * was less than the current capacity or saturated the current capacity.
+ */
+class MessageCache {
+
+  constructor(expiryDurationMs) {
+    /* Per partition cache list containing non-empty PPCs */
+    this.ppcList = [];
+    /* Map of topic+partition to PerPartitionMessageCache. */
+    this.tpToPpc = new Map();
+    /* Index of the current PPC in the ppcList. */
+    this.currentPpc = 0;
+    /* Maximum size of the cache. (Capacity) */
+    this.maxSize = 1;
+    /* Number of times the size has been increased in a row, used for accounting for maxSize. */
     this.increaseCount = 0;
+    /* Last cached time */
     this.cachedTime = hrtime();
+    /* Whether the cache is stale. */
+    this.stale = false;
+    /* Expiry duration for this cache */
+    this.expiryDurationMs = expiryDurationMs;
+  }
+
+  addTopicPartitions(topicPartitions) {
+    if (this.ppcList.length !== 0) {
+      throw new Error('Cannot add topic partitions to a non-empty cache.');
+    }
+    for (const topicPartition of topicPartitions) {
+      const key = partitionKey(topicPartition);
+      this.tpToPpc.set(key, new PerPartitionMessageCache());
+    }
+  }
+
+  removeTopicPartitions(topicPartitions = null) {
+    if (this.ppcList.length !== 0) {
+      throw new Error('Cannot remove topic partitions from a non-empty cache.');
+    }
+
+    if (topicPartitions === null) {
+      this.tpToPpc.clear();
+      return;
+    }
+    for (const topicPartition of topicPartitions) {
+      const key = partitionKey(topicPartition);
+      this.tpToPpc.delete(key);
+    }
+  }
+
+  /**
+   * Returns whether the cache is stale.
+   */
+  isStale() {
+    if (this.stale)
+      return true;
+
+    const cacheTime = hrtime(this.cachedTime);
+    const cacheTimeMs = Math.floor(cacheTime[0] * 1000 + cacheTime[1] / 1000000);
+    this.stale = cacheTimeMs > this.expiryDurationMs;
+
+    // TODO: ideally, local staleness should not lead to global staleness.
+    // But for now, make it so because seeking to stored offset on local staleness is tricky.
+    this.stale = this.stale || this.ppcList.some(cache => cache.isStale());
+    return this.stale;
   }
 
   /**
    * Request a size increase.
    * It increases the size by 2x, but only if the size is less than 1024,
-   * only if the size has been requested to be increased twice in a row.
-   * @returns
+   * and only if the size has been requested to be increased twice in a row.
    */
   increaseMaxSize() {
     if (this.maxSize === 1024)
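The new `isStale()` combines two conditions: a global timer (the cache expires `expiryDurationMs` after the last refill) and the per-partition staleness flags. The timer arithmetic uses `process.hrtime()` deltas; a standalone sketch of just that conversion, with illustrative numbers:

```js
const { hrtime } = require('process');

const cachedTime = hrtime();      // recorded when the cache was (re)filled

// Later, on an isStale() check:
const delta = hrtime(cachedTime); // elapsed [seconds, nanoseconds] since cachedTime
const cacheTimeMs = Math.floor(delta[0] * 1000 + delta[1] / 1000000);

// e.g. delta = [1, 250000000] (1.25 s elapsed) gives cacheTimeMs = 1250,
// which exceeds an expiryDurationMs of 500, so the cache is stale.
const stale = cacheTimeMs > 500;
```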
@@ -101,33 +184,67 @@ class MessageCache {
   }
 
   /**
-   * Sets cache and resets all the indices and timer.
-   * @param {*} messages
+   * Add a single message to the cache.
    */
-  setCache(messages) {
-    this.cache = messages;
-    this.currentIndex = 1;
-    this.cachedTime = hrtime();
+  #add(message) {
+    const key = partitionKey(message);
+    const cache = this.tpToPpc.get(key);
+    cache.add(message);
+    if (cache.size() === 1) {
+      this.ppcList.push(cache);
+    }
   }
 
   /**
-   * @returns The next element in the cache or null if none exists.
-   * @warning Does not check for staleness.
+   * Adds many messages into the cache, partitioning them as per their toppar.
    */
-  next() {
-    return this.currentIndex < this.cache.length ? this.cache[this.currentIndex++] : null;
+  addMessages(messages) {
+    this.stale = false;
+    this.cachedTime = hrtime();
+    this.currentPpc = 0;
+    for (const message of messages)
+      this.#add(message);
+
+    // TODO: add ppcList sort step.
+    // Rationale: ideally it's best to consume in the ascending order of timestamps.
   }
 
-  /* Whether the cache is stale. */
-  isStale() {
-    if (this.stale)
-      return true;
+  /**
+   * Returns the next element in the cache, or null if none exists.
+   *
+   * If the current PPC is exhausted, it moves to the next PPC.
+   * If all PPCs are exhausted, it returns null.
+   * @warning Does not check for staleness. That is left up to the user.
+   */
+  next() {
+    if (this.currentPpc >= this.ppcList.length) {
+      return null;
+    }
 
-    const cacheTime = hrtime(this.cachedTime);
-    const cacheTimeMs = Math.floor(cacheTime[0] * 1000 + cacheTime[1] / 1000000);
-    return cacheTimeMs > this.expiryDurationMs;
+    let next = null;
+    while (next === null && this.currentPpc < this.ppcList.length) {
+      next = this.ppcList[this.currentPpc].next();
+      if (next !== null)
+        break;
+      this.currentPpc++;
+    }
+    return next; // Caller is responsible for triggering fetch logic here if next == null.
   }
 
+  /**
+   * Clears cache completely.
+   */
+  clear() {
+    for (const cache of this.ppcList) {
+      cache.clear();
+    }
+    this.ppcList = [];
+    this.currentPpc = 0;
+    this.maxSize = 1;
+    this.increaseCount = 0;
+    this.stale = false;
+    this.cachedTime = hrtime();
+  }
 }
 
 class Consumer {
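Taken together, the intended lifecycle is: register the assigned partitions while the cache is empty, bulk-load each fetched batch, then drain messages partition by partition until `next()` returns null. A minimal sketch using the methods above (topic names, offsets, and the expiry value are made up):

```js
const cache = new MessageCache(500 /* expiryDurationMs; illustrative value */);

// Registering partitions is only legal while ppcList is empty.
cache.addTopicPartitions([
  { topic: 'orders', partition: 0 },
  { topic: 'orders', partition: 1 },
]);

// A fetched batch may interleave partitions; #add() routes each message
// to its PPC, enrolling the PPC in ppcList on its first message.
cache.addMessages([
  { topic: 'orders', partition: 0, offset: '10' },
  { topic: 'orders', partition: 1, offset: '7' },
  { topic: 'orders', partition: 0, offset: '11' },
]);

// next() drains one partition before moving on: offsets '10' and '11'
// from partition 0, then '7' from partition 1, then null.
let msg;
while ((msg = cache.next()) !== null)
  console.log(msg.partition, msg.offset);
```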
@@ -254,7 +371,7 @@ class Consumer {
     const assignment = this.assignment();
     const seekPromises = [];
     for (const topicPartitionOffset of assignment) {
-      const key = `${topicPartitionOffset.topic}|${topicPartitionOffset.partition}`;
+      const key = partitionKey(topicPartitionOffset);
       if (!this.#lastConsumedOffsets.has(key))
         continue;
 
@@ -376,11 +493,19 @@
           }
         }
 
+        // Populate per-partition caches.
+        // For cooperative sticky, just add the newly received partitions.
+        // If it's eager, it's already empty, so we can add all the partitions.
+        this.#messageCache.addTopicPartitions(assignment);
+
       } else {
-        if (this.#internalClient.rebalanceProtocol() === "EAGER")
+        if (this.#internalClient.rebalanceProtocol() === "EAGER") {
           this.#internalClient.unassign();
-        else
+          this.#messageCache.removeTopicPartitions();
+        } else {
           this.#internalClient.incrementalUnassign(assignment);
+          this.#messageCache.removeTopicPartitions(assignment);
+        }
       }
     } catch (e) {
       // Ignore exceptions if we are not connected
@@ -724,8 +849,8 @@ class Consumer {
           reject(createKafkaJsErrorFromLibRdKafkaError(err));
           return;
         }
-        this.#messageCache.setCache(messages);
-        const message = messages[0];
+        this.#messageCache.addMessages(messages);
+        const message = this.#messageCache.next();
         if (messages.length === this.#messageCache.maxSize) {
           this.#messageCache.increaseMaxSize();
         } else {
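This hunk is where the capacity heuristic gets its signal: a fetch that returns exactly `maxSize` messages was saturated, so there is likely more data buffered and the next fetch should ask for more; a smaller batch suggests shrinking. A rough, hypothetical driver loop showing the shape of that feedback (not the consumer's actual internals; `fetchBatch` and `handleMessage` stand in for the librdkafka consume call and the user callback):

```js
async function pump(cache, fetchBatch, handleMessage) {
  for (;;) {
    // Ask for at most `maxSize` messages, mirroring the real consume call.
    const messages = await fetchBatch(cache.maxSize);
    cache.addMessages(messages);

    // Saturated fetch: grow the capacity (2x, capped at 1024, and only
    // after two consecutive saturated fetches, per increaseMaxSize()).
    if (messages.length === cache.maxSize)
      cache.increaseMaxSize();

    // Drain the cache until it is empty or expires; the real consumer
    // also seeks back to the last consumed offset on staleness so
    // undelivered messages are refetched.
    let m;
    while ((m = cache.next()) !== null && !cache.isStale())
      await handleMessage(m);
  }
}
```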
@@ -930,7 +1055,7 @@
       /* Since this error cannot be exposed to the user in the current situation, just log and retry.
        * This is due to restartOnFailure being set to always true. */
       if (this.#logger)
-        this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${JSON.stringify(e)}`);
+        this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${e} : ${e.stack}`);
     });
 
     if (!m) {
@@ -973,7 +1098,7 @@
        *
        * So - do nothing but a debug log, but at this point eachMessageProcessed is false.
        */
-      this.#logger.debug(`Consumer encountered error while processing message. Error details: ${JSON.stringify(e)}. The same message may be reprocessed.`);
+      this.#logger.debug(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
     }
 
     /* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
@@ -993,7 +1118,7 @@
           topic: m.topic, partition: m.partition, offset: Number(m.offset) + 1, leaderEpoch: m.leaderEpoch
         }]);
       }
-      this.#lastConsumedOffsets.set(`${m.topic}|${m.partition}`, Number(m.offset) + 1);
+      this.#lastConsumedOffsets.set(partitionKey(m), Number(m.offset) + 1);
     } catch (e) {
       /* Not much we can do, except log the error. */
       if (this.#logger)
@@ -1089,7 +1214,7 @@
        * So - do nothing but a debug log, but at this point eachMessageProcessed needs to be false unless
        * the user has explicitly marked it as true.
        */
-      this.#logger.debug(`Consumer encountered error while processing message. Error details: ${JSON.stringify(e)}. The same message may be reprocessed.`);
+      this.#logger.debug(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
 
     /* The value of eachBatchAutoResolve is not important. The only place where a message is marked processed
      * despite an error is if the user says so, and the user can use resolveOffsets for both the possible
@@ -1115,7 +1240,7 @@
           topic: m.topic, partition: m.partition, offset: Number(m.offset) + 1, leaderEpoch: m.leaderEpoch
         }]);
       }
-      this.#lastConsumedOffsets.set(`${m.topic}|${m.partition}`, Number(m.offset) + 1);
+      this.#lastConsumedOffsets.set(partitionKey(m), Number(m.offset) + 1);
     } catch (e) {
       /* Not much we can do, except log the error. */
       if (this.#logger)
@@ -1273,7 +1398,7 @@
 
     for (let i = 0; i < assignment.length; i++) {
       const topicPartition = assignment[i];
-      const key = `${topicPartition.topic}|${topicPartition.partition}`;
+      const key = partitionKey(topicPartition);
       if (!this.#pendingSeeks.has(key))
         continue;
 
@@ -1303,7 +1428,7 @@
     let invalidateMessage = false;
 
     for (const topicPartition of assignment) {
-      const key = `${topicPartition.topic}|${topicPartition.partition}`;
+      const key = partitionKey(topicPartition);
       if (!this.#pendingSeeks.has(key))
         continue;
 
@@ -1372,7 +1497,7 @@
     }
 
     this.#checkPendingSeeks = true;
-    this.#pendingSeeks.set(`${rdKafkaTopicPartitionOffset.topic}|${rdKafkaTopicPartitionOffset.partition}`, rdKafkaTopicPartitionOffset.offset);
+    this.#pendingSeeks.set(partitionKey(rdKafkaTopicPartitionOffset), rdKafkaTopicPartitionOffset.offset);
   }
 
   async describeGroup() {
@@ -1441,6 +1566,8 @@
       return;
     }
     this.#internalClient.pause(topics);
+
+    // TODO: make this staleness per-partition, not on a global cache level.
     this.#messageCache.stale = true;
 
     topics.map(JSON.stringify).forEach(topicPartition => this.#pausedPartitions.add(topicPartition));
