Skip to content

Commit 40119c6

Browse files
committed
Add setSaslCredentials for producer
1 parent 6ad531d commit 40119c6

File tree

8 files changed

+121
-12
lines changed

8 files changed

+121
-12
lines changed

index.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ export abstract class Client<Events extends string> extends EventEmitter {
198198
queryWatermarkOffsets(topic: string, partition: number, timeout: number, cb?: (err: LibrdKafkaError, offsets: WatermarkOffsets) => any): any;
199199
queryWatermarkOffsets(topic: string, partition: number, cb?: (err: LibrdKafkaError, offsets: WatermarkOffsets) => any): any;
200200

201+
setSaslCredentials(username: string, password: string): void;
202+
201203
on<E extends Events>(event: E, listener: EventListener<E>): this;
202204
once<E extends Events>(event: E, listener: EventListener<E>): this;
203205
}

lib/client.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,21 @@ Client.prototype.offsetsForTimes = function(toppars, timeout, cb) {
466466
});
467467
};
468468

469+
/**
470+
* Change SASL credentials to be sent on the next authentication attempt.
471+
*
472+
* @param {string} username
473+
* @param {string} password
474+
* @note Only applicable if SASL authentication is being used.
475+
*/
476+
Client.prototype.setSaslCredentials = function(username, password) {
477+
if (!this.isConnected()) {
478+
return;
479+
}
480+
481+
this._client.setSaslCredentials(username, password);
482+
};
483+
469484
/**
470485
* Wrap a potential RdKafka error.
471486
*

lib/kafkajs/_producer.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,40 @@ class Producer {
645645
logger() {
646646
return this.#logger;
647647
}
648+
649+
/**
650+
* Change SASL credentials to be sent on the next authentication attempt.
651+
*
652+
* @param {string} args.username
653+
* @param {string} args.password
654+
* @note Only applicable if SASL authentication is being used.
655+
*/
656+
setSaslCredentials(args = {}) {
657+
if (!Object.hasOwn(args, 'username')) {
658+
throw new error.KafkaJSError("username must be set for setSaslCredentials", { code: error.ErrorCodes.ERR__INVALID_ARG });
659+
}
660+
661+
if (!Object.hasOwn(args, 'password')) {
662+
throw new error.KafkaJSError("password must be set for setSaslCredentials", { code: error.ErrorCodes.ERR__INVALID_ARG });
663+
}
664+
665+
/**
666+
* In case we've not started connecting yet, just modify the configuration for
667+
* the first connection attempt.
668+
*/
669+
if (this.#state < ProducerState.CONNECTING) {
670+
this.#userConfig['sasl.username'] = args.username;
671+
this.#userConfig['sasl.password'] = args.password;
672+
if (Object.hasOwn(this.#userConfig, 'kafkaJS') && Object.hasOwn(this.#userConfig.kafkaJS, 'sasl')) {
673+
this.#userConfig.kafkaJS.sasl.username = args.username;
674+
this.#userConfig.kafkaJS.sasl.password = args.password;
675+
}
676+
return;
677+
}
678+
679+
680+
this.#internalClient.setSaslCredentials(args.username, args.password);
681+
}
648682
}
649683

650684
module.exports = { Producer, CompressionTypes };

src/admin.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
8888

8989
Nan::SetPrototypeMethod(tpl, "connect", NodeConnect);
9090
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
91+
Nan::SetPrototypeMethod(tpl, "setSaslCredentials", NodeSetSaslCredentials);
9192

9293
constructor.Reset(
9394
(tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());

src/connection.cc

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,18 @@ Connection::~Connection() {
6767
}
6868
}
6969

70+
Baton Connection::rdkafkaErrorToBaton(RdKafka::Error* error) {
71+
if ( NULL == error) {
72+
return Baton(RdKafka::ERR_NO_ERROR);
73+
}
74+
else {
75+
Baton result(error->code(), error->str(), error->is_fatal(),
76+
error->is_retriable(), error->txn_requires_abort());
77+
delete error;
78+
return result;
79+
}
80+
}
81+
7082
RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
7183
return RdKafka::TopicPartition::create(topic, RdKafka::Topic::PARTITION_UA);
7284
}
@@ -215,6 +227,25 @@ Baton Connection::GetMetadata(
215227
}
216228
}
217229

230+
Baton Connection::SetSaslCredentials(
231+
std::string username, std::string password) {
232+
RdKafka::Error *error;
233+
234+
if (IsConnected()) {
235+
scoped_shared_read_lock lock(m_connection_lock);
236+
if (IsConnected()) {
237+
// Always send true - we
238+
error = m_client->sasl_set_credentials(username, password);
239+
} else {
240+
return Baton(RdKafka::ERR__STATE);
241+
}
242+
} else {
243+
return Baton(RdKafka::ERR__STATE);
244+
}
245+
246+
return rdkafkaErrorToBaton(error);
247+
}
248+
218249
void Connection::ConfigureCallback(const std::string &string_key, const v8::Local<v8::Function> &cb, bool add) {
219250
if (string_key.compare("event_cb") == 0) {
220251
if (add) {
@@ -337,6 +368,39 @@ NAN_METHOD(Connection::NodeQueryWatermarkOffsets) {
337368
info.GetReturnValue().Set(Nan::Null());
338369
}
339370

371+
NAN_METHOD(Connection::NodeSetSaslCredentials) {
372+
if (!info[0]->IsString()) {
373+
Nan::ThrowError("1st parameter must be a username string");
374+
return;
375+
}
376+
377+
if (!info[1]->IsString()) {
378+
Nan::ThrowError("2nd parameter must be a password string");
379+
return;
380+
}
381+
382+
// Get string pointer for the username
383+
Nan::Utf8String usernameUTF8(Nan::To<v8::String>(info[0]).ToLocalChecked());
384+
// The first parameter is the username
385+
std::string username(*usernameUTF8);
386+
387+
// Get string pointer for the password
388+
Nan::Utf8String passwordUTF8(Nan::To<v8::String>(info[1]).ToLocalChecked());
389+
// The first parameter is the password
390+
std::string password(*passwordUTF8);
391+
392+
Connection* obj = ObjectWrap::Unwrap<Connection>(info.This());
393+
Baton b = obj->SetSaslCredentials(username, password);
394+
395+
if (b.err() != RdKafka::ERR_NO_ERROR) {
396+
v8::Local<v8::Value> errorObject = b.ToObject();
397+
return Nan::ThrowError(errorObject);
398+
}
399+
400+
info.GetReturnValue().Set(Nan::Null());
401+
}
402+
403+
340404
// Node methods
341405
NAN_METHOD(Connection::NodeConfigureCallbacks) {
342406
Nan::HandleScope scope;

src/connection.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class Connection : public Nan::ObjectWrap {
5555
Baton GetMetadata(bool, std::string, int);
5656
Baton QueryWatermarkOffsets(std::string, int32_t, int64_t*, int64_t*, int);
5757
Baton OffsetsForTimes(std::vector<RdKafka::TopicPartition*> &, int);
58+
Baton SetSaslCredentials(std::string, std::string);
5859

5960
RdKafka::Handle* GetClient();
6061

@@ -74,6 +75,7 @@ class Connection : public Nan::ObjectWrap {
7475

7576
static Nan::Persistent<v8::Function> constructor;
7677
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
78+
Baton rdkafkaErrorToBaton(RdKafka::Error* error);
7779

7880
bool m_has_been_disconnected;
7981
bool m_is_closing;
@@ -90,6 +92,7 @@ class Connection : public Nan::ObjectWrap {
9092
static NAN_METHOD(NodeGetMetadata);
9193
static NAN_METHOD(NodeQueryWatermarkOffsets);
9294
static NAN_METHOD(NodeOffsetsForTimes);
95+
static NAN_METHOD(NodeSetSaslCredentials);
9396
};
9497

9598
} // namespace NodeKafka

src/kafka-consumer.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
504504
Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT
505505
Nan::SetPrototypeMethod(tpl, "offsetsForTimes", NodeOffsetsForTimes);
506506
Nan::SetPrototypeMethod(tpl, "getWatermarkOffsets", NodeGetWatermarkOffsets);
507+
Nan::SetPrototypeMethod(tpl, "setSaslCredentials", NodeSetSaslCredentials);
507508

508509
/*
509510
* @brief Methods exposed to do with message retrieval

src/producer.cc

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ void Producer::Init(v8::Local<v8::Object> exports) {
7171
Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata);
7272
Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT
7373
Nan::SetPrototypeMethod(tpl, "poll", NodePoll);
74+
Nan::SetPrototypeMethod(tpl, "setSaslCredentials", NodeSetSaslCredentials);
7475

7576
/*
7677
* @brief Methods exposed to do with message production
@@ -351,18 +352,6 @@ void Producer::ConfigureCallback(const std::string &string_key, const v8::Local<
351352
}
352353
}
353354

354-
Baton rdkafkaErrorToBaton(RdKafka::Error* error) {
355-
if ( NULL == error) {
356-
return Baton(RdKafka::ERR_NO_ERROR);
357-
}
358-
else {
359-
Baton result(error->code(), error->str(), error->is_fatal(),
360-
error->is_retriable(), error->txn_requires_abort());
361-
delete error;
362-
return result;
363-
}
364-
}
365-
366355
Baton Producer::InitTransactions(int32_t timeout_ms) {
367356
if (!IsConnected()) {
368357
return Baton(RdKafka::ERR__STATE);

0 commit comments

Comments
 (0)