Skip to content

Commit 2adc75e

Browse files
authored
Add assign/unassign within rebalance callbacks
Also remove onPartitionsAssigned and onPartitionsRevoked.
1 parent 56759d8 commit 2adc75e

File tree

5 files changed

+214
-73
lines changed

5 files changed

+214
-73
lines changed

examples/kafkajs/consumer.js

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
2-
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
2+
const { Kafka, ErrorCodes } = require('@confluentinc/kafka-javascript').KafkaJS;
33

44
async function consumerStart() {
55
let consumer;
@@ -22,22 +22,17 @@ async function consumerStart() {
2222
kafkaJS: {
2323
groupId: 'test-group',
2424
autoCommit: false,
25-
rebalanceListener: {
26-
onPartitionsAssigned: async (assignment) => {
27-
console.log(`Assigned partitions ${JSON.stringify(assignment)}`);
28-
},
29-
onPartitionsRevoked: async (assignment) => {
30-
console.log(`Revoked partitions ${JSON.stringify(assignment)}`);
31-
if (!stopped) {
32-
await consumer.commitOffsets().catch((e) => {
33-
console.error(`Failed to commit ${e}`);
34-
})
35-
}
36-
}
37-
},
3825
},
39-
4026
/* Properties from librdkafka can also be used */
27+
rebalance_cb: (err, assignment) => {
28+
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
29+
console.log(`Assigned partitions ${JSON.stringify(assignment)}`);
30+
} else if (err.code === ErrorCodes.ERR__REVOKE_PARTITIONS) {
31+
console.log(`Revoked partitions ${JSON.stringify(assignment)}`);
32+
} else {
33+
console.error(`Rebalance error ${err}`);
34+
}
35+
},
4136
'auto.commit.interval.ms': 6000,
4237
});
4338

lib/kafkajs/_common.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ const kafkaJSProperties = {
5252
'autoCommit',
5353
'autoCommitInterval',
5454
'autoCommitThreshold',
55-
'rebalanceListener',
5655
],
5756
admin: [],
5857
};

lib/kafkajs/_consumer.js

Lines changed: 58 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -248,45 +248,67 @@ class Consumer {
248248
err = LibrdKafkaError.create(err);
249249
const userSpecifiedRebalanceCb = this.#userConfig['rebalance_cb'];
250250

251-
let call;
251+
let assignmentFnCalled = false;
252+
function assignmentFn(userAssignment) {
253+
if (assignmentFnCalled)
254+
return;
255+
assignmentFnCalled = true;
256+
257+
if (this.#internalClient.rebalanceProtocol() === "EAGER") {
258+
this.#internalClient.assign(userAssignment);
259+
this.#partitionCount = userAssignment.length;
260+
} else {
261+
this.#internalClient.incrementalAssign(userAssignment);
262+
this.#partitionCount += userAssignment.length;
263+
}
264+
}
252265

253-
/* Since we don't expose assign() or incremental_assign() methods, we allow the user
254-
* to modify the assignment by returning it. If a truthy value is returned, we use that
255-
* and do not apply any pending seeks to it either. */
266+
function unassignmentFn(userAssignment) {
267+
if (assignmentFnCalled)
268+
return;
269+
270+
assignmentFnCalled = true;
271+
if (this.#internalClient.rebalanceProtocol() === "EAGER") {
272+
this.#internalClient.unassign();
273+
this.#messageCache.removeTopicPartitions();
274+
this.#partitionCount = 0;
275+
} else {
276+
this.#internalClient.incrementalUnassign(userAssignment);
277+
this.#messageCache.removeTopicPartitions(userAssignment);
278+
this.#partitionCount -= userAssignment.length;
279+
}
280+
}
281+
282+
let call = Promise.resolve();
283+
284+
/* We allow the user to modify the assignment by returning it. If a truthy
285+
* value is returned, we use that and do not apply any pending seeks to it either.
286+
* The user can alternatively use the assignmentFns argument.
287+
* Precedence is given to the calling of functions within assignmentFns. */
256288
let assignmentModified = false;
257289
if (typeof userSpecifiedRebalanceCb === 'function') {
258290
call = new Promise((resolve, reject) => {
259-
try {
260-
const alternateAssignment = userSpecifiedRebalanceCb(err, assignment);
291+
const assignmentFns = {
292+
assign: assignmentFn.bind(this),
293+
unassign: unassignmentFn.bind(this),
294+
};
295+
296+
/* The user specified callback may be async, or sync. Wrapping it in a
297+
* Promise.resolve ensures that we always get a promise back. */
298+
return Promise.resolve(
299+
userSpecifiedRebalanceCb(err, assignment, assignmentFns)
300+
).then(alternateAssignment => {
261301
if (alternateAssignment) {
262302
assignment = alternateAssignment;
263303
assignmentModified = true;
264304
}
265305
resolve();
266-
} catch (e) {
267-
reject(e);
268-
}
306+
}).catch(reject);
269307
});
270-
} else {
271-
switch (err.code) {
272-
// TODO: is this the right way to handle this error?
273-
// We might just be able to throw, because the error is something the user has caused.
274-
case LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS:
275-
call = (this.#userConfig.rebalanceListener.onPartitionsAssigned ?
276-
this.#userConfig.rebalanceListener.onPartitionsAssigned(assignment) :
277-
Promise.resolve()).catch(e => this.#logger.error(e));
278-
break;
279-
case LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS:
280-
call = (this.#userConfig.rebalanceListener.onPartitionsRevoked ?
281-
this.#userConfig.rebalanceListener.onPartitionsRevoked(assignment) :
282-
Promise.resolve()).catch(e => this.#logger.error(e));
283-
break;
284-
default:
285-
call = Promise.reject(`Unexpected rebalanceListener error code ${err.code}`).catch((e) => {
286-
this.#logger.error(e);
287-
});
288-
break;
289-
}
308+
} else if (err.code !== LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS && err.code !== LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) {
309+
call = Promise.reject(`Unexpected rebalance_cb error code ${err.code}`).catch((e) => {
310+
this.#logger.error(e);
311+
});
290312
}
291313

292314
call
@@ -311,16 +333,10 @@ class Consumer {
311333
if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
312334

313335
const checkPendingSeeks = this.#pendingSeeks.size !== 0;
314-
if (checkPendingSeeks && !assignmentModified)
336+
if (checkPendingSeeks && !assignmentModified && !assignmentFnCalled)
315337
assignment = this.#assignAsPerSeekedOffsets(assignment);
316338

317-
if (this.#internalClient.rebalanceProtocol() === "EAGER") {
318-
this.#internalClient.assign(assignment);
319-
this.#partitionCount = assignment.length;
320-
} else {
321-
this.#internalClient.incrementalAssign(assignment);
322-
this.#partitionCount += assignment.length;
323-
}
339+
assignmentFn.call(this, assignment);
324340

325341
if (checkPendingSeeks) {
326342
const offsetsToCommit = assignment
@@ -342,15 +358,7 @@ class Consumer {
342358
this.#messageCache.addTopicPartitions(assignment);
343359

344360
} else {
345-
if (this.#internalClient.rebalanceProtocol() === "EAGER") {
346-
this.#internalClient.unassign();
347-
this.#messageCache.removeTopicPartitions();
348-
this.#partitionCount = 0;
349-
} else {
350-
this.#internalClient.incrementalUnassign(assignment);
351-
this.#messageCache.removeTopicPartitions(assignment);
352-
this.#partitionCount -= assignment.length;
353-
}
361+
unassignmentFn.call(this, assignment);
354362
}
355363
} catch (e) {
356364
// Ignore exceptions if we are not connected
@@ -522,16 +530,10 @@ class Consumer {
522530

523531
/* Delete properties which are already processed, or cannot be passed to node-rdkafka */
524532
delete rdKafkaConfig.kafkaJS;
525-
delete rdKafkaConfig.rebalanceListener;
526533

527534
/* Certain properties that the user has set are overridden. We use trampolines to accommodate the user's callbacks.
528535
* TODO: add trampoline method for offset commit callback. */
529536
rdKafkaConfig['offset_commit_cb'] = true;
530-
531-
if (!Object.hasOwn(this.#userConfig, 'rebalanceListener')) {
532-
/* We might want to do certain things to maintain internal state in rebalance listener, so we need to set it to an empty object. */
533-
this.#userConfig.rebalanceListener = {};
534-
}
535537
rdKafkaConfig['rebalance_cb'] = this.#rebalanceCallback.bind(this);
536538

537539
/* Offset management is different from case to case.
@@ -1587,6 +1589,7 @@ class Consumer {
15871589
if (!topic.partitions) {
15881590
toppar.partitions = this.#getAllAssignedPartition(topic.topic);
15891591
} else {
1592+
/* TODO: add a check here to make sure we own each partition */
15901593
toppar.partitions = [...topic.partitions];
15911594
}
15921595

@@ -1597,6 +1600,8 @@ class Consumer {
15971600
if (flattenedToppars.length === 0) {
15981601
return;
15991602
}
1603+
1604+
/* TODO: error handling is lacking for pause, including partition level errors. */
16001605
this.#internalClient.pause(flattenedToppars);
16011606

16021607
/* Mark the messages in the cache as stale, runInternal* will deal with
@@ -1608,7 +1613,7 @@ class Consumer {
16081613
.filter(key => this.#topicPartitionToBatchPayload.has(key))
16091614
.forEach(key => this.#topicPartitionToBatchPayload.get(key)._stale = true);
16101615

1611-
flattenedToppars.map(JSON.stringify).forEach(topicPartition => this.#pausedPartitions.add(topicPartition));
1616+
flattenedToppars.map(JSON.stringify).forEach(topicPartition => this.#pausedPartitions.add(topicPartition));
16121617

16131618
/* Note: we don't use flattenedToppars here because resume flattens them again. */
16141619
return () => this.resume(toppars);

test/promisified/consumer/incrementalRebalance.spec.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,11 @@ describe('Consumer > incremental rebalance', () => {
148148
let revokes = 0;
149149

150150
consumer = createConsumer(consumerConfig, {
151-
rebalanceListener: {
152-
onPartitionsAssigned: async (assignment) => {
151+
rebalance_cb: async (err, assignment) => {
152+
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
153153
assigns++;
154154
expect(assignment.length).toBe(2);
155-
},
156-
onPartitionsRevoked: async (assignment) => {
155+
} else if (err.code === ErrorCodes.ERR__REVOKE_PARTITIONS) {
157156
revokes++;
158157
expect(assignment.length).toBe(2);
159158
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
jest.setTimeout(30000);
2+
3+
const { waitFor,
4+
secureRandom,
5+
createTopic,
6+
createConsumer,
7+
createProducer,
8+
sleep, } = require("../testhelpers");
9+
const { ErrorCodes } = require('../../../lib').KafkaJS;
10+
11+
describe('Consumer', () => {
12+
let consumer;
13+
let groupId, topicName;
14+
let consumerConfig;
15+
16+
beforeEach(async () => {
17+
topicName = `test-topic-${secureRandom()}`;
18+
groupId = `consumer-group-id-${secureRandom()}`;
19+
consumerConfig = {
20+
groupId,
21+
};
22+
consumer = null;
23+
await createTopic({ topic: topicName, partitions: 3 });
24+
});
25+
26+
afterEach(async () => {
27+
consumer && (await consumer.disconnect());
28+
});
29+
30+
it('calls rebalance callback', async () => {
31+
let calls = 0;
32+
consumer = createConsumer(consumerConfig, {
33+
rebalance_cb: function () {
34+
calls++;
35+
}
36+
});
37+
38+
await consumer.connect();
39+
await consumer.subscribe({ topic: topicName });
40+
consumer.run({ eachMessage: async () => { } });
41+
await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
42+
expect(calls).toBe(1); /* assign */
43+
await consumer.disconnect();
44+
expect(calls).toBe(2); /* assign + unassign */
45+
consumer = null;
46+
});
47+
48+
it('allows modifying the assignment via returns', async () => {
49+
consumer = createConsumer(consumerConfig, {
50+
rebalance_cb: function (err, assignment) {
51+
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
52+
expect(assignment.length).toBe(3);
53+
return assignment.filter(a => a.partition !== 0);
54+
}
55+
}
56+
});
57+
58+
await consumer.connect();
59+
await consumer.subscribe({ topic: topicName });
60+
consumer.run({ eachMessage: async () => { } });
61+
await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
62+
expect(consumer.assignment().length).toBe(2);
63+
expect(consumer.assignment()).toEqual(
64+
expect.arrayContaining([
65+
{ topic: topicName, partition: 1 },
66+
{ topic: topicName, partition: 2 }]));
67+
});
68+
69+
it('allows modifying the assigment via assignment functions', async () => {
70+
let calls = 0;
71+
consumer = createConsumer(consumerConfig, {
72+
rebalance_cb: function (err, assignment, assignmentFns) {
73+
calls++;
74+
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
75+
expect(assignment.length).toBe(3);
76+
assignmentFns.assign(assignment.filter(a => a.partition !== 0));
77+
} else {
78+
assignmentFns.unassign(assignment);
79+
}
80+
}
81+
});
82+
83+
await consumer.connect();
84+
await consumer.subscribe({ topic: topicName });
85+
consumer.run({ eachMessage: async () => { } });
86+
await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
87+
expect(consumer.assignment().length).toBe(2);
88+
expect(consumer.assignment()).toEqual(
89+
expect.arrayContaining([
90+
{ topic: topicName, partition: 1 },
91+
{ topic: topicName, partition: 2 }]));
92+
await consumer.disconnect();
93+
expect(calls).toBe(2);
94+
consumer = null;
95+
});
96+
97+
it('pauses correctly from the rebalance callback after assign', async () => {
98+
consumer = createConsumer(consumerConfig, {
99+
rebalance_cb: function (err, assignment, assignmentFns) {
100+
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
101+
expect(assignment.length).toBe(3);
102+
103+
/* Assign first so we can pause. */
104+
assignmentFns.assign(assignment);
105+
106+
/* Convert the assignment into format suitable for pause argument. */
107+
const pausablePartitions = [{ topic: topicName, partitions: [0, 1, 2] }];
108+
consumer.pause(pausablePartitions);
109+
} else {
110+
assignmentFns.unassign(assignment);
111+
}
112+
}
113+
});
114+
115+
let messagesConsumed = [];
116+
await consumer.connect();
117+
await consumer.subscribe({ topic: topicName });
118+
consumer.run({ eachMessage: async (e) => { messagesConsumed.push(e); } });
119+
await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
120+
121+
const producer = createProducer({});
122+
await producer.connect();
123+
const key1 = secureRandom();
124+
for (const partition of [0, 1, 2]) {
125+
const message = { key: `key-${key1}`, value: `value-${key1}`, partition };
126+
await producer.send({
127+
topic: topicName,
128+
messages: [message],
129+
});
130+
}
131+
await producer.disconnect();
132+
133+
expect(consumer.paused()).toEqual([{ topic: topicName, partitions: [0, 1, 2] }]);
134+
135+
/* Give it some extra time just in case - should be enough to get the messages if a partition isn't paused. */
136+
await sleep(1000);
137+
expect(messagesConsumed.length).toBe(0);
138+
139+
consumer.resume([ { topic: topicName } ]);
140+
await waitFor(() => messagesConsumed.length === 3, () => null, 1000);
141+
expect(messagesConsumed.length).toBe(3);
142+
});
143+
});

0 commit comments

Comments
 (0)