@@ -21,6 +21,49 @@ var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
2121var DEFAULT_CONSUME_TIME_OUT = 1000 ;
2222util . inherits ( KafkaConsumer , Client ) ;
2323
24+ var eagerRebalanceCallback = function ( err , assignment ) {
25+ // Create the librdkafka error
26+ err = LibrdKafkaError . create ( err ) ;
27+ // Emit the event
28+ this . emit ( 'rebalance' , err , assignment ) ;
29+
30+ // That's it
31+ try {
32+ if ( err . code === - 175 /*ERR__ASSIGN_PARTITIONS*/ ) {
33+ this . assign ( assignment ) ;
34+ } else if ( err . code === - 174 /*ERR__REVOKE_PARTITIONS*/ ) {
35+ this . unassign ( ) ;
36+ }
37+ } catch ( e ) {
38+ // Ignore exceptions if we are not connected
39+ if ( this . isConnected ( ) ) {
40+ this . emit ( 'rebalance.error' , e ) ;
41+ }
42+ }
43+ }
44+
45+ var cooperativeRebalanceCallback = function ( err , assignment ) {
46+ // Create the librdkafka error
47+ err = LibrdKafkaError . create ( err ) ;
48+ // Emit the event
49+ this . emit ( 'rebalance' , err , assignment ) ;
50+
51+ // That's it
52+ try {
53+ if ( err . code === - 175 /*ERR__ASSIGN_PARTITIONS*/ ) {
54+ this . incrementalAssign ( assignment ) ;
55+ } else if ( err . code === - 174 /*ERR__REVOKE_PARTITIONS*/ ) {
56+ this . incrementalUnassign ( assignment ) ;
57+ }
58+ } catch ( e ) {
59+ // Ignore exceptions if we are not connected
60+ if ( this . isConnected ( ) ) {
61+ this . emit ( 'rebalance.error' , e ) ;
62+ }
63+ }
64+ }
65+
66+
2467/**
2568 * KafkaConsumer class for reading messages from Kafka
2669 *
@@ -52,26 +95,9 @@ function KafkaConsumer(conf, topicConf) {
5295
5396 // If rebalance is undefined we don't want any part of this
5497 if ( onRebalance && typeof onRebalance === 'boolean' ) {
55- conf . rebalance_cb = function ( err , assignment ) {
56- // Create the librdkafka error
57- err = LibrdKafkaError . create ( err ) ;
58- // Emit the event
59- self . emit ( 'rebalance' , err , assignment ) ;
60-
61- // That's it
62- try {
63- if ( err . code === - 175 /*ERR__ASSIGN_PARTITIONS*/ ) {
64- self . assign ( assignment ) ;
65- } else if ( err . code === - 174 /*ERR__REVOKE_PARTITIONS*/ ) {
66- self . unassign ( ) ;
67- }
68- } catch ( e ) {
69- // Ignore exceptions if we are not connected
70- if ( self . isConnected ( ) ) {
71- self . emit ( 'rebalance.error' , e ) ;
72- }
73- }
74- } ;
98+ conf . rebalance_cb = conf [ 'partition.assignment.strategy' ] === 'cooperative-sticky'
99+ ? cooperativeRebalanceCallback . bind ( this )
100+ : eagerRebalanceCallback . bind ( this ) ;
75101 } else if ( onRebalance && typeof onRebalance === 'function' ) {
76102 /*
77103 * Once this is opted in to, that's it. It's going to manually rebalance
0 commit comments