Skip to content

Commit b2f451d

Browse files
committed
rdKafka configuration
1 parent 9648fd6 commit b2f451d

File tree

4 files changed

+59
-27
lines changed

4 files changed

+59
-27
lines changed

examples/kafkajs/consumer.js

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,13 @@ async function consumerStart() {
1717
await consumer.connect();
1818
console.log("Connected successfully");
1919

20-
const disconnect = () =>
21-
consumer.disconnect().then(() => {
22-
console.log("Disconnected successfully");
23-
});
24-
process.on('SIGINT', disconnect);
25-
process.on('SIGTERM', disconnect);
26-
27-
consumer.subscribe({
20+
await consumer.subscribe({
2821
topics: [
2922
"topic2"
3023
]
3124
})
3225

33-
await consumer.run({
26+
consumer.run({
3427
eachMessage: async ({ topic, partition, message }) => {
3528
console.log({
3629
topic,
@@ -41,6 +34,13 @@ async function consumerStart() {
4134
})
4235
},
4336
});
37+
38+
const disconnect = () =>
39+
consumer.disconnect().then(() => {
40+
console.log("Disconnected successfully");
41+
});
42+
process.on('SIGINT', disconnect);
43+
process.on('SIGTERM', disconnect);
4444
}
4545

4646
consumerStart()

lib/kafkajs/_common.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
function kafkaJSToRdKafkaConfig(config) {
1+
async function kafkaJSToRdKafkaConfig(config) {
22
const ret = {
33
'allow.auto.create.topics': 'false'
44
}
@@ -24,6 +24,14 @@ function kafkaJSToRdKafkaConfig(config) {
2424
ret['security.protocol'] = 'sasl_plaintext';
2525
}
2626

27+
if (config.rdKafka) {
28+
if (config.rdKafka.constructor === Function) {
29+
await config.rdKafka(ret);
30+
} else {
31+
Object.assign(ret, config.rdKafka);
32+
}
33+
}
34+
2735
return ret;
2836
}
2937

lib/kafkajs/_consumer.js

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,28 @@ const ConsumerState = Object.freeze({
1010
});
1111

1212
class Consumer {
13-
#config = {}
13+
#kJSConfig = null
14+
#rdKafkaConfig = null;
1415
#internalClient = null;
1516
#connectPromiseFunc = {};
1617
#state = ConsumerState.INIT;
1718

18-
constructor(config) {
19-
this.#config = kafkaJSToRdKafkaConfig(config);
20-
if (config.groupId != null) {
21-
this.#config["group.id"] = config.groupId;
19+
constructor(kJSConfig) {
20+
this.#kJSConfig = kJSConfig;
21+
}
22+
23+
#config() {
24+
if (!this.#rdKafkaConfig)
25+
this.#rdKafkaConfig = this.#finalizedConfig();
26+
return this.#rdKafkaConfig;
27+
}
28+
29+
async #finalizedConfig() {
30+
const config = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
31+
if (this.#kJSConfig.groupId != null) {
32+
config["group.id"] = this.#kJSConfig.groupId;
2233
}
34+
return config;
2335
}
2436

2537
#readyCb(arg) {
@@ -98,13 +110,13 @@ class Consumer {
98110
});
99111
}
100112

101-
connect() {
113+
async connect() {
102114
if (this.#state !== ConsumerState.INIT) {
103115
return Promise.reject("Connect has already been called elsewhere.");
104116
}
105117

106118
this.#state = ConsumerState.CONNECTING;
107-
this.#internalClient = new RdKafka.KafkaConsumer(this.#config);
119+
this.#internalClient = new RdKafka.KafkaConsumer(await this.#config());
108120
this.#internalClient.on('ready', this.#readyCb.bind(this));
109121
this.#internalClient.on('event.error', this.#errorCb.bind(this));
110122
this.#internalClient.on('event.log', console.log);
@@ -117,11 +129,11 @@ class Consumer {
117129
});
118130
}
119131

120-
subscribe(subscription) {
132+
async subscribe(subscription) {
121133
this.#internalClient.subscribe(subscription.topics);
122134
}
123135

124-
stop() {
136+
async stop() {
125137
this.#notImplemented();
126138
}
127139

@@ -140,15 +152,15 @@ class Consumer {
140152
}
141153
}
142154

143-
commitOffsets(topicPartitions) {
155+
async commitOffsets(topicPartitions) {
144156
this.#notImplemented();
145157
}
146158

147159
seek(topicPartitionOffset) {
148160
this.#notImplemented();
149161
}
150162

151-
describeGroup() {
163+
async describeGroup() {
152164
this.#notImplemented();
153165
}
154166

lib/kafkajs/_producer.js

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,26 @@ const ProducerState = Object.freeze({
1010
});
1111

1212
class Producer {
13-
#config = {}
13+
#kJSConfig = null
14+
#rdKafkaConfig = null;
1415
#internalClient = null;
1516
#connectPromiseFunc = {};
1617
#state = ProducerState.INIT;
1718

18-
constructor(config) {
19-
this.#config = kafkaJSToRdKafkaConfig(config);
20-
this.#config.dr_cb = 'true';
19+
constructor(kJSConfig) {
20+
this.#kJSConfig = kJSConfig;
21+
}
22+
23+
#config() {
24+
if (!this.#rdKafkaConfig)
25+
this.#rdKafkaConfig = this.#finalizedConfig();
26+
return this.#rdKafkaConfig;
27+
}
28+
29+
async #finalizedConfig() {
30+
const config = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
31+
config.dr_cb = 'true';
32+
return config;
2133
}
2234

2335
#readyCb(arg) {
@@ -76,13 +88,13 @@ class Producer {
7688
}
7789
}
7890

79-
connect() {
91+
async connect() {
8092
if (this.#state !== ProducerState.INIT) {
8193
return Promise.reject("Connect has already been called elsewhere.");
8294
}
8395

8496
this.#state = ProducerState.CONNECTING;
85-
this.#internalClient = new RdKafka.Producer(this.#config);
97+
this.#internalClient = new RdKafka.Producer(await this.#config());
8698
this.#internalClient.on('ready', this.#readyCb.bind(this));
8799
this.#internalClient.on('event.error', this.#errorCb.bind(this));
88100
this.#internalClient.on('event.log', console.log);

0 commit comments

Comments
 (0)