Skip to content

Commit 9a2f9c4

Browse files
committed
Add message set capability to message cache
1 parent 85f9b7e commit 9a2f9c4

File tree

2 files changed

+196
-5
lines changed

2 files changed

+196
-5
lines changed

lib/kafkajs/_consumer_cache.js

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,26 @@ class PerPartitionMessageCache {
5252
next() {
5353
return this.currentIndex < this.cache.length ? this.cache[this.currentIndex++] : null;
5454
}
55+
56+
/**
57+
* @returns Upto `n` next elements in the cache or an null if none available.
58+
* @warning Does not check for staleness.
59+
*/
60+
nextN(n) {
61+
if (this.currentIndex >= this.cache.length) {
62+
return null;
63+
}
64+
65+
if (this.currentIndex + n >= this.cache.length) {
66+
const res = this.cache.slice(this.currentIndex);
67+
this.currentIndex = this.cache.length;
68+
return res;
69+
}
70+
71+
const res = this.cache.slice(this.currentIndex, this.currentIndex + n);
72+
this.currentIndex += n;
73+
return res;
74+
}
5575
}
5676

5777

@@ -387,6 +407,66 @@ class MessageCache {
387407
return null; // Caller is responsible for triggering fetch logic here if next == null.
388408
}
389409

410+
/**
411+
* Returns the next `size` elements in the cache as an array, or null if none exists.
412+
*
413+
* @sa next, the behaviour is similar in other aspects.
414+
*/
415+
nextN(idx = -1, size = 1) {
416+
let index = idx;
417+
if (index !== -1 && !this.pendingIndices.has(index)) {
418+
/* The user is behaving well by returning the index to us, but in the meanwhile, it's possible
419+
* that we ran out of messages and fetched a new batch. So we just discard what the user is
420+
* returning to us. */
421+
this.logger.error("Returning unowned index", idx, "to cache. Discarding it.");
422+
index = -1;
423+
} else if (index !== -1) {
424+
this.pendingIndices.delete(index);
425+
/* We don't add the index back to the this.indices here because we're just going to remove it again the
426+
* first thing in the loop below, so it's slightly better to just avoid doing it. */
427+
}
428+
429+
if (index === -1) {
430+
if (this.indices.size() === 0 || this.pendingIndices.size === this.maxConcurrency) {
431+
return null;
432+
}
433+
index = this.indices.pop(); // index cannot be undefined here since indices.size > 0
434+
}
435+
436+
/* This loop will always terminate. Why?
437+
* On each iteration:
438+
* 1. We either return (if next is not null).
439+
* 2. We change the PPC index we're interested in, and there are a finite number of PPCs.
440+
* (PPCs don't repeat within the loop since the indices of the PPC are popped from within the
441+
* heap and not put back in, or else a new index is created bounded by ppcList.length).
442+
*/
443+
while (true) {
444+
const next = this.ppcList[index].nextN(size);
445+
if (this.ppcList[index].isStale() || next === null) {
446+
/* If the current PPC is stale or empty, then we move on to the next one.
447+
* It is equally valid to choose any PPC available within this.indices, or else
448+
* move on to the next PPC (maxIndicesIndex + 1) if available.
449+
* We prefer the second option a bit more since we don't have to do a heap operation. */
450+
const toAdd = this.maxIndicesIndex + 1;
451+
if (toAdd < this.ppcList.length) {
452+
this.maxIndicesIndex = toAdd;
453+
index = toAdd;
454+
} else if (!this.indices.isEmpty()) {
455+
index = this.indices.pop()
456+
} else {
457+
break; // nothing left.
458+
}
459+
continue;
460+
}
461+
462+
this.pendingIndices.add(index);
463+
/* Arrays are just objects. Setting a property is odd, but not disallowed. */
464+
next.index = index;
465+
return next;
466+
}
467+
return null; // Caller is responsible for triggering fetch logic here if next == null.
468+
}
469+
390470
/**
391471
* Clears the cache completely.
392472
* This resets it to a base state, and reduces the capacity of the cache back to 1.

test/promisified/unit/cache.spec.js

Lines changed: 116 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ describe('MessageCache', () => {
88
.fill()
99
.map((_, i) => ({ topic: 'topic', partition: i % 3, number: i }));
1010

11-
beforeEach(() => {
12-
});
13-
1411
describe("with concurrency", () => {
1512
let cache;
1613
beforeEach(() => {
@@ -37,6 +34,31 @@ describe('MessageCache', () => {
3734
expect(receivedMessages.slice(61, 30).every((msg, i) => msg.partition === receivedMessages[60].partition && (msg.number - 3) === receivedMessages[i].number)).toBeTruthy();
3835
});
3936

37+
it('caches messages and retrieves N of them', () => {
38+
const msgs = messages.slice(0, 90);
39+
cache.addMessages(msgs);
40+
41+
const receivedMessages = [];
42+
let nextIdx = -1;
43+
const expectedFetchedSizes = [11, 11, 8];
44+
for (let i = 0; i < (90/11); i++) {
45+
/* We choose to fetch 11 messages together rather than 10 so that we can test the case where
46+
* remaining messages > 0 but less than requested size. */
47+
const next = cache.nextN(nextIdx, 11);
48+
/* There are 30 messages per partition, the first fetch will get 11, the second 11, and the last one
49+
* 8, and then it repeats for each partition. */
50+
expect(next.length).toBe(expectedFetchedSizes[i % 3]);
51+
expect(next).not.toBeNull();
52+
receivedMessages.push(...next);
53+
nextIdx = next.index;
54+
}
55+
56+
/* Results are on a per-partition basis and well-ordered */
57+
expect(receivedMessages.slice(1, 30).every((msg, i) => msg.partition === receivedMessages[0].partition && (msg.number - 3) === receivedMessages[i].number)).toBeTruthy();
58+
expect(receivedMessages.slice(31, 30).every((msg, i) => msg.partition === receivedMessages[30].partition && (msg.number - 3) === receivedMessages[i].number)).toBeTruthy();
59+
expect(receivedMessages.slice(61, 30).every((msg, i) => msg.partition === receivedMessages[60].partition && (msg.number - 3) === receivedMessages[i].number)).toBeTruthy();
60+
});
61+
4062
it('does not allow fetching more than 1 message at a time', () => {
4163
const msgs = messages.slice(0, 90);
4264
cache.addMessages(msgs);
@@ -119,8 +141,30 @@ describe('MessageCache', () => {
119141
nextIdxs = [next0.index, next1.index];
120142
}
121143

122-
/* Results are on a zig-zag basis. */
123-
expect(receivedMessages.every((msg, i) => msg.number === receivedMessages.number));
144+
expect(receivedMessages.length).toBe(60);
145+
expect(receivedMessages.filter(msg => msg.partition === 0).length).toBe(30);
146+
expect(receivedMessages.filter(msg => msg.partition === 1).length).toBe(30);
147+
});
148+
149+
it('caches messages and retrieves N of them 2-at-a-time', () => {
150+
const msgs = messages.slice(0, 90).filter(msg => msg.partition !== 3);
151+
cache.addMessages(msgs);
152+
153+
const receivedMessages = [];
154+
let nextIdxs = [-1, -1];
155+
for (let i = 0; i < 30/11; i++) {
156+
const next0 = cache.nextN(nextIdxs[0], 11);
157+
const next1 = cache.nextN(nextIdxs[1], 11);
158+
expect(next0).not.toBeNull();
159+
expect(next1).not.toBeNull();
160+
receivedMessages.push(...next0);
161+
receivedMessages.push(...next1);
162+
nextIdxs = [next0.index, next1.index];
163+
}
164+
165+
expect(receivedMessages.length).toBe(60);
166+
expect(receivedMessages.filter(msg => msg.partition === 0).length).toBe(30);
167+
expect(receivedMessages.filter(msg => msg.partition === 1).length).toBe(30);
124168
});
125169

126170
it('does not allow fetching more than 2 message at a time', () => {
@@ -141,6 +185,25 @@ describe('MessageCache', () => {
141185
expect(next).not.toBeNull();
142186
});
143187

188+
189+
it('does not allow fetching more than 2 message sets at a time', () => {
190+
const msgs = messages.slice(0, 90);
191+
cache.addMessages(msgs);
192+
193+
let next = cache.nextN(-1, 11);
194+
let savedIndex = next.index;
195+
expect(next).not.toBeNull();
196+
next = cache.nextN(-1, 11);
197+
expect(next).not.toBeNull();
198+
next = cache.nextN(-1, 11);
199+
expect(next).toBeNull();
200+
expect(cache.pendingSize()).toBe(2);
201+
202+
// Fetch after returning index works.
203+
next = cache.nextN(savedIndex, 11);
204+
expect(next).not.toBeNull();
205+
});
206+
144207
it('stops fetching from stale partition', () => {
145208
const msgs = messages.slice(0, 90);
146209
cache.addMessages(msgs);
@@ -163,6 +226,29 @@ describe('MessageCache', () => {
163226
expect(receivedMessages).toEqual(expect.arrayContaining(msgs.slice(0, 3)));
164227
});
165228

229+
it('stops fetching message sets from stale partition', () => {
230+
const msgs = messages.slice(0, 90);
231+
cache.addMessages(msgs);
232+
233+
const receivedMessages = [];
234+
let nextIdx = -1;
235+
for (let i = 0; i < 3; i++) {
236+
const next = cache.nextN(nextIdx, 11);
237+
expect(next).not.toBeNull();
238+
receivedMessages.push(...next);
239+
nextIdx = next.index;
240+
cache.markStale([{topic: next[0].topic, partition: next[0].partition}]);
241+
}
242+
243+
// We should not be able to get anything more.
244+
expect(cache.nextN(nextIdx, 11)).toBeNull();
245+
// Nothing should be pending, we've returned everything.
246+
expect(cache.pendingSize()).toBe(0);
247+
// The first [11, 11, 11] messages from different toppars.
248+
expect(receivedMessages.length).toBe(33);
249+
expect(receivedMessages).toEqual(expect.arrayContaining(msgs.slice(0, 33)));
250+
});
251+
166252
it('one slow processing message should not slow down others', () => {
167253
const msgs = messages.slice(0, 90);
168254
cache.addMessages(msgs);
@@ -188,6 +274,31 @@ describe('MessageCache', () => {
188274
expect(receivedMessages.slice(31, 30).every((msg, i) => msg.partition === receivedMessages[30].partition && (msg.number - 3) === receivedMessages[i].number)).toBeTruthy();
189275
});
190276

277+
it('one slow processing message set should not slow down others', () => {
278+
const msgs = messages.slice(0, 90);
279+
cache.addMessages(msgs);
280+
281+
const receivedMessages = [];
282+
let nextIdx = -1;
283+
const slowMsg = cache.nextN(nextIdx, 11);
284+
for (let i = 0; i < 60/11; i++) { /* 60 - for non-partition 0 msgs */
285+
const next = cache.nextN(nextIdx, 11);
286+
expect(next).not.toBeNull();
287+
receivedMessages.push(...next);
288+
nextIdx = next.index;
289+
}
290+
291+
292+
// We should not be able to get anything more.
293+
expect(cache.nextN(nextIdx, 11)).toBeNull();
294+
// The slowMsg should be pending.
295+
expect(cache.pendingSize()).toBe(1);
296+
297+
/* Messages should be partition-wise and well-ordered. */
298+
expect(receivedMessages.slice(1, 30).every((msg, i) => msg.partition === receivedMessages[0].partition && (msg.number - 3) === receivedMessages[i].number)).toBeTruthy();
299+
expect(receivedMessages.slice(31, 30).every((msg, i) => msg.partition === receivedMessages[30].partition && (msg.number - 3) === receivedMessages[i].number)).toBeTruthy();
300+
});
301+
191302
it('should not be able to handle cache-clearance in the middle of processing', () => {
192303
const msgs = messages.slice(0, 90);
193304
cache.addMessages(msgs);

0 commit comments

Comments
 (0)