Skip to content

Commit 510584b

Browse files
committed
Add OAUTHBEARER support to promisified API
1 parent 127567f commit 510584b

16 files changed

+163
-71
lines changed

MIGRATION.md

Lines changed: 22 additions & 22 deletions
Large diffs are not rendered by default.

index.d.ts

Lines changed: 5 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -436,8 +436,12 @@ export interface IAdminClient {
436436
disconnect(): void;
437437
}
438438

439+
export type EventHandlers = {
440+
[event_key: string]: (...args: any[]) => void;
441+
};
442+
439443
export abstract class AdminClient {
440-
static create(conf: GlobalConfig): IAdminClient;
444+
static create(conf: GlobalConfig, eventHandlers?: EventHandlers): IAdminClient;
441445
}
442446

443447
export type RdKafka = {

lib/admin.js

Lines changed: 11 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -58,10 +58,19 @@ util.inherits(AdminClient, Client);
5858
* This is a factory method because it immediately starts an
5959
* active handle with the brokers.
6060
*
61+
* @param {object} conf - Key value pairs to configure the admin client
62+
* @param {object} eventHandlers - optional key value pairs of event handlers to attach to the client
63+
*
6164
*/
62-
function createAdminClient(conf) {
65+
function createAdminClient(conf, eventHandlers) {
6366
var client = new AdminClient(conf);
6467

68+
if (eventHandlers && typeof eventHandlers === 'object') {
69+
for (const key in eventHandlers) {
70+
client.on(key, eventHandlers[key]);
71+
}
72+
}
73+
6574
// Wrap the error so we throw if it failed with some context
6675
LibrdKafkaError.wrap(client.connect(), true);
6776

@@ -127,6 +136,7 @@ AdminClient.prototype.connect = function () {
127136
this._client.configureCallbacks(true, this._cb_configs);
128137
LibrdKafkaError.wrap(this._client.connect(), true);
129138
this._isConnected = true;
139+
this.emit('ready', { name: this._client.name() });
130140
};
131141

132142
/**

lib/kafkajs/_admin.js

Lines changed: 34 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -50,6 +50,12 @@ class Admin {
5050
*/
5151
#logger = new DefaultLogger();
5252

53+
/**
54+
* connectPromiseFunc is the set of promise functions used to resolve/reject the connect() promise.
55+
* @type {{resolve: Function, reject: Function}|{}}
56+
*/
57+
#connectPromiseFunc = null;
58+
5359
/**
5460
* @constructor
5561
* @param {import("../../types/kafkajs").AdminConstructorConfig} config
@@ -95,6 +101,29 @@ class Admin {
95101
return rdKafkaConfig;
96102
}
97103

104+
#readyCb() {
105+
if (this.#state !== AdminState.CONNECTING) {
106+
/* The connectPromiseFunc might not be set, so we throw such an error. It's a state error that we can't recover from. Probably a bug. */
107+
throw new error.KafkaJSError(`Ready callback called in invalid state ${this.#state}`, { code: error.ErrorCodes.ERR__STATE });
108+
}
109+
this.#state = AdminState.CONNECTED;
110+
111+
// Resolve the promise.
112+
this.#connectPromiseFunc['resolve']();
113+
}
114+
115+
/**
116+
* Callback for the event.error event, either fails the initial connect(), or logs the error.
117+
* @param {Error} err
118+
*/
119+
#errorCb(err) {
120+
if (this.#state === AdminState.CONNECTING) {
121+
this.#connectPromiseFunc['reject'](err);
122+
} else {
123+
this.#logger.error(err);
124+
}
125+
}
126+
98127
/**
99128
* Set up the client and connect to the bootstrap brokers.
100129
* @returns {Promise<void>} Resolves when connection is complete, rejects on error.
@@ -111,9 +140,11 @@ class Admin {
111140
return new Promise((resolve, reject) => {
112141
try {
113142
/* AdminClient creation is a synchronous operation for node-rdkafka */
114-
this.#internalClient = RdKafka.AdminClient.create(config);
115-
this.#state = AdminState.CONNECTED;
116-
resolve();
143+
this.#connectPromiseFunc = { resolve, reject };
144+
this.#internalClient = RdKafka.AdminClient.create(config, {
145+
'error': this.#errorCb.bind(this),
146+
'ready': this.#readyCb.bind(this),
147+
});
117148
} catch (err) {
118149
reject(createKafkaJsErrorFromLibRdKafkaError(err));
119150
}

lib/kafkajs/_common.js

Lines changed: 56 additions & 18 deletions
Original file line number · Diff line number · Diff line change
@@ -168,12 +168,12 @@ const CompatibilityErrorMessages = Object.freeze({
168168
brokerString: () =>
169169
"The 'brokers' property must be an array of strings.\n" +
170170
"For example: ['kafka:9092', 'kafka2:9093']\n",
171-
saslOauthbearerUnsupported: () =>
172-
"SASL mechanism OAUTHBEARER is not supported yet.",
173171
saslUnsupportedMechanism: (mechanism) =>
174172
`SASL mechanism ${mechanism} is not supported.`,
175173
saslUsernamePasswordString: (mechanism) =>
176174
`The 'sasl.username' and 'sasl.password' properties must be strings and must be present for the mechanism ${mechanism}.`,
175+
saslOauthBearerProvider: () =>
176+
`The 'oauthBearerProvider' property must be a function.`,
177177
sslObject: () =>
178178
"The 'ssl' property must be a boolean. Any additional configuration must be provided outside the kafkaJS block.\n" +
179179
"Before: \n" +
@@ -280,27 +280,65 @@ function kafkaJSToRdKafkaConfig(config) {
280280
const mechanism = sasl.mechanism.toUpperCase();
281281

282282
if (mechanism === 'OAUTHBEARER') {
283-
throw new error.KafkaJSError(CompatibilityErrorMessages.saslOauthbearerUnsupported(), {
284-
code: error.ErrorCodes.ERR__NOT_IMPLEMENTED,
285-
});
286-
}
287-
288-
/* The mechanism must be PLAIN or SCRAM. */
289-
if (mechanism !== 'PLAIN' && !mechanism.startsWith('SCRAM')) {
283+
rdkafkaConfig["sasl.mechanism"] = mechanism;
284+
if (Object.hasOwn(sasl, "oauthBearerProvider")) {
285+
if (typeof sasl.oauthBearerProvider !== 'function') {
286+
throw new error.KafkaJSError(CompatibilityErrorMessages.saslOauthBearerProvider(), {
287+
code: error.ErrorCodes.ERR__INVALID_ARG,
288+
});
289+
}
290+
rdkafkaConfig['oauthbearer_token_refresh_cb'] = function (oauthbearer_config) {
291+
return sasl.oauthBearerProvider(oauthbearer_config)
292+
.then((token) => {
293+
if (!Object.hasOwn(token, 'value')) {
294+
throw new error.KafkaJSError('Token must have a value property.', {
295+
code: error.ErrorCodes.ERR__INVALID_ARG,
296+
});
297+
} else if (!Object.hasOwn(token, 'principal')) {
298+
throw new error.KafkaJSError('Token must have a principal property.', {
299+
code: error.ErrorCodes.ERR__INVALID_ARG,
300+
});
301+
} else if (!Object.hasOwn(token, 'lifetime')) {
302+
throw new error.KafkaJSError('Token must have a lifetime property.', {
303+
code: error.ErrorCodes.ERR__INVALID_ARG,
304+
});
305+
}
306+
307+
// Recast token into a value expected by node-rdkafka's callback.
308+
const setToken = {
309+
tokenValue: token.value,
310+
extensions: token.extensions,
311+
principal: token.principal,
312+
lifetime: token.lifetime,
313+
};
314+
return setToken;
315+
})
316+
.catch(err => {
317+
if (!(err instanceof Error)) {
318+
err = new Error(err);
319+
}
320+
throw err;
321+
});
322+
}
323+
}
324+
/* It's a valid case (unlike in KafkaJS) for oauthBearerProvider to be
325+
* null, because librdkafka provides an unsecured token provider for
326+
* non-prod usecases. So don't do anything in that case. */
327+
} else if (mechanism === 'PLAIN' || mechanism.startsWith('SCRAM')) {
328+
if (typeof sasl.username !== "string" || typeof sasl.password !== "string") {
329+
throw new error.KafkaJSError(CompatibilityErrorMessages.saslUsernamePasswordString(mechanism), {
330+
code: error.ErrorCodes.ERR__INVALID_ARG,
331+
});
332+
}
333+
rdkafkaConfig["sasl.mechanism"] = mechanism;
334+
rdkafkaConfig["sasl.username"] = sasl.username;
335+
rdkafkaConfig["sasl.password"] = sasl.password;
336+
} else {
290337
throw new error.KafkaJSError(CompatibilityErrorMessages.saslUnsupportedMechanism(mechanism), {
291338
code: error.ErrorCodes.ERR__INVALID_ARG,
292339
});
293340
}
294341

295-
if (typeof sasl.username !== "string" || typeof sasl.password !== "string") {
296-
throw new error.KafkaJSError(CompatibilityErrorMessages.saslUsernamePasswordString(mechanism), {
297-
code: error.ErrorCodes.ERR__INVALID_ARG,
298-
});
299-
}
300-
301-
rdkafkaConfig["sasl.mechanism"] = mechanism;
302-
rdkafkaConfig["sasl.username"] = sasl.username;
303-
rdkafkaConfig["sasl.password"] = sasl.password;
304342
withSASL = true;
305343
}
306344

lib/kafkajs/_consumer.js

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -790,6 +790,7 @@ class Consumer {
790790
this.#state = ConsumerState.CONNECTING;
791791
this.#internalClient = new RdKafka.KafkaConsumer(rdKafkaConfig);
792792
this.#internalClient.on('ready', this.#readyCb.bind(this));
793+
this.#internalClient.on('error', this.#errorCb.bind(this));
793794
this.#internalClient.on('event.error', this.#errorCb.bind(this));
794795
this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger));
795796

lib/kafkajs/_producer.js

Lines changed: 3 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -321,6 +321,7 @@ class Producer {
321321
this.#internalClient = new RdKafka.Producer(rdKafkaConfig);
322322
this.#internalClient.on('ready', this.#readyCb.bind(this));
323323
this.#internalClient.on('event.error', this.#errorCb.bind(this));
324+
this.#internalClient.on('error', this.#errorCb.bind(this));
324325
this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger));
325326

326327
return new Promise((resolve, reject) => {
@@ -614,10 +615,10 @@ class Producer {
614615
throw new error.KafkaJSError(CompatibilityErrorMessages.sendOptionsAcks('sendBatch'), { code: error.ErrorCodes.ERR__INVALID_ARG });
615616
}
616617
if (Object.hasOwn(sendOptions, 'timeout')) {
617-
throw new error.KafkaJSError(CompatibilityErrorMessages.sendOptionsTimeout('sendBatch'), { code: error.ErrorCodes.ERR__INVALID_ARG });
618+
throw new error.KafkaJSError(CompatibilityErrorMessages.sendOptionsTimeout('timeout'), { code: error.ErrorCodes.ERR__INVALID_ARG });
618619
}
619620
if (Object.hasOwn(sendOptions, 'compression')) {
620-
throw new error.KafkaJSError(CompatibilityErrorMessages.sendOptionsCompression('sendBatch'), { code: error.ErrorCodes.ERR__INVALID_ARG });
621+
throw new error.KafkaJSError(CompatibilityErrorMessages.sendOptionsCompression('compression'), { code: error.ErrorCodes.ERR__INVALID_ARG });
621622
}
622623

623624
if (sendOptions.topicMessages !== null && !Array.isArray(sendOptions.topicMessages)) {

lib/producer.js

Lines changed: 2 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -2,6 +2,7 @@
22
* confluent-kafka-javascript - Node.js wrapper for RdKafka C/C++ library
33
*
44
* Copyright (c) 2016-2023 Blizzard Entertainment
5+
* (c) 2024 Confluent, Inc.
56
*
67
* This software may be modified and distributed under the terms
78
* of the MIT license. See the LICENSE.txt file for details.
@@ -70,7 +71,7 @@ function Producer(conf, topicConf) {
7071
delete conf.dr_cb;
7172
delete conf.dr_msg_cb;
7273

73-
// client is an initialized consumer object
74+
// client is an initialized producer object
7475
// @see NodeKafka::Producer::Init
7576
Client.call(this, conf, Kafka.Producer, topicConf);
7677

src/admin.cc

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -105,6 +105,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
105105

106106
// Inherited from NodeKafka::Connection
107107
Nan::SetPrototypeMethod(tpl, "configureCallbacks", NodeConfigureCallbacks);
108+
Nan::SetPrototypeMethod(tpl, "name", NodeName);
108109

109110
// Admin client operations
110111
Nan::SetPrototypeMethod(tpl, "createTopic", NodeCreateTopic);

src/connection.cc

Lines changed: 15 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -116,18 +116,25 @@ RdKafka::TopicPartition* Connection::GetPartition(std::string &topic, int partit
116116
return RdKafka::TopicPartition::create(topic, partition);
117117
}
118118

119-
bool Connection::IsConnected() {
119+
bool Connection::IsConnected() const {
120120
return !m_is_closing && m_client != NULL;
121121
}
122122

123-
bool Connection::IsClosing() {
123+
bool Connection::IsClosing() const {
124124
return m_client != NULL && m_is_closing;
125125
}
126126

127127
RdKafka::Handle* Connection::GetClient() {
128128
return m_client;
129129
}
130130

131+
std::string Connection::Name() const {
132+
if (!IsConnected()) {
133+
return std::string("");
134+
}
135+
return std::string(m_client->name());
136+
}
137+
131138
Baton Connection::CreateTopic(std::string topic_name) {
132139
return CreateTopic(topic_name, NULL);
133140
}
@@ -629,4 +636,10 @@ NAN_METHOD(Connection::NodeSetOAuthBearerTokenFailure) {
629636
info.GetReturnValue().Set(Nan::Null());
630637
}
631638

639+
NAN_METHOD(Connection::NodeName) {
640+
Connection* obj = ObjectWrap::Unwrap<Connection>(info.This());
641+
std::string name = obj->Name();
642+
info.GetReturnValue().Set(Nan::New(name).ToLocalChecked());
643+
}
644+
632645
} // namespace NodeKafka

0 commit comments

Comments (0)