Skip to content

Commit 5f9be45

Browse files
committed
Make the 'is cleared before rebalance' test less flaky in case of increased time before first assignment
1 parent 33d1370 commit 5f9be45

File tree

2 files changed

+8
-10
lines changed

2 files changed

+8
-10
lines changed

lib/kafkajs/_consumer.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,7 +1415,7 @@ class Consumer {
14151415
offset: firstMessage.offset - 1,
14161416
});
14171417
}
1418-
1418+
14191419
this.#messageCache.returnMessages(messagesToReturn);
14201420
return ppc;
14211421
}
@@ -1463,7 +1463,7 @@ class Consumer {
14631463
continue;
14641464

14651465
if (this.#pendingOperations.length) {
1466-
/*
1466+
/*
14671467
* Don't process messages anymore, execute the operations first.
14681468
* Return the messages to the cache that will be cleared if needed.
14691469
* `ppc` could have been changed, we must return it as well.

test/promisified/consumer/consumerCacheTests.spec.js

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
9090
});
9191

9292
it('is cleared on seek', async () => {
93-
const producer = createProducer({}, {'batch.num.messages': '1'});
9493
await consumer.connect();
9594
await producer.connect();
9695
await consumer.subscribe({ topic: topicName });
@@ -135,8 +134,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
135134
expect(messagesConsumed.filter(m => m.partition === 1).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`));
136135
// partition 2
137136
expect(messagesConsumed.filter(m => m.partition === 2).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`));
138-
139-
await producer.disconnect();
140137
});
141138

142139
it('is cleared before rebalance', async () => {
@@ -145,7 +142,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
145142
* the consumers are created with the same groupId, we create them here.
146143
* TODO: verify correctness of theory. It's conjecture... which solves flakiness. */
147144
let groupId = `consumer-group-id-${secureRandom()}`;
148-
const multiplier = 9;
145+
const multiplier = 18;
149146
const numMessages = 16 * multiplier;
150147
consumer = createConsumer({
151148
groupId,
@@ -161,7 +158,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
161158
autoCommit: isAutoCommit,
162159
clientId: "consumer2",
163160
});
164-
const producer = createProducer({}, {'batch.num.messages': '1'});
165161

166162
await consumer.connect();
167163
await producer.connect();
@@ -181,11 +177,14 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
181177
{ topic: event.topic, partition: event.partition, offset: Number(event.message.offset) + 1 },
182178
]);
183179

184-
await sleep(100);
180+
// Simulate some processing time so we don't poll all messages
181+
// and put them in the cache before consumer2 joins.
182+
if (messagesConsumedConsumer2.length === 0)
183+
await sleep(100);
185184
}
186185
});
187186

188-
/* Evenly distribute 1024*9 messages across 3 partitions */
187+
/* Evenly distribute numMessages messages across 3 partitions */
189188
let i = 0;
190189
const messages = Array(numMessages)
191190
.fill()
@@ -226,7 +225,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
226225
expect(messagesConsumedConsumer2.length).toBeGreaterThan(0);
227226

228227
await consumer2.disconnect();
229-
await producer.disconnect();
230228
}, 60000);
231229

232230
it('does not hold up polling for non-message events', async () => {

0 commit comments

Comments
 (0)