Skip to content

Commit 85f9b7e

Browse files
committed
Fix tests for Per Partition Concurrency
1 parent f2ec063 commit 85f9b7e

File tree

5 files changed

+145
-109
lines changed

5 files changed

+145
-109
lines changed

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 107 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@ const {
1515
generateMessages,
1616
} = require('../testhelpers');
1717

18-
describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
18+
/* All combinations of autoCommit and partitionsConsumedConcurrently */
19+
const cases = Array(2 * 3).fill().map((_, i) => [i < 3, (i % 3) + 1]).slice(-1);
20+
21+
describe.each(cases)('Consumer', (isAutoCommit, partitionsConsumedConcurrently) => {
1922
let topicName, groupId, producer, consumer;
23+
const partitions = 3;
2024

2125
beforeEach(async () => {
26+
console.log("Starting:", expect.getState().currentTestName, "| isAutoCommit =", isAutoCommit, "| partitionsConsumedConcurrently =", partitionsConsumedConcurrently);
2227
topicName = `test-topic-${secureRandom()}`
2328
groupId = `consumer-group-id-${secureRandom()}`
2429

25-
await createTopic({ topic: topicName })
26-
30+
await createTopic({ topic: topicName, partitions })
2731
producer = createProducer({});
2832

2933
consumer = createConsumer({
@@ -37,6 +41,7 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
3741
afterEach(async () => {
3842
consumer && (await consumer.disconnect())
3943
producer && (await producer.disconnect())
44+
console.log("Ending:", expect.getState().currentTestName, "| isAutoCommit =", isAutoCommit, "| partitionsConsumedConcurrently =", partitionsConsumedConcurrently);
4045
});
4146

4247
it('consume messages', async () => {
@@ -45,13 +50,16 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
4550
await consumer.subscribe({ topic: topicName })
4651

4752
const messagesConsumed = [];
48-
consumer.run({ eachMessage: async event => messagesConsumed.push(event) });
53+
consumer.run({
54+
partitionsConsumedConcurrently,
55+
eachMessage: async event => messagesConsumed.push(event)
56+
});
4957

50-
const messages = Array(100)
58+
const messages = Array(10)
5159
.fill()
5260
.map(() => {
5361
const value = secureRandom()
54-
return { key: `key-${value}`, value: `value-${value}` }
62+
return { key: `key-${value}`, value: `value-${value}`, partition: 0 }
5563
})
5664

5765
await producer.send({ topic: topicName, messages })
@@ -76,7 +84,7 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
7684
message: expect.objectContaining({
7785
key: Buffer.from(messages[messages.length - 1].key),
7886
value: Buffer.from(messages[messages.length - 1].value),
79-
offset: '99',
87+
offset: '' + (messagesConsumed.length - 1),
8088
}),
8189
})
8290
)
@@ -91,17 +99,21 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
9199
await consumer.subscribe({ topic: topicName })
92100

93101
const messagesConsumed = [];
94-
consumer.run({ eachMessage: async event => messagesConsumed.push(event) });
102+
consumer.run({
103+
partitionsConsumedConcurrently,
104+
eachMessage: async event => messagesConsumed.push(event)
105+
});
95106

96-
const messages = [ {
97-
value: `value-${secureRandom}`,
98-
headers: {
99-
'header-1': 'value-1',
100-
'header-2': 'value-2',
101-
'header-3': [ 'value-3-1', 'value-3-2', Buffer.from([1,0,1,0,1]) ],
102-
'header-4': Buffer.from([1,0,1,0,1]),
103-
}
104-
} ]
107+
const messages = [{
108+
value: `value-${secureRandom}`,
109+
headers: {
110+
'header-1': 'value-1',
111+
'header-2': 'value-2',
112+
'header-3': ['value-3-1', 'value-3-2', Buffer.from([1, 0, 1, 0, 1])],
113+
'header-4': Buffer.from([1, 0, 1, 0, 1]),
114+
},
115+
partition: 0,
116+
}]
105117

106118
await producer.send({ topic: topicName, messages })
107119
await waitForMessages(messagesConsumed, { number: messages.length })
@@ -117,8 +129,8 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
117129
// Headers are always returned as Buffers from the broker.
118130
'header-1': Buffer.from('value-1'),
119131
'header-2': Buffer.from('value-2'),
120-
'header-3': [ Buffer.from('value-3-1'), Buffer.from('value-3-2'), Buffer.from([1,0,1,0,1]) ],
121-
'header-4': Buffer.from([1,0,1,0,1]),
132+
'header-3': [Buffer.from('value-3-1'), Buffer.from('value-3-2'), Buffer.from([1, 0, 1, 0, 1])],
133+
'header-4': Buffer.from([1, 0, 1, 0, 1]),
122134
}
123135
}),
124136
})
@@ -132,6 +144,7 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
132144

133145
const messagesConsumed = [];
134146
consumer.run({
147+
partitionsConsumedConcurrently,
135148
eachBatchAutoResolve: isAutoResolve,
136149
eachBatch: async event => {
137150
// Match the message format to be checked easily later.
@@ -148,42 +161,47 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
148161
}
149162
});
150163

151-
const messages = Array(100)
164+
const messages = Array(100 * partitions)
152165
.fill()
153-
.map(() => {
166+
.map((_, i) => {
154167
const value = secureRandom()
155-
return { key: `key-${value}`, value: `value-${value}` }
168+
return { key: `key-${value}`, value: `value-${value}`, partition: i % partitions }
156169
})
157170

158171
await producer.send({ topic: topicName, messages })
159172
await waitForMessages(messagesConsumed, { number: messages.length })
160173

161-
expect(messagesConsumed[0]).toEqual(
162-
expect.objectContaining({
163-
topic: topicName,
164-
partition: 0,
165-
message: expect.objectContaining({
166-
key: Buffer.from(messages[0].key),
167-
value: Buffer.from(messages[0].value),
168-
offset: String(0),
169-
}),
170-
})
171-
)
174+
for (let p = 0; p < partitions; p++) {
175+
const specificPartitionMessages = messagesConsumed.filter(m => m.partition === p);
176+
const specificExpectedMessages = messages.filter(m => m.partition === p);
177+
expect(specificPartitionMessages[0]).toEqual(
178+
expect.objectContaining({
179+
topic: topicName,
180+
partition: p,
181+
message: expect.objectContaining({
182+
key: Buffer.from(specificExpectedMessages[0].key),
183+
value: Buffer.from(specificExpectedMessages[0].value),
184+
offset: String(0),
185+
}),
186+
})
187+
);
172188

173-
expect(messagesConsumed[messagesConsumed.length - 1]).toEqual(
174-
expect.objectContaining({
175-
topic: topicName,
176-
partition: 0,
177-
message: expect.objectContaining({
178-
key: Buffer.from(messages[messages.length - 1].key),
179-
value: Buffer.from(messages[messages.length - 1].value),
180-
offset: String(messages.length - 1),
181-
}),
182-
})
183-
)
189+
expect(specificPartitionMessages[specificPartitionMessages.length - 1]).toEqual(
190+
expect.objectContaining({
191+
topic: topicName,
192+
partition: p,
193+
message: expect.objectContaining({
194+
key: Buffer.from(specificExpectedMessages[specificExpectedMessages.length - 1].key),
195+
value: Buffer.from(specificExpectedMessages[specificExpectedMessages.length - 1].value),
196+
offset: String(specificExpectedMessages.length - 1),
197+
}),
198+
})
199+
);
200+
201+
// check if all offsets are present
202+
expect(specificPartitionMessages.map(m => m.message.offset)).toEqual(specificExpectedMessages.map((_, i) => `${i}`))
203+
}
184204

185-
// check if all offsets are present
186-
expect(messagesConsumed.map(m => m.message.offset)).toEqual(messages.map((_, i) => `${i}`))
187205
});
188206

189207
it('is able to reconsume messages after not resolving it', async () => {
@@ -194,6 +212,7 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
194212
let messageSeen = false;
195213
const messagesConsumed = [];
196214
consumer.run({
215+
partitionsConsumedConcurrently,
197216
eachBatchAutoResolve: false,
198217
eachBatch: async event => {
199218
expect(event.batch.messages.length).toEqual(1);
@@ -216,10 +235,11 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
216235
.fill()
217236
.map(() => {
218237
const value = secureRandom()
219-
return { key: `key-${value}`, value: `value-${value}` }
238+
return { key: `key-${value}`, value: `value-${value}`, partition: 0 }
220239
})
221240

222-
await producer.send({ topic: topicName, messages })
241+
await producer.send({ topic: topicName, messages });
242+
await waitFor(() => consumer.assignment().length > 0, () => { }, 100);
223243
await waitForMessages(messagesConsumed, { number: messages.length });
224244
});
225245

@@ -231,6 +251,7 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
231251
let messageSeen = false;
232252
const messagesConsumed = [];
233253
consumer.run({
254+
partitionsConsumedConcurrently,
234255
eachBatchAutoResolve: isAutoResolve,
235256
eachBatch: async event => {
236257
expect(event.batch.messages.length).toEqual(1);
@@ -250,10 +271,10 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
250271
.fill()
251272
.map(() => {
252273
const value = secureRandom()
253-
return { key: `key-${value}`, value: `value-${value}` }
274+
return { key: `key-${value}`, value: `value-${value}`, partition: 0 };
254275
})
255276

256-
await producer.send({ topic: topicName, messages })
277+
await producer.send({ topic: topicName, messages });
257278
await waitForMessages(messagesConsumed, { number: messages.length });
258279
});
259280

@@ -264,6 +285,7 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
264285

265286
const messagesConsumed = [];
266287
consumer.run({
288+
partitionsConsumedConcurrently,
267289
eachBatchAutoResolve: isAutoResolve,
268290
eachBatch: async event => {
269291
messagesConsumed.push(...event.batch.messages);
@@ -277,7 +299,7 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
277299
.fill()
278300
.map(() => {
279301
const value = secureRandom()
280-
return { key: `key-${value}`, value: `value-${value}` }
302+
return { key: `key-${value}`, value: `value-${value}`, partition: 0 }
281303
})
282304

283305
await producer.send({ topic: topicName, messages })
@@ -287,49 +309,45 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
287309
expect(messagesConsumed[1].key.toString()).toBe(messages[1].key);
288310
});
289311

290-
/* Skip until concurrency support for eachMessage is added. */
291-
it.skip('consumes messages concurrently', async () => {
292-
const partitionsConsumedConcurrently = 2
312+
it('consumes messages concurrently where partitionsConsumedConcurrently - partitions = diffConcurrencyPartitions', async () => {
313+
const partitions = 3;
314+
/* We want partitionsConsumedConcurrently to be 2, 3, and 4 rather than 1, 2, and 3 that is tested by the test. */
315+
const partitionsConsumedConcurrentlyDiff = partitionsConsumedConcurrently + 1;
293316
topicName = `test-topic-${secureRandom()}`
294317
await createTopic({
295318
topic: topicName,
296-
partitions: partitionsConsumedConcurrently + 1,
319+
partitions: partitions,
297320
})
298321
await consumer.connect()
299322
await producer.connect()
300323
await consumer.subscribe({ topic: topicName })
301324

302-
let inProgress = 0
303-
let hitConcurrencyLimit = false
304-
consumer.on(consumer.events.START_BATCH_PROCESS, () => {
305-
inProgress++
306-
expect(inProgress).toBeLessThanOrEqual(partitionsConsumedConcurrently)
307-
hitConcurrencyLimit = hitConcurrencyLimit || inProgress === partitionsConsumedConcurrently
308-
})
309-
consumer.on(consumer.events.END_BATCH_PROCESS, () => inProgress--)
310-
311-
const messagesConsumed = []
325+
let inProgress = 0;
326+
let inProgressMaxValue = 0;
327+
const messagesConsumed = [];
312328
consumer.run({
313-
partitionsConsumedConcurrently,
329+
partitionsConsumedConcurrently: partitionsConsumedConcurrentlyDiff,
314330
eachMessage: async event => {
315-
await sleep(1)
316-
messagesConsumed.push(event)
331+
inProgress++;
332+
await sleep(1);
333+
messagesConsumed.push(event);
334+
inProgressMaxValue = Math.max(inProgress, inProgressMaxValue)
335+
inProgress--;
317336
},
318337
})
319338

320-
await waitForConsumerToJoinGroup(consumer)
339+
await waitFor(() => consumer.assignment().length > 0, () => { }, 100);
321340

322-
const messages = Array(100)
341+
const messages = Array(1024*9)
323342
.fill()
324-
.map(() => {
325-
const value = secureRandom()
326-
return { key: `key-${value}`, value: `value-${value}` }
327-
})
328-
329-
await producer.send({ topic: topicName, messages })
330-
await waitForMessages(messagesConsumed, { number: messages.length })
343+
.map((_, i) => {
344+
const value = secureRandom(512)
345+
return { key: `key-${value}`, value: `value-${value}`, partition: i % partitions }
346+
});
331347

332-
expect(hitConcurrencyLimit).toBeTrue()
348+
await producer.send({ topic: topicName, messages });
349+
await waitForMessages(messagesConsumed, { number: messages.length });
350+
expect(inProgressMaxValue).toBe(Math.min(partitionsConsumedConcurrentlyDiff, partitions));
333351
});
334352

335353
it('consume GZIP messages', async () => {
@@ -346,9 +364,9 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
346364
consumer.run({ eachMessage: async event => messagesConsumed.push(event) });
347365

348366
const key1 = secureRandom();
349-
const message1 = { key: `key-${key1}`, value: `value-${key1}` };
367+
const message1 = { key: `key-${key1}`, value: `value-${key1}`, partition: 0 };
350368
const key2 = secureRandom();
351-
const message2 = { key: `key-${key2}`, value: `value-${key2}` };
369+
const message2 = { key: `key-${key2}`, value: `value-${key2}`, partition: 0 };
352370

353371
await producer.send({
354372
topic: topicName,
@@ -477,7 +495,7 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
477495
.fill()
478496
.map(() => {
479497
const value = secureRandom()
480-
return { key: `key-${value}`, value: `value-${value}` }
498+
return { key: `key-${value}`, value: `value-${value}`, partition: 0 }
481499
});
482500

483501
await consumer.connect();
@@ -704,7 +722,7 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
704722
await consumer.subscribe({ topic: topicName });
705723

706724
const messagesConsumed = []
707-
const idempotentMessages = generateMessages({ prefix: 'idempotent' })
725+
const idempotentMessages = generateMessages({ prefix: 'idempotent', partition: 0 })
708726

709727
consumer.run({
710728
eachMessage: async event => messagesConsumed.push(event),
@@ -743,9 +761,9 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
743761

744762
const messagesConsumed = [];
745763

746-
const messages1 = generateMessages({ prefix: 'txn1' });
747-
const messages2 = generateMessages({ prefix: 'txn2' });
748-
const nontransactionalMessages1 = generateMessages({ prefix: 'nontransactional1', number: 1 });
764+
const messages1 = generateMessages({ prefix: 'txn1', partition: 0 });
765+
const messages2 = generateMessages({ prefix: 'txn2', partition: 0 });
766+
const nontransactionalMessages1 = generateMessages({ prefix: 'nontransactional1', number: 1, partition: 0 });
749767

750768
consumer.run({
751769
eachMessage: async event => messagesConsumed.push(event),
@@ -800,9 +818,9 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
800818

801819
const messagesConsumed = []
802820

803-
const abortedMessages1 = generateMessages({ prefix: 'aborted-txn-1' });
804-
const abortedMessages2 = generateMessages({ prefix: 'aborted-txn-2' });
805-
const committedMessages = generateMessages({ prefix: 'committed-txn', number: 10 });
821+
const abortedMessages1 = generateMessages({ prefix: 'aborted-txn-1', partition: 0 });
822+
const abortedMessages2 = generateMessages({ prefix: 'aborted-txn-2', partition: 0 });
823+
const committedMessages = generateMessages({ prefix: 'committed-txn', number: 10, partition: 0 });
806824

807825
consumer.run({
808826
eachMessage: async event => messagesConsumed.push(event),
@@ -858,7 +876,7 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
858876

859877
const messagesConsumed = [];
860878

861-
const abortedMessages = generateMessages({ prefix: 'aborted-txn1' });
879+
const abortedMessages = generateMessages({ prefix: 'aborted-txn1', partition: 0 });
862880

863881
consumer.run({
864882
eachMessage: async event => messagesConsumed.push(event),

0 commit comments

Comments
 (0)