Skip to content

Commit 7837ac4

Browse files
committed
Add experimental admin client and example
1 parent 6a3f5ce commit 7837ac4

File tree

6 files changed

+268
-1
lines changed

6 files changed

+268
-1
lines changed

MIGRATION.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,4 +290,12 @@
290290
* `paused()` is not (YET) supported.
291291
* Custom partition assignors are not supported.
292292

293+
### Admin Client
294+
295+
* The admin-client is currently experimental, and only has support for a limited subset of methods. The API is subject to change.
296+
The methods supported are:
297+
* The `createTopics` method does not yet support the `validateOnly` or `waitForLeaders` properties, and the per-topic configuration
298+
does not support `replicaAssignment`.
299+
* The `deleteTopics` method is fully supported.
300+
293301
## node-rdkafka

examples/kafkajs/admin.js

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
const { Kafka } = require('../..').KafkaJS
2+
//const { Kafka } = require('kafkajs')
3+
4+
async function adminStart() {
5+
const kafka = new Kafka({
6+
brokers: ['<fill>'],
7+
});
8+
9+
const admin = kafka.admin();
10+
await admin.connect();
11+
12+
await admin.createTopics({
13+
topics: [
14+
{
15+
topic: 'test-topic',
16+
numPartitions: 3,
17+
replicationFactor: 1,
18+
}
19+
]
20+
}).then(() => {
21+
console.log("Topic created successfully");
22+
}).catch((err) => {
23+
console.log("Topic creation failed", err);
24+
});
25+
26+
await admin.deleteTopics({
27+
topics: ['test-topic'],
28+
timeout: 5600,
29+
}).then(() => {
30+
console.log("Topic deleted successfully");
31+
}).catch((err) => {
32+
console.log("Topic deletion failed", err);
33+
});
34+
35+
await admin.disconnect();
36+
}
37+
38+
adminStart();

lib/kafkajs/_admin.js

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
const RdKafka = require('../rdkafka');
2+
const { kafkaJSToRdKafkaConfig, createKafkaJsErrorFromLibRdKafkaError } = require('./_common');
3+
const error = require('./_error');
4+
5+
/**
6+
* NOTE: The Admin client is currently in an experimental state with many
7+
* features missing or incomplete, and the API is subject to change.
8+
*/
9+
10+
const AdminState = Object.freeze({
11+
INIT: 0,
12+
CONNECTING: 1,
13+
CONNECTED: 4,
14+
DISCONNECTING: 5,
15+
DISCONNECTED: 6,
16+
});
17+
18+
class Admin {
19+
/**
20+
* kJSConfig is the merged kafkaJS config object.
21+
* @type {import("../../types/kafkajs").AdminConfig & import("../../types/kafkajs").KafkaConfig}
22+
*/
23+
#kJSConfig = null;
24+
25+
/**
26+
* rdKafkaConfig contains the config objects that will be passed to node-rdkafka.
27+
* @type {{globalConfig: import("../../types/config").GlobalConfig}|null}
28+
*/
29+
#rdKafkaConfig = null;
30+
31+
/**
32+
* internalClient is the node-rdkafka client used by the API.
33+
* @type {import("../rdkafka").AdminClient|null}
34+
*/
35+
#internalClient = null;
36+
/**
37+
* state is the current state of the admin client.
38+
* @type {AdminState}
39+
*/
40+
#state = AdminState.INIT;
41+
42+
/**
43+
* @constructor
44+
* @param {import("../../types/kafkajs").ProducerConfig} kJSConfig
45+
*/
46+
constructor(kJSConfig) {
47+
this.#kJSConfig = kJSConfig;
48+
}
49+
50+
async #config() {
51+
if (!this.#rdKafkaConfig)
52+
this.#rdKafkaConfig = await this.#finalizedConfig();
53+
return this.#rdKafkaConfig;
54+
}
55+
56+
async #finalizedConfig() {
57+
/* This sets the common configuration options for the client. */
58+
const { globalConfig } = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
59+
60+
return { globalConfig };
61+
}
62+
63+
/**
64+
* Set up the client and connect to the bootstrap brokers.
65+
* @returns {Promise<void>} Resolves when connection is complete, rejects on error.
66+
*/
67+
async connect() {
68+
if (this.#state !== AdminState.INIT) {
69+
throw new error.KafkaJSError("Connect has already been called elsewhere.", { code: error.ErrorCodes.ERR__STATE });
70+
}
71+
72+
this.#state = AdminState.CONNECTING;
73+
74+
const { globalConfig } = await this.#config();
75+
76+
return new Promise((resolve, reject) => {
77+
try {
78+
/* AdminClient creation is a synchronous operation for node-rdkafka */
79+
this.#internalClient = RdKafka.AdminClient.create(globalConfig);
80+
this.#state = AdminState.CONNECTED;
81+
resolve();
82+
} catch (err) {
83+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
84+
}
85+
});
86+
}
87+
88+
/**
89+
* Disconnect from the brokers, clean-up and tear down the client.
90+
* @returns {Promise<void>} Resolves when disconnect is complete, rejects on error.
91+
*/
92+
async disconnect() {
93+
if (this.#state >= AdminState.DISCONNECTING) {
94+
return;
95+
}
96+
97+
this.#state = AdminState.DISCONNECTING;
98+
return new Promise((resolve, reject) => {
99+
try {
100+
/* AdminClient disconnect for node-rdkakfa is synchronous. */
101+
this.#internalClient.disconnect();
102+
this.#state = AdminState.DISCONNECTED;
103+
resolve();
104+
} catch (err) {
105+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
106+
}
107+
});
108+
}
109+
110+
111+
/**
112+
* Converts a topic configuration object from kafkaJS to a format suitable for node-rdkafka.
113+
* @param {import("../../types/kafkajs").ITopicConfig} topic
114+
* @returns {import("../../index").NewTopic}
115+
*/
116+
#topicConfigToRdKafka(topic) {
117+
let topicConfig = { topic: topic.topic };
118+
topicConfig.topic = topic.topic;
119+
topicConfig.num_partitions = topic.numPartitions ?? -1;
120+
topicConfig.replication_factor = topic.replicationFactor ?? -1;
121+
122+
if (Object.hasOwn(topic, "replicaAssignment")) {
123+
throw new error.KafkaJSError("replicaAssignment is not yet implemented.", { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
124+
}
125+
126+
topicConfig.config = {};
127+
topic.configEntries = topic.configEntries ?? [];
128+
for (const configEntry of topic.configEntries) {
129+
topicConfig.config[configEntry.name] = configEntry.value;
130+
}
131+
132+
return topicConfig;
133+
}
134+
135+
/**
136+
* Create topics with the given configuration.
137+
* @param {{ validateOnly?: boolean, waitForLeaders?: boolean, timeout?: number, topics: import("../../types/kafkajs").ITopicConfig[] }} options
138+
* @returns {Promise<void>} Resolves when the topics are created, rejects on error.
139+
*/
140+
async createTopics(options) {
141+
if (this.#state !== AdminState.CONNECTED) {
142+
throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
143+
}
144+
145+
if (Object.hasOwn(options, "validateOnly")) {
146+
throw new error.KafkaJSError("validateOnly is not yet implemented.", { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
147+
}
148+
149+
if (Object.hasOwn(options, "waitForLeaders")) {
150+
throw new error.KafkaJSError("waitForLeaders is not yet implemented.", { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
151+
}
152+
153+
/* Convert each topic to a format suitable for node-rdkafka, and dispatch the call. */
154+
const ret =
155+
options.topics
156+
.map(this.#topicConfigToRdKafka)
157+
.map(topicConfig => new Promise((resolve, reject) => {
158+
this.#internalClient.createTopic(topicConfig, options.timeout ?? 5000, (err) => {
159+
if (err) {
160+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
161+
} else {
162+
resolve();
163+
}
164+
});
165+
}));
166+
167+
return Promise.all(ret);
168+
}
169+
170+
/**
171+
* Deletes given topics.
172+
* @param {{topics: string[], timeout?: number}} options
173+
* @returns {Promise<void>} Resolves when the topics are deleted, rejects on error.
174+
*/
175+
async deleteTopics(options) {
176+
if (this.#state !== AdminState.CONNECTED) {
177+
throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
178+
}
179+
180+
return Promise.all(
181+
options.topics.map(topic => new Promise((resolve, reject) => {
182+
this.#internalClient.deleteTopic(topic, options.timeout ?? 5000, err => {
183+
if (err) {
184+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
185+
} else {
186+
resolve();
187+
}
188+
});
189+
}))
190+
);
191+
}
192+
193+
}
194+
195+
module.exports = { Admin }

lib/kafkajs/_common.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ function createKafkaJsErrorFromLibRdKafkaError(librdKafkaError) {
201201
} else if (properties.code === error.ErrorCodes.ERR__NOT_IMPLEMENTED) {
202202
err = new error.KafkaJSNotImplemented(librdKafkaError, properties);
203203
} else if (properties.code === error.ErrorCodes.ERR__TIMED_OUT) {
204-
err = new error.KafkaJSTimedOut(librdKafkaError, properties);
204+
err = new error.KafkaJSTimeout(librdKafkaError, properties);
205205
} else if (properties.code === error.ErrorCodes.ERR__ALL_BROKERS_DOWN) {
206206
err = new error.KafkaJSNoBrokerAvailableError(librdKafkaError, properties);
207207
} else if (properties.code === error.ErrorCodes.ERR__TRANSPORT) {

lib/kafkajs/_kafka.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const { Producer } = require('./_producer');
22
const { Consumer, PartitionAssigners } = require('./_consumer');
3+
const { Admin } = require('./_admin');
34
const error = require('./_error');
45
const { logLevel } = require('./_common');
56

@@ -52,6 +53,10 @@ class Kafka {
5253
consumer(config) {
5354
return new Consumer(this.#mergeConfiguration(config));
5455
}
56+
57+
admin(config) {
58+
return new Admin(this.#mergeConfiguration(config));
59+
}
5560
}
5661

5762
module.exports = { Kafka, ...error, logLevel, PartitionAssigners, PartitionAssignors: PartitionAssigners };

types/kafkajs.d.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,27 @@ export type ConsumerEvents = {
149149
REQUEST_QUEUE_SIZE: 'consumer.network.request_queue_size'
150150
}
151151

152+
export interface AdminConfig {
153+
retry?: RetryOptions
154+
}
155+
156+
export interface ITopicConfig {
157+
topic: string
158+
numPartitions?: number
159+
replicationFactor?: number
160+
replicaAssignment?: ReplicaAssignment[]
161+
configEntries?: IResourceConfigEntry[]
162+
}
163+
164+
export interface ReplicaAssignment {
165+
partition: number
166+
replicas: Array<number>
167+
}
168+
169+
export interface IResourceConfigEntry {
170+
name: string
171+
value: string
172+
}
152173

153174
export enum logLevel {
154175
NOTHING = 0,

0 commit comments

Comments
 (0)