Skip to content

Commit fd43067

Browse files
committed
Pause, resume and assignment
1 parent 949bed8 commit fd43067

File tree

4 files changed

+99
-27
lines changed

4 files changed

+99
-27
lines changed

examples/kafkajs/consumer.js

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ const { Kafka } = require('../..').KafkaJS
33

44
async function consumerStart() {
55
let consumer;
6+
var stopped = false;
7+
68
const kafka = new Kafka({
79
brokers: ['<fill>'],
810
ssl: true,
@@ -12,24 +14,28 @@ async function consumerStart() {
1214
username: '<fill>',
1315
password: '<fill>',
1416
},
15-
rebalanceListener: {
16-
onPartitionsAssigned: async (assignment) => {
17-
console.log(`Assigned partitions ${JSON.stringify(assignment)}`);
18-
},
19-
onPartitionsRevoked: async (assignment) => {
20-
console.log(`Revoked partitions ${JSON.stringify(assignment)}`);
17+
});
18+
19+
consumer = kafka.consumer({
20+
groupId: 'test-group',
21+
rebalanceListener: {
22+
onPartitionsAssigned: async (assignment) => {
23+
console.log(`Assigned partitions ${JSON.stringify(assignment)}`);
24+
},
25+
onPartitionsRevoked: async (assignment) => {
26+
console.log(`Revoked partitions ${JSON.stringify(assignment)}`);
27+
if (!stopped) {
2128
await consumer.commitOffsets().catch((e) => {
2229
console.error(`Failed to commit ${e}`);
2330
})
2431
}
25-
},
26-
rdKafka: {
27-
'enable.auto.commit': false
2832
}
33+
},
34+
rdKafka: {
35+
'enable.auto.commit': false
36+
}
2937
});
3038

31-
consumer = kafka.consumer({ groupId: 'test-group' });
32-
3339
await consumer.connect();
3440
console.log("Connected successfully");
3541

@@ -39,6 +45,7 @@ async function consumerStart() {
3945
]
4046
})
4147

48+
// Batch consumer, commit and seek example
4249
var batch = 0;
4350
consumer.run({
4451
eachMessage: async ({ topic, partition, message }) => {
@@ -62,12 +69,48 @@ async function consumerStart() {
6269
},
6370
});
6471

72+
// Pause/Resume example
73+
const pauseResumeLoop = async () => {
74+
let paused = false;
75+
let ticks = 0;
76+
while (!stopped) {
77+
await new Promise((resolve) => setTimeout(resolve, 100));
78+
if (stopped)
79+
break;
80+
81+
ticks++;
82+
if (ticks == 200) {
83+
ticks = 0;
84+
const assignment = consumer.assignment();
85+
if (paused) {
86+
console.log(`Resuming partitions ${JSON.stringify(assignment)}`)
87+
consumer.resume(assignment);
88+
} else {
89+
console.log(`Pausing partitions ${JSON.stringify(assignment)}`);
90+
consumer.pause(assignment);
91+
}
92+
paused = !paused;
93+
}
94+
}
95+
}
96+
97+
if (consumer.assignment) {
98+
// KafkaJS doesn't have assignment()
99+
pauseResumeLoop()
100+
}
101+
102+
// Disconnect example
65103
const disconnect = () => {
66104
process.off('SIGINT', disconnect);
67105
process.off('SIGTERM', disconnect);
68-
consumer.disconnect().finally(() => {
69-
console.log("Disconnected successfully");
70-
});
106+
stopped = true;
107+
consumer.commitOffsets()
108+
.finally(() =>
109+
consumer.disconnect()
110+
)
111+
.finally(() =>
112+
console.log("Disconnected successfully")
113+
);
71114
}
72115
process.on('SIGINT', disconnect);
73116
process.on('SIGTERM', disconnect);

lib/kafkajs/_consumer.js

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,23 @@ class Consumer {
166166
}
167167
}
168168

169+
#flattenTopicPartitions(topics) {
170+
const ret = [];
171+
for (let topic of topics) {
172+
if (topic.partition != null)
173+
ret.push({
174+
topic: topic.topic,
175+
partition: topic.partition
176+
});
177+
else {
178+
for (let partition of topic.partitions) {
179+
ret.push({topic: topic.topic, partition});
180+
}
181+
}
182+
}
183+
return ret;
184+
}
185+
169186
async connect() {
170187
if (this.#state !== ConsumerState.INIT) {
171188
return Promise.reject('Connect has already been called elsewhere.');
@@ -243,15 +260,21 @@ class Consumer {
243260
}
244261

245262
pause(topics) {
246-
this.#notImplemented();
263+
topics = this.#flattenTopicPartitions(topics);
264+
this.#internalClient.pause(topics);
247265
}
248266

249267
paused() {
250268
this.#notImplemented();
251269
}
252270

271+
assignment() {
272+
return this.#flattenTopicPartitions(this.#internalClient.assignments());
273+
}
274+
253275
resume(topics) {
254-
this.#notImplemented();
276+
topics = this.#flattenTopicPartitions(topics);
277+
this.#internalClient.resume(topics);
255278
}
256279

257280
on(eventName, listener) {

lib/kafkajs/_kafka.js

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,27 @@ class Kafka {
99
this.#commonClientConfig = config;
1010
}
1111

12-
producer(config) {
13-
if (config === null || !(config instanceof Object)) {
14-
config = {};
12+
#mergeConfiguration(config) {
13+
let baseConfig = Object.assign({}, this.#commonClientConfig);
14+
config = Object.assign({}, config);
15+
16+
let rdKafka = baseConfig.rdKafka;
17+
Object.assign(baseConfig, config);
18+
if (rdKafka && config.rdKafka) {
19+
baseConfig.rdKafka = {
20+
...rdkafka,
21+
...config.rdKafka
1522
}
16-
17-
config = Object.assign(config, this.#commonClientConfig);
18-
return new Producer(config);
23+
}
24+
return baseConfig;
1925
}
2026

21-
consumer(config) {
22-
if (config === null || !(config instanceof Object)) {
23-
config = {};
27+
producer(config) {
28+
return new Producer(this.#mergeConfiguration(config));
2429
}
2530

26-
config = Object.assign(config, this.#commonClientConfig);
27-
return new Consumer(config);
31+
consumer(config) {
32+
return new Consumer(this.#mergeConfiguration(config));
2833
}
2934
}
3035

types/kafkajs.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ export type Consumer = {
417417
describeGroup(): Promise<GroupDescription>
418418
pause(topics: Array<{ topic: string; partitions?: number[] }>): void
419419
paused(): TopicPartitions[]
420+
assignment(): TopicPartitions[]
420421
resume(topics: Array<{ topic: string; partitions?: number[] }>): void
421422
on(
422423
eventName: ConsumerEvents['HEARTBEAT'],

0 commit comments

Comments
 (0)