Skip to content

Commit 1b33604

Browse files
authored
Cooperative Rebalance (#1081)
1 parent fd8039e commit 1b33604

File tree

9 files changed

+348
-15
lines changed

9 files changed

+348
-15
lines changed

e2e/both.spec.js

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,65 @@ describe('Consumer/Producer', function() {
614614
});
615615
});
616616

617+
describe('Cooperative sticky', function() {
618+
var consumer;
619+
620+
beforeEach(function(done) {
621+
var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
622+
623+
var consumerOpts = {
624+
'metadata.broker.list': kafkaBrokerList,
625+
'group.id': grp,
626+
'fetch.wait.max.ms': 1000,
627+
'session.timeout.ms': 10000,
628+
'enable.auto.commit': false,
629+
'debug': 'all',
630+
'partition.assignment.strategy': 'cooperative-sticky'
631+
};
632+
633+
consumer = new Kafka.KafkaConsumer(consumerOpts, {
634+
'auto.offset.reset': 'largest',
635+
});
636+
637+
consumer.connect({}, function(err, d) {
638+
t.ifError(err);
639+
t.equal(typeof d, 'object', 'metadata should be returned');
640+
done();
641+
});
642+
643+
eventListener(consumer);
644+
});
645+
646+
afterEach(function(done) {
647+
consumer.disconnect(function() {
648+
done();
649+
});
650+
});
651+
652+
it('should be able to produce and consume messages', function (done) {
653+
var key = 'key';
654+
655+
crypto.randomBytes(4096, function(ex, buffer) {
656+
producer.setPollInterval(10);
657+
658+
consumer.on('data', function(message) {
659+
t.equal(buffer.toString(), message.value.toString(), 'invalid message value');
660+
t.equal(key, message.key, 'invalid message key');
661+
t.equal(topic, message.topic, 'invalid message topic');
662+
t.ok(message.offset >= 0, 'invalid message offset');
663+
done();
664+
});
665+
666+
consumer.subscribe([topic]);
667+
consumer.consume();
668+
669+
setTimeout(function() {
670+
producer.produce(topic, null, buffer, key);
671+
}, 2000);
672+
});
673+
});
674+
});
675+
617676
function assert_headers_match(expectedHeaders, messageHeaders) {
618677
t.equal(expectedHeaders.length, messageHeaders.length, 'Headers length does not match expected length');
619678
for (var i = 0; i < expectedHeaders.length; i++) {

e2e/consumer.spec.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,4 +344,43 @@ describe('Consumer', function() {
344344
});
345345

346346
});
347+
348+
describe('rebalance protocol', function () {
349+
var strategies = {
350+
'undefined': 'EAGER',
351+
'range': 'EAGER',
352+
'roundrobin': 'EAGER',
353+
'cooperative-sticky': 'COOPERATIVE',
354+
};
355+
356+
Object.keys(strategies).forEach(function (strategy) {
357+
it('should return ' + strategies[strategy] + ' for ' + strategy, function(done) {
358+
var consumer = new KafkaConsumer({
359+
...gcfg,
360+
...(strategy !== 'undefined' && { 'partition.assignment.strategy': strategy })
361+
}, {});
362+
363+
t.equal(consumer.rebalanceProtocol(), 'NONE');
364+
365+
consumer.connect({ timeout: 2000 }, function(err) {
366+
t.ifError(err);
367+
368+
consumer.subscribe([topic]);
369+
370+
consumer.on('rebalance', function (err) {
371+
if (err.code === -175) {
372+
t.equal(consumer.rebalanceProtocol(), strategies[strategy]);
373+
consumer.disconnect(done);
374+
}
375+
});
376+
377+
consumer.consume(1, function(err) {
378+
t.ifError(err);
379+
});
380+
});
381+
382+
eventListener(consumer);
383+
});
384+
});
385+
});
347386
});

index.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
208208
constructor(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig);
209209

210210
assign(assignments: Assignment[]): this;
211+
incrementalAssign(assignments: Assignment[]): this;
211212

212213
assignments(): Assignment[];
213214

@@ -248,12 +249,15 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
248249
subscription(): string[];
249250

250251
unassign(): this;
252+
incrementalUnassign(assignments: Assignment[]): this;
251253

252254
unsubscribe(): this;
253255

254256
offsetsForTimes(topicPartitions: TopicPartitionTime[], timeout: number, cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
255257
offsetsForTimes(topicPartitions: TopicPartitionTime[], cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
256258

259+
rebalanceProtocol(): string;
260+
257261
static createReadStream(conf: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;
258262
}
259263

lib/kafka-consumer.js

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,20 @@ function KafkaConsumer(conf, topicConf) {
5858
// Emit the event
5959
self.emit('rebalance', err, assignment);
6060

61-
// That's it
61+
// That's it.
6262
try {
6363
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
64-
self.assign(assignment);
64+
if (self.rebalanceProtocol() === 'COOPERATIVE') {
65+
self.incrementalAssign(assignment);
66+
} else {
67+
self.assign(assignment);
68+
}
6569
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
66-
self.unassign();
70+
if (self.rebalanceProtocol() === 'COOPERATIVE') {
71+
self.incrementalUnassign(assignment);
72+
} else {
73+
self.unassign();
74+
}
6775
}
6876
} catch (e) {
6977
// Ignore exceptions if we are not connected
@@ -275,6 +283,40 @@ KafkaConsumer.prototype.unassign = function() {
275283
return this;
276284
};
277285

286+
/**
287+
* Assign the consumer specific partitions and topics. Used for
288+
* cooperative rebalancing.
289+
*
290+
* @param {array} assignments - Assignments array. Should contain
291+
* objects with topic and partition set. Assignments are additive.
292+
* @return {Client} - Returns itself
293+
*/
294+
KafkaConsumer.prototype.incrementalAssign = function(assignments) {
295+
this._client.incrementalAssign(TopicPartition.map(assignments));
296+
return this;
297+
};
298+
299+
/**
300+
* Unassign the consumer specific partitions and topics. Used for
301+
* cooperative rebalancing.
302+
*
303+
* @param {array} assignments - Assignments array. Should contain
304+
* objects with topic and partition set. Assignments are subtractive.
305+
* @return {Client} - Returns itself
306+
*/
307+
KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
308+
this._client.incrementalUnassign(TopicPartition.map(assignments));
309+
return this;
310+
};
311+
312+
/**
313+
* Get the type of rebalance protocol used in the consumer group.
314+
*
315+
* @returns "NONE", "COOPERATIVE" or "EAGER".
316+
*/
317+
KafkaConsumer.prototype.rebalanceProtocol = function() {
318+
return this._client.rebalanceProtocol();
319+
}
278320

279321
/**
280322
* Get the assignments for the consumer

src/connection.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,18 @@ Connection::~Connection() {
6868
}
6969
}
7070

71+
Baton Connection::rdkafkaErrorToBaton(RdKafka::Error* error) {
72+
if ( NULL == error) {
73+
return Baton(RdKafka::ERR_NO_ERROR);
74+
}
75+
else {
76+
Baton result(error->code(), error->str(), error->is_fatal(),
77+
error->is_retriable(), error->txn_requires_abort());
78+
delete error;
79+
return result;
80+
}
81+
}
82+
7183
RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
7284
return RdKafka::TopicPartition::create(topic, RdKafka::Topic::PARTITION_UA);
7385
}

src/connection.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class Connection : public Nan::ObjectWrap {
8080

8181
static Nan::Persistent<v8::Function> constructor;
8282
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
83+
static Baton rdkafkaErrorToBaton(RdKafka::Error* error);
8384

8485
bool m_has_been_disconnected;
8586
bool m_is_closing;

0 commit comments

Comments
 (0)