Skip to content

Commit fa65e50

Browse files
committed
Add parametrization to tests for auto commit
1 parent 6e49c46 commit fa65e50

File tree

4 files changed

+41
-15
lines changed

4 files changed

+41
-15
lines changed

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ const {
1313
waitForConsumerToJoinGroup,
1414
sleep,
1515
generateMessages,
16-
} = require('../testhelpers')
16+
} = require('../testhelpers');
1717

18-
describe('Consumer', () => {
18+
describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
1919
let topicName, groupId, producer, consumer;
2020

2121
beforeEach(async () => {
@@ -30,6 +30,7 @@ describe('Consumer', () => {
3030
groupId,
3131
maxWaitTimeInMs: 100,
3232
fromBeginning: true,
33+
autoCommit: isAutoCommit,
3334
});
3435
});
3536

@@ -307,6 +308,7 @@ describe('Consumer', () => {
307308
minBytes: 1024,
308309
maxWaitTimeInMs: 500,
309310
fromBeginning: true,
311+
autoCommit: isAutoCommit,
310312
})
311313

312314
const messages = Array(10)
@@ -492,6 +494,7 @@ describe('Consumer', () => {
492494
groupId,
493495
maxWaitTimeInMs: 100,
494496
fromBeginning: true,
497+
autoCommit: isAutoCommit
495498
});
496499

497500
await consumer.connect();
@@ -528,7 +531,8 @@ describe('Consumer', () => {
528531
consumer = createConsumer({
529532
groupId,
530533
maxWaitTimeInMs: 100,
531-
fromBeginning: true
534+
fromBeginning: true,
535+
autoCommit: isAutoCommit,
532536
});
533537

534538
await consumer.connect();
@@ -585,6 +589,7 @@ describe('Consumer', () => {
585589
groupId,
586590
maxWaitTimeInMs: 100,
587591
fromBeginning: true,
592+
autoCommit: isAutoCommit,
588593
});
589594

590595
await consumer.connect();
@@ -642,6 +647,7 @@ describe('Consumer', () => {
642647
maxWaitTimeInMs: 100,
643648
readUncommitted: true,
644649
fromBeginning: true,
650+
autoCommit: isAutoCommit,
645651
})
646652

647653
await consumer.connect();
@@ -678,6 +684,9 @@ describe('Consumer', () => {
678684
it(
679685
'respects offsets sent by a committed transaction ("consume-transform-produce" flow)',
680686
async () => {
687+
if (isAutoCommit) { /* only autoCommit: false makes sense for this test. */
688+
return;
689+
}
681690
// Seed the topic with some messages. We don't need a tx producer for this.
682691
await producer.connect();
683692
const partition = 0;
@@ -785,6 +794,10 @@ describe('Consumer', () => {
785794
it(
786795
'does not respect offsets sent by an aborted transaction ("consume-transform-produce" flow)',
787796
async () => {
797+
if (isAutoCommit) { /* only autoCommit: false makes sense for this test. */
798+
return;
799+
}
800+
788801
// Seed the topic with some messages. We don't need a tx producer for this.
789802
await producer.connect();
790803

test/promisified/consumer/consumerCacheTests.spec.js

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
jest.setTimeout(30000)
22

3+
const { is } = require('bluebird');
34
const {
45
secureRandom,
56
createTopic,
@@ -10,7 +11,7 @@ const {
1011
sleep,
1112
} = require('../testhelpers');
1213

13-
describe('Consumer message cache', () => {
14+
describe.each([[false], [true]])('Consumer message cache', (isAutoCommit) => {
1415
let topicName, groupId, producer, consumer;
1516

1617
beforeEach(async () => {
@@ -21,10 +22,12 @@ describe('Consumer message cache', () => {
2122

2223
producer = createProducer({});
2324

25+
const common = {};
2426
consumer = createConsumer({
2527
groupId,
2628
maxWaitTimeInMs: 100,
2729
fromBeginning: true,
30+
autoCommit: isAutoCommit,
2831
});
2932
});
3033

@@ -127,6 +130,7 @@ describe('Consumer message cache', () => {
127130
groupId,
128131
maxWaitTimeInMs: 100,
129132
fromBeginning: true,
133+
autoCommit: isAutoCommit,
130134
});
131135

132136
await consumer.connect();
@@ -142,6 +146,10 @@ describe('Consumer message cache', () => {
142146
eachMessage: async event => {
143147
messagesConsumed.push(event);
144148
messagesConsumedConsumer1.push(event);
149+
if (!isAutoCommit)
150+
await consumer.commitOffsets([
151+
{ topic: event.topic, partition: event.partition, offset: Number(event.message.offset) + 1 },
152+
]);
145153

146154
/* Until the second consumer joins, consume messages slowly so as to not consume them all
147155
* before the rebalance triggers. */
@@ -204,6 +212,7 @@ describe('Consumer message cache', () => {
204212
rebalanceTimeout: 10000,
205213
sessionTimeout: 10000,
206214
clientId: "impatientConsumer",
215+
autoCommit: isAutoCommit,
207216
});
208217

209218
await producer.connect();
@@ -219,6 +228,11 @@ describe('Consumer message cache', () => {
219228
eachMessage: async event => {
220229
messagesConsumed.push(event);
221230
impatientConsumerMessages.push(event);
231+
if (!isAutoCommit)
232+
await impatientConsumer.commitOffsets([
233+
{ topic: event.topic, partition: event.partition, offset: Number(event.message.offset) + 1 },
234+
]);
235+
222236
/* When the second consumer is joining, deliberately slow down message consumption.
223237
* We should still have a rebalance very soon, since we must expire the cache and
224238
* trigger a rebalance before max.poll.interval.ms.
@@ -268,5 +282,5 @@ describe('Consumer message cache', () => {
268282
expect(impatientConsumerMessages.length).toBeGreaterThan(0);
269283

270284
await impatientConsumer.disconnect();
271-
});
285+
}, 60000);
272286
});

test/promisified/consumer/subscribe.spec.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ jest.setTimeout(30000);
33
const { Kafka, ErrorCodes } = require('../../../lib').KafkaJS;
44
const { secureRandom,
55
createTopic,
6+
waitFor,
67
waitForMessages,
78
waitForConsumerToJoinGroup,
89
createProducer,
@@ -13,7 +14,6 @@ describe('Consumer', () => {
1314

1415
beforeEach(async () => {
1516
groupId = `consumer-group-id-${secureRandom()}`;
16-
1717
consumer = createConsumer({
1818
groupId,
1919
maxWaitTimeInMs: 1,
@@ -52,7 +52,7 @@ describe('Consumer', () => {
5252
});
5353

5454
consumer.run({ eachMessage: async event => messagesConsumed.push(event) });
55-
await waitForConsumerToJoinGroup(consumer);
55+
await waitFor(() => consumer.assignment().length > 0, () => null);
5656

5757
await producer.connect();
5858
await producer.sendBatch({

test/promisified/testhelpers.js

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,27 @@ const clusterInformation = {
1414

1515
const debug = process.env.TEST_DEBUG;
1616

17-
function makeConfig(config) {
17+
function makeConfig(config, common) {
1818
const kafkaJS = Object.assign(config, clusterInformation.kafkaJS);
19-
const common = {};
2019
if (debug) {
2120
common['debug'] = debug;
2221
}
2322

2423
return Object.assign(common, { kafkaJS });
2524
}
2625

27-
function createConsumer(config) {
28-
const kafka = new Kafka(makeConfig(config));
26+
function createConsumer(config, common = {}) {
27+
const kafka = new Kafka(makeConfig(config, common));
2928
return kafka.consumer();
3029
}
3130

32-
function createProducer(config) {
33-
const kafka = new Kafka(makeConfig(config));
31+
function createProducer(config, common = {}) {
32+
const kafka = new Kafka(makeConfig(config, common));
3433
return kafka.producer();
3534
}
3635

37-
function createAdmin(config) {
38-
const kafka = new Kafka(makeConfig(config));
36+
function createAdmin(config, common = {}) {
37+
const kafka = new Kafka(makeConfig(config, common));
3938
return kafka.admin();
4039
}
4140

0 commit comments

Comments
 (0)