
Commit 3e36c0b

Add cooperative rebalancing

1 parent 64cba18 commit 3e36c0b

16 files changed: +768 −136

MIGRATION.md

Lines changed: 1 addition & 1 deletion
@@ -195,7 +195,7 @@ producerRun().then(consumerRun).catch(console.error);

  | Property                | Default Value                     | Comment |
  |-------------------------|-----------------------------------|---------|
  | groupId                 | null                              | A mandatory string denoting the name of the consumer group that this consumer is a part of. |
- | **partitionAssigners**  | `[PartitionAssigners.roundRobin]` | Support for range and roundRobin assignors is provided. Custom assignors are not supported. Support for the cooperative-sticky assignor is yet to be added. |
+ | **partitionAssigners**  | `[PartitionAssigners.roundRobin]` | Support for range, roundRobin, and cooperativeSticky assignors is provided. Custom assignors are not supported. |
  | **partitionAssignors**  | `[PartitionAssignors.roundRobin]` | Alias for `partitionAssigners`. |
  | **rebalanceTimeout**    | **300000**                        | The maximum allowed time for each member to join the group once a rebalance has begun. Note that setting this value *also* changes the max poll interval; message processing in `eachMessage` must not take more than this time. |
  | heartbeatInterval       | 3000                              | The expected time in milliseconds between heartbeats to the consumer coordinator. |
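
For reference, opting into cooperative rebalancing would then look something like the sketch below. This is a sketch, not a definitive usage: it assumes a `PartitionAssigners.cooperativeSticky` export alongside the existing `range` and `roundRobin` ones, and that `PartitionAssigners` is exported from the `KafkaJS` namespace; check the actual export names in the release you are using.

```js
const { Kafka, PartitionAssigners } = require('confluent-kafka-javascript').KafkaJS;

const kafka = new Kafka({ brokers: ['localhost:9092'] });

// partitionAssigners takes a list; cooperativeSticky (assumed export name)
// enables incremental rebalancing instead of the default eager strategy.
const consumer = kafka.consumer({
  groupId: 'my-group',
  partitionAssigners: [PartitionAssigners.cooperativeSticky],
});
```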

examples/consumer-flow.md

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
Connecting to a Kafka Consumer is easy. Let's try to connect to one using
the Flowing implementation.

```js
/*
 * confluent-kafka-js - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016-2023 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */

var Kafka = require('../');

var consumer = new Kafka.KafkaConsumer({
  // 'debug': 'all',
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'confluent-kafka-js-consumer-flow-example',
  'enable.auto.commit': false
});

var topicName = 'test';

// Log debug messages, if debug is enabled
consumer.on('event.log', function(log) {
  console.log(log);
});

// Log all errors
consumer.on('event.error', function(err) {
  console.error('Error from consumer');
  console.error(err);
});

// Counter used to commit offsets every numMessages messages
var counter = 0;
var numMessages = 5;

consumer.on('ready', function(arg) {
  console.log('consumer ready.' + JSON.stringify(arg));

  consumer.subscribe([topicName]);
  // Start consuming messages
  consumer.consume();
});

consumer.on('data', function(m) {
  counter++;

  // Commit offsets every numMessages messages
  if (counter % numMessages === 0) {
    console.log('calling commit');
    consumer.commit(m);
  }

  // Output the actual message contents
  console.log(JSON.stringify(m));
  console.log(m.value.toString());
});

consumer.on('disconnected', function(arg) {
  console.log('consumer disconnected. ' + JSON.stringify(arg));
});

// Start the consumer
consumer.connect();

// Stop this example after 30 seconds
setTimeout(function() {
  consumer.disconnect();
}, 30000);
```
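
With a broker running on localhost:9092 and a topic named `test`, this example logs each message as it arrives, commits offsets after every five messages, and disconnects after 30 seconds.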

examples/consumer.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
Connecting to a Kafka Consumer is easy. Let's try to connect to one using
the Stream implementation.

```js
/*
 * confluent-kafka-js - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016-2023 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */

var Kafka = require('../');

var stream = Kafka.KafkaConsumer.createReadStream({
  'metadata.broker.list': 'localhost:9092',
  'group.id': 'librd-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': false
}, {}, {
  topics: 'test',
  waitInterval: 0,
  objectMode: false
});

// Pipe the raw message payloads straight to stdout
stream
  .pipe(process.stdout);

// Log stream errors and exit
stream.on('error', function(err) {
  console.log(err);
  process.exit(1);
});

// Log errors from the underlying consumer
stream.consumer.on('event.error', function(err) {
  console.log(err);
});
```
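
Because `createReadStream` returns a standard readable stream, it can also be piped through a Node.js `Transform` before writing. A minimal sketch, with `objectMode: false` as configured above so each chunk is a raw message payload (the newline-appending transform is just an illustration):

```js
var Transform = require('stream').Transform;

// Emit one line per message payload
var toLines = new Transform({
  transform: function(chunk, encoding, callback) {
    callback(null, chunk.toString() + '\n');
  }
});

stream
  .pipe(toLines)
  .pipe(process.stdout);
```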

examples/docker-alpine.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
When using Docker to install `confluent-kafka-js`, you need to make sure you install the appropriate library dependencies. Alpine Linux is a lighter-weight distribution and does not ship with the same base libraries (such as glibc) as other distributions.

You can see some of the differences here: https://linuxacademy.com/blog/cloud/alpine-linux-and-docker/

```dockerfile
FROM node:14-alpine

# Build dependencies for the native librdkafka addon
RUN apk --no-cache add \
      bash \
      g++ \
      ca-certificates \
      lz4-dev \
      musl-dev \
      cyrus-sasl-dev \
      openssl-dev \
      make \
      python3

RUN apk add --no-cache --virtual .build-deps gcc zlib-dev libc-dev bsd-compat-headers py-setuptools bash

# Create app directory
RUN mkdir -p /usr/local/app

# Move to the app directory
WORKDIR /usr/local/app

# Install confluent-kafka-js
RUN npm install confluent-kafka-js
```
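
The image can then be built and checked with standard Docker commands, e.g. `docker build -t confluent-kafka-js-alpine .` followed by `docker run --rm confluent-kafka-js-alpine node -e "require('confluent-kafka-js')"` to verify that the native module loads (the image tag is arbitrary).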

examples/high-level-producer.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
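The example uses the HighLevelProducer with custom serializers: the key serializer resolves asynchronously to `null` (discarding keys), while the value serializer extracts the `message` field of the produced object as a Buffer.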
1+
```js
var Kafka = require('../');

var producer = new Kafka.HighLevelProducer({
  'metadata.broker.list': 'localhost:9092',
});

// Throw away the keys by resolving them to null, asynchronously
producer.setKeySerializer(function(v) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(null);
    }, 20);
  });
});

// Take the message field of the value object as the payload
producer.setValueSerializer(function(v) {
  return Buffer.from(v.message);
});

producer.connect(null, function() {
  producer.produce('test', null, {
    message: 'alliance4ever',
  }, null, Date.now(), function(err, offset) {
    // The offset is reported if our acknowledgement level
    // allows us to receive delivery offsets
    setImmediate(function() {
      producer.disconnect();
    });
  });
});
```

examples/kafkajs/consumer.js

Lines changed: 80 additions & 87 deletions
The example is reworked to the new configuration style: the explanatory header comments become a commented-out `require('kafkajs')` line, the nested `kafkaJS` configuration wrapper is removed so that `brokers`, `ssl`, `sasl`, `groupId`, and `rebalanceListener` sit at the top level, `autoCommit: false` and `'auto.commit.interval.ms': 6000` give way to `'enable.auto.commit': false` under an `rdKafka` passthrough block, the broker list placeholder becomes `<fill>`, and the guard around the pause/resume loop checks for the existence of `consumer.assignment` instead of calling it.
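
The resulting file, reconstructed from the added and context lines of the diff (one region of the rebalance listener is collapsed by the diff view and is marked with a comment; a few multi-line calls are condensed):

```js
const { Kafka } = require('../..').KafkaJS
//const { Kafka } = require('kafkajs')

async function consumerStart() {
    let consumer;
    var stopped = false;

    const kafka = new Kafka({
        brokers: ['<fill>'],
        ssl: true,
        connectionTimeout: 5000,
        sasl: {
            mechanism: 'plain',
            username: '<fill>',
            password: '<fill>',
        },
    });

    consumer = kafka.consumer({
        groupId: 'test-group',
        rebalanceListener: {
            onPartitionsAssigned: async (assignment) => {
                console.log(`Assigned partitions ${JSON.stringify(assignment)}`);
            },
            // ... (remainder of the listener is collapsed in the diff view) ...
        },
        rdKafka: {
            'enable.auto.commit': false
        }
    });

    await consumer.connect();
    console.log("Connected successfully");

    await consumer.subscribe({ topics: ["topic2"] })

    // Batch consumer, commit and seek example
    var batch = 0;
    consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                topic,
                partition,
                offset: message.offset,
                key: message.key?.toString(),
                value: message.value.toString(),
            })

            // Every 100 messages, seek to the partition start (-2) and commit
            if (++batch % 100 == 0) {
                await consumer.seek({ topic, partition, offset: -2 });
                await consumer.commitOffsets();
                batch = 0;
            }
        },
    });

    // Pause/Resume example
    const pauseResumeLoop = async () => {
        let paused = false;
        let ticks = 0;
        while (!stopped) {
            await new Promise((resolve) => setTimeout(resolve, 100));
            if (stopped)
                break;

            ticks++;
            if (ticks == 200) {
                ticks = 0;
                const assignment = consumer.assignment();
                if (paused) {
                    console.log(`Resuming partitions ${JSON.stringify(assignment)}`)
                    consumer.resume(assignment);
                } else {
                    console.log(`Pausing partitions ${JSON.stringify(assignment)}`);
                    consumer.pause(assignment);
                }
                paused = !paused;
            }
        }
    }

    if (consumer.assignment) {
        // KafkaJS doesn't have assignment()
        pauseResumeLoop()
    }

    // Disconnect example
    const disconnect = () => {
        process.off('SIGINT', disconnect);
        process.off('SIGTERM', disconnect);
        stopped = true;
        consumer.commitOffsets()
            .finally(() => consumer.disconnect())
            .finally(() => console.log("Disconnected successfully"));
    }
    process.on('SIGINT', disconnect);
    process.on('SIGTERM', disconnect);
}

consumerStart()
```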
