Skip to content

Commit d921875

Browse files
committed
Add tests for incremental rebalancing
1 parent 755dafb commit d921875

File tree

1 file changed

+176
-0
lines changed

1 file changed

+176
-0
lines changed
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
jest.setTimeout(30000);
2+
3+
const { waitFor,
4+
secureRandom,
5+
createTopic,
6+
createProducer,
7+
createConsumer, } = require("../testhelpers");
8+
const { PartitionAssigners, ErrorCodes } = require('../../../lib').KafkaJS;
9+
10+
describe('Consumer > incremental rebalance', () => {
11+
let consumer;
12+
let groupId, topicName;
13+
14+
const consumerConfig = {
15+
groupId,
16+
partitionAssigners: [PartitionAssigners.cooperativeSticky],
17+
};
18+
19+
beforeEach(async () => {
20+
topicName = `test-topic1-${secureRandom()}`;
21+
groupId = `consumer-group-id-${secureRandom()}`
22+
consumer = null;
23+
await createTopic({ topic: topicName, partitions: 2 });
24+
});
25+
26+
afterEach(async () => {
27+
consumer && (await consumer.disconnect())
28+
});
29+
30+
it('returns protocol name', async () => {
31+
consumer = createConsumer(consumerConfig);
32+
await consumer.connect();
33+
await consumer.subscribe({ topic: topicName });
34+
consumer.run({ eachMessage: async () => { } });
35+
36+
await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
37+
38+
expect(consumer.rebalanceProtocol()).toEqual('COOPERATIVE');
39+
});
40+
41+
it('calls rebalance callback', async () => {
42+
let assigns = 0;
43+
let revokes = 0;
44+
const rebalanceCallback = function (err, assignment) {
45+
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
46+
assigns++;
47+
expect(assignment.length).toBe(2);
48+
} else if (err.code === ErrorCodes.ERR__REVOKE_PARTITIONS) {
49+
revokes++;
50+
expect(assignment.length).toBe(2);
51+
} else {
52+
// It's either assign or revoke and nothing else.
53+
jest.fail('Unexpected error code');
54+
}
55+
}
56+
57+
58+
consumer = createConsumer(consumerConfig, {
59+
'rebalance_cb': rebalanceCallback,
60+
});
61+
await consumer.connect();
62+
await consumer.subscribe({ topic: topicName });
63+
consumer.run({ eachMessage: async () => { } });
64+
65+
await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
66+
expect(assigns).toBe(1);
67+
expect(consumer.assignment().length).toBe(2);
68+
69+
await consumer.disconnect();
70+
consumer = null;
71+
expect(revokes).toBe(1);
72+
expect(assigns).toBe(1);
73+
});
74+
75+
it('allows changing the assignment', async () => {
76+
let assigns = 0;
77+
const rebalanceCallback = function (err, assignment) {
78+
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
79+
assigns++;
80+
expect(assignment.length).toBe(2);
81+
assignment = [assignment[0]];
82+
return assignment;
83+
} else {
84+
// It's either assign or revoke and nothing else.
85+
expect(err.code).toBe(ErrorCodes.ERR__REVOKE_PARTITIONS);
86+
}
87+
}
88+
89+
90+
consumer = createConsumer(consumerConfig, {
91+
'rebalance_cb': rebalanceCallback,
92+
});
93+
await consumer.connect();
94+
await consumer.subscribe({ topic: topicName });
95+
consumer.run({ eachMessage: async () => { } });
96+
97+
await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
98+
expect(assigns).toBe(1);
99+
expect(consumer.assignment().length).toBe(1);
100+
});
101+
102+
it('is actually incremental', async () => {
103+
let expectedAssignmentCount = 0;
104+
const rebalanceCallback = (err, assignment) => {
105+
/* Empty assignments are ignored, they're a rebalance for the synchronization barrier. */
106+
if (assignment.length === 0)
107+
return;
108+
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
109+
expect(assignment.length).toBe(expectedAssignmentCount);
110+
} else if (err.code === ErrorCodes.ERR__REVOKE_PARTITIONS) {
111+
expect(assignment.length).toBe(expectedAssignmentCount);
112+
} else {
113+
// It's either assign or revoke and nothing else.
114+
jest.fail('Unexpected error code');
115+
}
116+
}
117+
118+
/* First consumer joins and gets all partitions. */
119+
expectedAssignmentCount = 2;
120+
consumer = createConsumer(consumerConfig, {
121+
'rebalance_cb': rebalanceCallback,
122+
});
123+
124+
await consumer.connect();
125+
await consumer.subscribe({ topic: topicName });
126+
consumer.run({ eachMessage: async () => { } });
127+
128+
await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
129+
expect(consumer.assignment().length).toBe(2);
130+
131+
/* Second consumer joins and gets one partition. */
132+
expectedAssignmentCount = 1;
133+
const consumer2 = createConsumer(consumerConfig, {
134+
'rebalance_cb': rebalanceCallback,
135+
});
136+
137+
await consumer2.connect();
138+
await consumer2.subscribe({ topic: topicName });
139+
consumer2.run({ eachMessage: async () => { } });
140+
await waitFor(() => consumer2.assignment().length > 0, () => null, 1000);
141+
expect(consumer.assignment().length).toBe(1);
142+
expect(consumer2.assignment().length).toBe(1);
143+
144+
await consumer2.disconnect();
145+
});
146+
147+
it('works with promisified handler', async () => {
148+
let assigns = 0;
149+
let revokes = 0;
150+
151+
consumer = createConsumer(consumerConfig, {
152+
rebalanceListener: {
153+
onPartitionsAssigned: async (assignment) => {
154+
assigns++;
155+
expect(assignment.length).toBe(2);
156+
},
157+
onPartitionsRevoked: async (assignment) => {
158+
revokes++;
159+
expect(assignment.length).toBe(2);
160+
}
161+
},
162+
});
163+
await consumer.connect();
164+
await consumer.subscribe({ topic: topicName });
165+
consumer.run({ eachMessage: async () => { } });
166+
167+
await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
168+
expect(assigns).toBe(1);
169+
expect(consumer.assignment().length).toBe(2);
170+
171+
await consumer.disconnect();
172+
consumer = null;
173+
expect(revokes).toBe(1);
174+
expect(assigns).toBe(1);
175+
});
176+
});

0 commit comments

Comments
 (0)