Skip to content

Commit 33d1370

Browse files
committed
Fix for at-least-once guarantee not ensured in case a seek happens on one partition and there are messages being fetched for other partitions
1 parent 99a906c commit 33d1370

File tree

2 files changed

+53
-22
lines changed

2 files changed

+53
-22
lines changed

lib/kafkajs/_consumer.js

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,21 +1402,21 @@ class Consumer {
14021402
return ppc;
14031403
}
14041404

1405-
#discardMessages(ms, ppc) {
1406-
if (ms) {
1407-
let m = ms[0];
1408-
if (m.constructor === Array) {
1409-
m = m[0];
1410-
}
1411-
ppc = ms[1];
1412-
if (m && !this.#lastConsumedOffsets.has(ppc.key)) {
1405+
#returnMessages(ms) {
1406+
let m = ms[0];
1407+
// ppc could have been change we must return it as well.
1408+
let ppc = ms[1];
1409+
const messagesToReturn = m.constructor === Array ? m : [m];
1410+
const firstMessage = messagesToReturn[0];
1411+
if (firstMessage && !this.#lastConsumedOffsets.has(ppc.key)) {
14131412
this.#lastConsumedOffsets.set(ppc.key, {
1414-
topic: m.topic,
1415-
partition: m.partition,
1416-
offset: m.offset - 1,
1413+
topic: firstMessage.topic,
1414+
partition: firstMessage.partition,
1415+
offset: firstMessage.offset - 1,
14171416
});
1418-
}
14191417
}
1418+
1419+
this.#messageCache.returnMessages(messagesToReturn);
14201420
return ppc;
14211421
}
14221422

@@ -1463,11 +1463,15 @@ class Consumer {
14631463
continue;
14641464

14651465
if (this.#pendingOperations.length) {
1466-
ppc = this.#discardMessages(ms, ppc);
1467-
break;
1466+
/*
1467+
* Don't process messages anymore, execute the operations first.
1468+
* Return the messages to the cache that will be cleared if needed.
1469+
* `ppc` could have been changed, we must return it as well.
1470+
*/
1471+
ppc = this.#returnMessages(ms);
1472+
} else {
1473+
ppc = await perMessageProcessor(ms, config);
14681474
}
1469-
1470-
ppc = await perMessageProcessor(ms, config);
14711475
} catch (e) {
14721476
/* Since this error cannot be exposed to the user in the current situation, just log and retry.
14731477
* This is due to restartOnFailure being set to always true. */
@@ -1548,7 +1552,9 @@ class Consumer {
15481552
* @private
15491553
*/
15501554
async #executePendingOperations() {
1551-
for (const op of this.#pendingOperations) {
1555+
// Execute all pending operations, they could add more operations.
1556+
while (this.#pendingOperations.length > 0) {
1557+
const op = this.#pendingOperations.shift();
15521558
await op();
15531559
}
15541560
this.#pendingOperations = [];

lib/kafkajs/_consumer_cache.js

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,19 @@ class PerPartitionMessageCache {
2828
}
2929

3030
/**
31-
* Adds a message to the cache.
31+
* Adds a message to the cache as last one.
3232
*/
33-
_add(message) {
33+
_addLast(message) {
3434
this.#cache.addLast(message);
3535
}
3636

37+
/**
38+
* Adds a message to the cache as first one.
39+
*/
40+
_addFirst(message) {
41+
this.#cache.addFirst(message);
42+
}
43+
3744
get key() {
3845
return this.#key;
3946
}
@@ -126,7 +133,7 @@ class MessageCache {
126133
*
127134
* @param {Object} message - the message to add to the cache.
128135
*/
129-
#add(message) {
136+
#add(message, head = false) {
130137
const key = partitionKey(message);
131138
let cache = this.#tpToPpc.get(key);
132139
if (!cache) {
@@ -135,7 +142,11 @@ class MessageCache {
135142
cache._node = this.#availablePartitions.addLast(cache);
136143
this.notifyAvailablePartitions();
137144
}
138-
cache._add(message);
145+
if (head) {
146+
cache._addFirst(message);
147+
} else {
148+
cache._addLast(message);
149+
}
139150
}
140151

141152
get availableSize() {
@@ -183,7 +194,21 @@ class MessageCache {
183194
*/
184195
addMessages(messages) {
185196
for (const message of messages)
186-
this.#add(message);
197+
this.#add(message, false);
198+
this.#size += messages.length;
199+
}
200+
201+
/**
202+
* Return messages to the cache, to be read again.
203+
*
204+
* @param {Array} messages - the messages to return to the cache.
205+
*/
206+
returnMessages(messages) {
207+
let i = messages.length - 1;
208+
while (i >= 0) {
209+
this.#add(messages[i], true);
210+
i--;
211+
}
187212
this.#size += messages.length;
188213
}
189214

0 commit comments

Comments
 (0)