Commit a728b4a

Add per-partition cache expiry logic
1 parent 9d3130f commit a728b4a

2 files changed: +361 -252 lines changed

lib/kafkajs/_consumer.js

Lines changed: 63 additions & 252 deletions
@@ -19,7 +19,7 @@ const {
   partitionKey,
 } = require('./_common');
 const { Buffer } = require('buffer');
-const { hrtime } = require('process');
+const MessageCache = require('./_consumer_cache');
 
 const ConsumerState = Object.freeze({
   INIT: 0,
@@ -35,218 +35,6 @@ const PartitionAssigners = Object.freeze({
   cooperativeSticky: 'cooperative-sticky',
 });
 
-
-/**
- * A PerPartitionMessageCache is a cache for messages for a single partition.
- */
-class PerPartitionMessageCache {
-  /* The cache is a list of messages. */
-  cache = [];
-  /* Index of next element to be fetched in the cache. */
-  currentIndex = 0;
-  /* Whether the cache is stale. */
-  stale = false;
-
-  /**
-   * Returns the number of total elements in the cache.
-   */
-  size() {
-    return this.cache.length;
-  }
-
-  /**
-   * Clears the cache.
-   */
-  clear() {
-    this.cache = [];
-    this.currentIndex = 0;
-    this.stale = false;
-  }
-
-  /**
-   * Adds a message to the cache.
-   */
-  add(message) {
-    this.cache.push(message);
-  }
-
-  /**
-   * Returns whether the cache is stale.
-   */
-  isStale() {
-    return this.stale;
-  }
-
-  /**
-   * @returns The next element in the cache, or null if none exists.
-   * @warning Does not check for staleness.
-   */
-  next() {
-    return this.currentIndex < this.cache.length ? this.cache[this.currentIndex++] : null;
-  }
-}
-
-
-/**
- * MessageCache defines a dynamically sized cache for messages.
- * Internally, it uses PerPartitionMessageCache to store messages for each partition.
- * The capacity is increased or decreased according to whether the last fetch of messages
- * was less than the current capacity or saturated the current capacity.
- */
-class MessageCache {
-
-  constructor(expiryDurationMs) {
-    /* Per-partition cache list containing non-empty PPCs. */
-    this.ppcList = [];
-    /* Map of topic+partition to PerPartitionMessageCache. */
-    this.tpToPpc = new Map();
-    /* Index of the current PPC in the ppcList. */
-    this.currentPpc = 0;
-    /* Maximum size of the cache. (Capacity) */
-    this.maxSize = 1;
-    /* Number of times the size has been increased in a row, used for accounting for maxSize. */
-    this.increaseCount = 0;
-    /* Last cached time. */
-    this.cachedTime = hrtime();
-    /* Whether the cache is stale. */
-    this.stale = false;
-    /* Expiry duration for this cache. */
-    this.expiryDurationMs = expiryDurationMs;
-  }
-
-  addTopicPartitions(topicPartitions) {
-    if (this.ppcList.length !== 0) {
-      throw new Error('Cannot add topic partitions to a non-empty cache.');
-    }
-    for (const topicPartition of topicPartitions) {
-      const key = partitionKey(topicPartition);
-      this.tpToPpc.set(key, new PerPartitionMessageCache());
-    }
-  }
-
-  removeTopicPartitions(topicPartitions = null) {
-    if (this.ppcList.length !== 0) {
-      throw new Error('Cannot remove topic partitions from a non-empty cache.');
-    }
-
-    if (topicPartitions === null) {
-      this.tpToPpc.clear();
-      return;
-    }
-    for (const topicPartition of assignment) {
-      const key = partitionKey(topicPartition);
-      this.tpToPpc.delete(key);
-    }
-  }
-
-  /**
-   * Returns whether the cache is stale.
-   */
-  isStale() {
-    if (this.stale)
-      return true;
-
-    const cacheTime = hrtime(this.cachedTime);
-    const cacheTimeMs = Math.floor(cacheTime[0] * 1000 + cacheTime[1] / 1000000);
-    this.stale = cacheTimeMs > this.expiryDurationMs;
-
-    // TODO: ideally, local staleness should not lead to global staleness.
-    // But for now, make it so because seeking to stored offset on local staleness is tricky.
-    this.stale = this.stale || this.ppcList.some(cache => cache.isStale());
-    return this.stale;
-  }
-
-  /**
-   * Request a size increase.
-   * It increases the size by 2x, but only if the size is less than 1024,
-   * and only if the size has been requested to be increased twice in a row.
-   */
-  increaseMaxSize() {
-    if (this.maxSize === 1024)
-      return;
-
-    this.increaseCount++;
-    if (this.increaseCount <= 1)
-      return;
-
-    this.maxSize = Math.min(this.maxSize << 1, 1024);
-    this.increaseCount = 0;
-  }
-
-  /**
-   * Request a size decrease.
-   * It decreases the size to 80% of the last received size, with a minimum of 1.
-   * @param {number} recvdSize - the number of messages received in the last poll.
-   */
-  decreaseMaxSize(recvdSize) {
-    this.maxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1);
-    this.increaseCount = 0;
-  }
-
-  /**
-   * Add a single message to the cache.
-   */
-  #add(message) {
-    const key = partitionKey(message);
-    const cache = this.tpToPpc.get(key);
-    cache.add(message);
-    if (cache.size() === 1) {
-      this.ppcList.push(cache);
-    }
-  }
-
-  /**
-   * Adds many messages into the cache, partitioning them as per their toppar.
-   */
-  addMessages(messages) {
-    this.stale = false;
-    this.cachedTime = hrtime();
-    this.currentPpc = 0;
-    for (const message of messages)
-      this.#add(message);
-
-    // TODO: add ppcList sort step.
-    // Rationale: ideally it's best to consume in the ascending order of timestamps.
-  }
-
-  /**
-   * Returns the next element in the cache, or null if none exists.
-   *
-   * If the current PPC is exhausted, it moves to the next PPC.
-   * If all PPCs are exhausted, it returns null.
-   * @warning Does not check for staleness. That is left up to the user.
-   */
-  next() {
-    if (this.currentPpc >= this.ppcList.length) {
-      return null;
-    }
-
-    let next = null;
-    while (next === null && this.currentPpc < this.ppcList.length) {
-      next = this.ppcList[this.currentPpc].next();
-      if (next !== null)
-        break;
-      this.currentPpc++;
-    }
-    return next; // Caller is responsible for triggering fetch logic here if next == null.
-  }
-
-  /**
-   * Clears cache completely.
-   */
-  clear() {
-    for (const cache of this.ppcList) {
-      cache.clear();
-    }
-    this.ppcList = [];
-    this.currentPpc = 0;
-    this.maxSize = 1;
-    this.increaseCount = 0;
-    this.stale = false;
-    this.cachedTime = hrtime();
-  }
-}
-
 class Consumer {
   /**
    * The config supplied by the user.
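
The sizing heuristic documented in the removed MessageCache (and presumably carried over to ./_consumer_cache) doubles capacity only on the second consecutive increase request, and shrinks to 80% of the last received batch. A standalone sketch of just that arithmetic, with the two methods lifted out of the class; the wrapper object and the trace are ours, for illustration only:

/* Sketch of the sizing heuristic above, lifted out of the class. */
const cache = { maxSize: 1, increaseCount: 0 };

function increaseMaxSize(c) {
  if (c.maxSize === 1024)
    return;
  c.increaseCount++;
  if (c.increaseCount <= 1)
    return; /* the first request in a row is ignored */
  c.maxSize = Math.min(c.maxSize << 1, 1024); /* double, capped at 1024 */
  c.increaseCount = 0;
}

function decreaseMaxSize(c, recvdSize) {
  c.maxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1); /* 80% of last batch, min 1 */
  c.increaseCount = 0;
}

increaseMaxSize(cache);    // maxSize still 1 (first consecutive request)
increaseMaxSize(cache);    // maxSize -> 2 (second consecutive request)
increaseMaxSize(cache);    // ignored again
increaseMaxSize(cache);    // maxSize -> 4
decreaseMaxSize(cache, 3); // maxSize -> max(floor(3 * 8 / 10), 1) = 2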
@@ -312,7 +100,6 @@ class Consumer {
   /**
    * A map of topic+partition to the offset that was last consumed.
    * The keys are of the type "<topic>|<partition>".
-   * This is only populated when we're in the kafkaJS compatibility mode.
    * @type {Map<string, number>}
    */
   #lastConsumedOffsets = new Map();
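
For context on the keys above: partitionKey() comes from ./_common and is not part of this diff, so only its documented "<topic>|<partition>" shape is assumed in this hypothetical stand-in:

/* Hypothetical stand-in for partitionKey() from ./_common (not shown in this
 * commit); only the documented "<topic>|<partition>" key shape is assumed. */
const partitionKey = (tp) => `${tp.topic}|${tp.partition}`;

const lastConsumedOffsets = new Map();
lastConsumedOffsets.set(partitionKey({ topic: 'orders', partition: 3 }), 42);
// Map(1) { 'orders|3' => 42 }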
@@ -358,25 +145,25 @@ class Consumer {
   }
 
   /**
-   * Clear the message cache.
-   * For simplicity, this always clears the entire message cache rather than being selective.
+   * Clear the message cache, and reset to stored positions.
    *
-   * @param {boolean} seek - whether to seek to the stored offsets after clearing the cache.
-   *                         this should be set to true if partitions are retained after this operation.
+   * @param {Array<{topic: string, partition: number}>|null} topicPartitions to clear the cache for; if null, clear all assigned.
    */
-  async #clearCacheAndResetPositions(seek = true) {
-    /* Seek to stored offset for each topic partition so that if
-     * we've gotten further along than they have, we can come back. */
-    if (seek) {
-      const assignment = this.assignment();
-      const seekPromises = [];
-      for (const topicPartitionOffset of assignment) {
-        const key = partitionKey(topicPartitionOffset);
-        if (!this.#lastConsumedOffsets.has(key))
-          continue;
+  async #clearCacheAndResetPositions(topicPartitions = null) {
+    /* Seek to stored offset for each topic partition. It's possible that we've
+     * consumed messages up to N from the internalClient, but the user has marked
+     * the cache stale after consuming just k (< N) messages. We seek to k+1. */
+
+    const clearPartitions = topicPartitions ? topicPartitions : this.assignment();
+    const seekPromises = [];
+    for (const topicPartitionOffset of clearPartitions) {
+      const key = partitionKey(topicPartitionOffset);
+      if (!this.#lastConsumedOffsets.has(key))
+        continue;
 
-      /* Fire off a seek */
-      const seekPromise = new Promise((resolve, reject) => this.#internalClient.seek({
+      /* Fire off a seek */
+      const seekPromise = new Promise((resolve, reject) => {
+        this.#internalClient.seek({
          topic: topicPartitionOffset.topic,
          partition: topicPartitionOffset.partition,
          offset: +this.#lastConsumedOffsets.get(key)
@@ -386,18 +173,24 @@ class Consumer {
          } else {
            resolve();
          }
-      }));
-      seekPromises.push(seekPromise);
-    }
+        });
 
-    /* TODO: we should cry more about this and render the consumer unusable. */
-    await Promise.all(seekPromises).catch(err => this.#logger.error("Seek error. This is effectively a fatal error:" + err));
+        this.#lastConsumedOffsets.delete(key);
+      });
+      seekPromises.push(seekPromise);
     }
 
-    /* Clear the cache. */
-    this.#messageCache.clear();
-    /* Clear the offsets - no need to keep them around. */
-    this.#lastConsumedOffsets.clear();
+    /* TODO: we should cry more about this and render the consumer unusable. */
+    await Promise.all(seekPromises).catch(err => this.#logger.error("Seek error. This is effectively a fatal error:" + err));
+
+
+    /* Clear the cache and stored offsets.
+     * We need to do this only if topicPartitions = null (global cache expiry).
+     * This is because in case of a local cache expiry, MessageCache handles
+     * skipping that (and clearing that later before getting new messages). */
+    if (!topicPartitions) {
+      this.#messageCache.clear();
+    }
   }
 
   /**
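
Taken together, these two hunks turn #clearCacheAndResetPositions from an all-or-nothing reset into a per-partition one: seek each cleared partition back to its last consumed offset, drop that stored offset, and clear the whole cache only on a global expiry (topicPartitions === null). A condensed, self-contained sketch of the new flow — the consumer's private fields are flattened into a ctx parameter and the seek callback shape is assumed from the call sites, so this is illustrative, not verbatim:

/* Condensed sketch of the reworked reset flow; not the library's code. */
async function clearCacheAndResetPositions(ctx, topicPartitions = null) {
  const clearPartitions = topicPartitions ? topicPartitions : ctx.assignment();
  const seekPromises = [];
  for (const tp of clearPartitions) {
    const key = `${tp.topic}|${tp.partition}`; /* documented partitionKey() shape */
    if (!ctx.lastConsumedOffsets.has(key))
      continue;
    seekPromises.push(new Promise((resolve, reject) => {
      ctx.internalClient.seek(
        { topic: tp.topic, partition: tp.partition, offset: +ctx.lastConsumedOffsets.get(key) },
        0,
        (err) => err ? reject(err) : resolve());
      ctx.lastConsumedOffsets.delete(key); /* popped, so it cannot be replayed later */
    }));
  }
  await Promise.all(seekPromises).catch(err => ctx.logger.error('Seek error: ' + err));
  if (!topicPartitions)
    ctx.messageCache.clear(); /* only a global expiry clears the whole cache */
}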
@@ -1044,9 +837,14 @@
      if (!(await acquireOrLog(this.#lock, this.#logger)))
        continue;
 
-      /* Invalidate the message cache if needed. */
-      if (this.#messageCache.isStale()) {
-        await this.#clearCacheAndResetPositions(true);
+      /* Invalidate the message cache if needed */
+      const locallyStale = this.#messageCache.popLocallyStale();
+      if (this.#messageCache.isStale()) { /* global staleness */
+        await this.#clearCacheAndResetPositions();
+        await this.#lock.release();
+        continue;
+      } else if (locallyStale.length !== 0) { /* local staleness */
+        await this.#clearCacheAndResetPositions(locallyStale);
        await this.#lock.release();
        continue;
      }
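
popLocallyStale() and markStale() belong to the new MessageCache in ./_consumer_cache — the other file in this commit, not shown in this section — so their exact implementation has to be inferred from the call sites. A guess at the minimal staleness surface they imply:

/* A guess at the per-partition staleness surface implied by the call sites;
 * the real implementation lives in ./_consumer_cache, not here. */
class MessageCacheSketch {
  #globallyStale = false;
  #locallyStale = []; /* topic partitions staled via markStale() */

  markStale(topicPartitions) {
    this.#locallyStale.push(...topicPartitions);
  }

  /* Drains and returns the locally stale partitions, so the run loop
   * resets each of them exactly once. */
  popLocallyStale() {
    const stale = this.#locallyStale;
    this.#locallyStale = [];
    return stale;
  }

  /* Global staleness only (e.g. cache expiry); local staleness is
   * reported separately via popLocallyStale(). */
  isStale() {
    return this.#globallyStale;
  }
}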
@@ -1153,9 +951,14 @@
      if (!(await acquireOrLog(this.#lock, this.#logger)))
        continue;
 
-      /* Invalidate the message cache if needed. */
-      if (this.#messageCache.isStale()) {
-        await this.#clearCacheAndResetPositions(true);
+      /* Invalidate the message cache if needed */
+      const locallyStale = this.#messageCache.popLocallyStale();
+      if (this.#messageCache.isStale()) { /* global staleness */
+        await this.#clearCacheAndResetPositions();
+        await this.#lock.release();
+        continue;
+      } else if (locallyStale.length !== 0) { /* local staleness */
+        await this.#clearCacheAndResetPositions(locallyStale);
        await this.#lock.release();
        continue;
      }
@@ -1441,14 +1244,21 @@
        offset
      };
 
-      /* We need a complete reset of the cache if we're seeking to a different offset even for one partition.
-       * At a later point, this may be improved at the cost of added complexity of maintaining message generation,
-       * or else purging the cache of just those partitions which are seeked. */
-      await this.#clearCacheAndResetPositions(true);
+      /* The ideal sequence of events here is to:
+       * 1. Mark the cache as stale so we don't consume from it any further.
+       * 2. Call clearCacheAndResetPositions() for the topic partition, which is supposed
+       *    to be called after each cache invalidation.
+       *
+       * However, (2) pops lastConsumedOffsets[topic partition] and seeks to the popped
+       * value. That seek is redundant, since we seek here anyway, so we can skip it by
+       * just clearing lastConsumedOffsets[topic partition].
+       */
+      this.#messageCache.markStale([topicPartition]);
+      this.#lastConsumedOffsets.delete(key);
 
      /* It's assumed that topicPartition is already assigned, and thus can be seeked to and committed to.
       * Errors are logged to detect bugs in the internal code. */
-      /* TODO: is it work awaiting seeks to finish? */
+      /* TODO: is it worth awaiting seeks to finish? */
      this.#internalClient.seek(topicPartitionOffset, 0, err => err ? this.#logger.error(err) : null);
      offsetsToCommit.push({
        topic: topicPartition.topic,
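
The net effect of this hunk: a user-initiated seek no longer resets the entire cache. It stales only the seeked partition and drops that partition's stored offset, so the explicit seek below is the only one issued. A hypothetical, self-contained walk-through for one partition (the client and cache are stubbed; names follow the call sites above, not the library's code verbatim):

/* Hypothetical walk-through of seeking one partition to offset 100. */
const messageCache = { markStale: (tps) => console.log('stale:', tps) };
const lastConsumedOffsets = new Map([['orders|3', 99]]);
const internalClient = { seek: (tpo, _timeout, cb) => cb(null) };

messageCache.markStale([{ topic: 'orders', partition: 3 }]); /* stop serving cached messages */
lastConsumedOffsets.delete('orders|3'); /* skip the now-redundant stored-offset seek */
internalClient.seek({ topic: 'orders', partition: 3, offset: 100 }, 0,
  (err) => err ? console.error(err) : null);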
@@ -1567,8 +1377,9 @@
    }
    this.#internalClient.pause(topics);
 
-    // TODO: make this staleness per-partition, not on a global cache level.
-    this.#messageCache.stale = true;
+    /* Mark the messages in the cache as stale; runInternal* will deal with
+     * making them unusable. */
+    this.#messageCache.markStale(topics);
 
    topics.map(JSON.stringify).forEach(topicPartition => this.#pausedPartitions.add(topicPartition));
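
With markStale(topics) wired into pause(), pausing invalidates only the paused partitions' cached messages instead of flipping a global flag; the runInternal* loops above then reset exactly those partitions via #clearCacheAndResetPositions(locallyStale). A hedged, runnable sketch of that hand-off, reusing the staleness surface guessed at earlier:

/* Hedged sketch of the pause -> run-loop hand-off, per this diff. */
const cache = {
  locallyStale: [],
  markStale(tps) { this.locallyStale.push(...tps); },
  popLocallyStale() { const s = this.locallyStale; this.locallyStale = []; return s; },
};

const topics = [{ topic: 'orders', partition: 3 }]; /* flattened topic partitions */
cache.markStale(topics); /* done by pause(), right after internalClient.pause(topics) */

/* The next runInternal* iteration picks them up exactly once: */
console.log(cache.popLocallyStale()); // [ { topic: 'orders', partition: 3 } ]
console.log(cache.popLocallyStale()); // []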