Skip to content

Commit 2f8d293

Browse files
authored
eachBatch should respect partition restriction despite concurrency (#224)
* Add failing test * Do not remove partition unless no messages are fetched * Update CHANGELOG.md
1 parent df74c29 commit 2f8d293

File tree

4 files changed

+73
-3
lines changed

4 files changed

+73
-3
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ v1.2.0 is a feature release. It is supported for all usage.
66

77
1. Add support for an Admin API to fetch topic offsets by timestamp (#206).
88

9+
## Fixes
10+
11+
1. Fixes an issue where the `eachBatch` callback was being called for the same partition concurrently (#224).
12+
913

1014
# confluent-kafka-javascript v1.1.0
1115

lib/kafkajs/_consumer_cache.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,11 +241,10 @@ class MessageCache {
241241

242242
let nextN = ppc._nextN(size);
243243

244-
if (size === -1 || nextN.length < size) {
244+
if (!nextN.length) {
245245
this.#removeEmptyPartition(ppc);
246-
}
247-
if (!nextN.length)
248246
return this.nextN(null, size);
247+
}
249248

250249
this.#size -= nextN.length;
251250
return [nextN, ppc];

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,4 +812,50 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
812812
expect(lost).toEqual(1);
813813
expect(errors).toEqual(false);
814814
}, 60000);
815+
816+
it('consumes no more than one batch from the same partition at at time', async () => {
817+
if (partitionsConsumedConcurrently === 1) {
818+
return;
819+
}
820+
821+
await producer.connect();
822+
823+
const messages = Array(200).fill().map(() => {
824+
const value = secureRandom();
825+
return { value: `value-${value}` };
826+
});
827+
828+
await producer.send({
829+
topic: topicName,
830+
messages: messages,
831+
});
832+
833+
await consumer.connect();
834+
await consumer.subscribe({ topic: topicName });
835+
836+
const batchesHandled = new Map();
837+
for (let i = 0; i < partitions; i++) {
838+
batchesHandled.set(i, 0);
839+
}
840+
841+
let messagesConsumed = 0;
842+
let batchesCountExceeds1 = false;
843+
844+
consumer.run({
845+
partitionsConsumedConcurrently,
846+
eachBatch: async ({ batch }) => {
847+
messagesConsumed += batch.messages.length;
848+
batchesHandled.set(batch.partition, batchesHandled.get(batch.partition) + 1);
849+
if (batchesHandled.get(batch.partition) > 1) {
850+
batchesCountExceeds1 = true;
851+
} else {
852+
await sleep(100);
853+
}
854+
batchesHandled.set(batch.partition, batchesHandled.get(batch.partition) - 1);
855+
}
856+
});
857+
858+
await waitFor(() => (messagesConsumed === messages.length || batchesCountExceeds1), () => { }, 100);
859+
expect(batchesCountExceeds1).toBe(false);
860+
});
815861
});

test/promisified/unit/cache.spec.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,27 @@ describe('MessageCache', () => {
126126
expect(receivedMessages.filter(msg => msg.partition === 1).length).toBe(30);
127127
});
128128

129+
it('caches messages and does not return messages from the same partition unless PPC is returned', () => {
130+
// Just partition 0 has messages.
131+
const msgs = messages.slice(0, 10).filter(msg => msg.partition === 0);
132+
cache.addMessages(msgs);
133+
134+
let next = [null, null];
135+
let nextPpc = [null, null];
136+
137+
// Fetch messages all the way to the end for partition 0 with size = 10 (can also use -1 instead of 10).
138+
next[0] = cache.nextN(nextPpc[0], 10);
139+
expect(next[0]).not.toBeNull();
140+
141+
// More messages get added in the meanwhile to the same partition.
142+
cache.addMessages(msgs);
143+
144+
// This call should not return anything - since the partition is still
145+
// owned by the first nextN caller, they have not returned it yet.
146+
next[1] = cache.nextN(nextPpc[1], -1);
147+
expect(next[1]).toBeNull();
148+
});
149+
129150
it('does not allow fetching messages more than available partitions at a time', () => {
130151
const msgs = messages.slice(0, 90);
131152
cache.addMessages(msgs);

0 commit comments

Comments
 (0)