Skip to content

Commit 27245a7

Browse files
committed
Update Admin Client for config changes
1 parent 1b6d4bf commit 27245a7

File tree

5 files changed

+62
-23
lines changed

5 files changed

+62
-23
lines changed

examples/kafkajs/admin.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ const { Kafka } = require('../..').KafkaJS
33

44
async function adminStart() {
55
const kafka = new Kafka({
6-
brokers: ['<fill>'],
6+
kafkaJs: {
7+
brokers: ['localhost:9092'],
8+
}
79
});
810

911
const admin = kafka.admin();

lib/kafkajs/_admin.js

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
const RdKafka = require('../rdkafka');
2-
const { kafkaJSToRdKafkaConfig, createKafkaJsErrorFromLibRdKafkaError } = require('./_common');
2+
const { kafkaJSToRdKafkaConfig,
3+
createKafkaJsErrorFromLibRdKafkaError,
4+
DefaultLogger,
5+
checkAllowedKeys } = require('./_common');
36
const error = require('./_error');
47

58
/**
@@ -17,16 +20,16 @@ const AdminState = Object.freeze({
1720

1821
class Admin {
1922
/**
20-
* kJSConfig is the merged kafkaJS config object.
21-
* @type {import("../../types/kafkajs").AdminConfig & import("../../types/kafkajs").KafkaConfig}
23+
* The config supplied by the user.
24+
* @type {import("../../types/kafkajs").AdminConstructorConfig|null}
2225
*/
23-
#kJSConfig = null;
26+
#userConfig = null;
2427

2528
/**
26-
* rdKafkaConfig contains the config objects that will be passed to node-rdkafka.
27-
* @type {{globalConfig: import("../../types/config").GlobalConfig}|null}
29+
* The config realized after processing any compatibility options.
30+
* @type {import("../../types/config").GlobalConfig|null}
2831
*/
29-
#rdKafkaConfig = null;
32+
#internalConfig = null;
3033

3134
/**
3235
* internalClient is the node-rdkafka client used by the API.
@@ -39,25 +42,55 @@ class Admin {
3942
*/
4043
#state = AdminState.INIT;
4144

45+
/**
46+
* A logger for the admin client.
47+
* @type {import("../../types/kafkajs").Logger}
48+
*/
49+
#logger = new DefaultLogger();
50+
4251
/**
4352
* @constructor
44-
* @param {import("../../types/kafkajs").ProducerConfig} kJSConfig
53+
* @param {import("../../types/kafkajs").AdminConstructorConfig} config
4554
*/
46-
constructor(kJSConfig) {
47-
this.#kJSConfig = kJSConfig;
55+
constructor(config) {
56+
this.#userConfig = config;
4857
}
4958

50-
async #config() {
51-
if (!this.#rdKafkaConfig)
52-
this.#rdKafkaConfig = await this.#finalizedConfig();
53-
return this.#rdKafkaConfig;
59+
#config() {
60+
if (!this.#internalConfig)
61+
this.#internalConfig = this.#finalizedConfig();
62+
return this.#internalConfig;
5463
}
5564

56-
async #finalizedConfig() {
57-
/* This sets the common configuration options for the client. */
58-
const { globalConfig } = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
65+
#kafkaJSToAdminConfig(kjsConfig) {
66+
if (!kjsConfig || Object.keys(kjsConfig).length === 0) {
67+
return {};
68+
}
69+
70+
const disallowedKey = checkAllowedKeys('admin', kjsConfig);
71+
if (disallowedKey) {
72+
throw new error.KafkaJSError(CompatibilityErrorMessages.unsupportedKey(disallowedKey), { code: error.ErrorCodes.ERR__INVALID_ARG });
73+
}
74+
75+
const rdKafkaConfig = kafkaJSToRdKafkaConfig(kjsConfig);
76+
return rdKafkaConfig;
77+
}
78+
79+
#finalizedConfig() {
80+
let compatibleConfig = this.#kafkaJSToAdminConfig(this.#userConfig.kafkaJs);
81+
82+
/* Set the logger's level in case we're not in compatibility mode - just set it to DEBUG, the broadest
83+
* log level, as librdkafka will control the granularity. */
84+
if (!compatibleConfig || Object.keys(compatibleConfig).length === 0) {
85+
this.#logger.setLogLevel(logLevel.DEBUG);
86+
}
87+
88+
let rdKafkaConfig = Object.assign(compatibleConfig, this.#userConfig);
89+
90+
/* Delete properties which are already processed, or cannot be passed to node-rdkafka */
91+
delete rdKafkaConfig.kafkaJs;
5992

60-
return { globalConfig };
93+
return rdKafkaConfig;
6194
}
6295

6396
/**
@@ -71,12 +104,12 @@ class Admin {
71104

72105
this.#state = AdminState.CONNECTING;
73106

74-
const { globalConfig } = await this.#config();
107+
const config = this.#config();
75108

76109
return new Promise((resolve, reject) => {
77110
try {
78111
/* AdminClient creation is a synchronous operation for node-rdkafka */
79-
this.#internalClient = RdKafka.AdminClient.create(globalConfig);
112+
this.#internalClient = RdKafka.AdminClient.create(config);
80113
this.#state = AdminState.CONNECTED;
81114
resolve();
82115
} catch (err) {

lib/kafkajs/_common.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ const kafkaJsProperties = {
5353
'autoCommitInterval',
5454
'autoCommitThreshold',
5555
'rebalanceListener',
56-
]
56+
],
57+
admin: [],
5758
}
5859

5960
const logLevel = Object.freeze({

lib/kafkajs/_producer.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ class Producer {
8484
}
8585

8686
#config() {
87-
this.#internalClient
8887
if (!this.#internalConfig)
8988
this.#internalConfig = this.#finalizedConfig();
9089
return this.#internalConfig;

types/kafkajs.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ export interface AdminConfig {
176176
retry?: RetryOptions
177177
}
178178

179+
export interface AdminConstructorConfig extends GlobalConfig {
180+
kafkaJs?: AdminConfig;
181+
}
182+
179183
export interface ITopicConfig {
180184
topic: string
181185
numPartitions?: number

0 commit comments

Comments
 (0)