@@ -73,6 +73,7 @@ struct connection_t {
7373 const boost::optional<std::string> ca_location;
7474 const std::string user;
7575 const std::string password;
76+ const boost::optional<std::string> mechanism;
7677 utime_t timestamp = ceph_clock_now();
7778
7879 // cleanup of all internal connection resource
@@ -107,8 +108,8 @@ struct connection_t {
107108 // ctor for setting immutable values
108109 connection_t (CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl,
109110 const boost::optional<const std::string&>& _ca_location,
110- const std::string& _user, const std::string& _password) :
111- cct (_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {}
111+ const std::string& _user, const std::string& _password, const boost::optional< const std::string&>& _mechanism ) :
112+ cct (_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password), mechanism(_mechanism) {}
112113
113114 // dtor also destroys the internals
114115 ~connection_t () {
@@ -124,6 +125,7 @@ std::string to_string(const connection_ptr_t& conn) {
124125 str += " \n Broker: " + conn->broker ;
125126 str += conn->use_ssl ? " \n Use SSL" : " " ;
126127 str += conn->ca_location ? " \n CA Location: " + *(conn->ca_location ) : " " ;
128+ str += conn->mechanism ? " \n SASL Mechanism: " + *(conn->mechanism ) : " " ;
127129 return str;
128130}
129131// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
@@ -220,10 +222,18 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) {
220222 if (!conn->user .empty ()) {
221223 // use SSL+SASL
222224 if (rd_kafka_conf_set (conn->temp_conf , " security.protocol" , " SASL_SSL" , errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK ||
223- rd_kafka_conf_set (conn->temp_conf , " sasl.mechanism" , " PLAIN" , errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK ||
224225 rd_kafka_conf_set (conn->temp_conf , " sasl.username" , conn->user .c_str (), errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK ||
225226 rd_kafka_conf_set (conn->temp_conf , " sasl.password" , conn->password .c_str (), errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
226227 ldout (conn->cct , 20 ) << " Kafka connect: successfully configured SSL+SASL security" << dendl;
228+
229+ if (conn->mechanism ) {
230+ if (rd_kafka_conf_set (conn->temp_conf , " sasl.mechanism" , conn->mechanism ->c_str (), errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
231+ ldout (conn->cct , 20 ) << " Kafka connect: successfully configured SASL mechanism" << dendl;
232+ } else {
233+ if (rd_kafka_conf_set (conn->temp_conf , " sasl.mechanism" , " PLAIN" , errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
234+ ldout (conn->cct , 20 ) << " Kafka connect: using default SASL mechanism" << dendl;
235+ }
236+
227237 } else {
228238 // use only SSL
229239 if (rd_kafka_conf_set (conn->temp_conf , " security.protocol" , " SSL" , errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
@@ -239,6 +249,20 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) {
239249 // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
240250
241251 ldout (conn->cct , 20 ) << " Kafka connect: successfully configured security" << dendl;
252+ } else if (!conn->user .empty ()) {
253+ // use SASL+PLAINTEXT
254+ if (rd_kafka_conf_set (conn->temp_conf , " security.protocol" , " SASL_PLAINTEXT" , errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK ||
255+ rd_kafka_conf_set (conn->temp_conf , " sasl.username" , conn->user .c_str (), errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK ||
256+ rd_kafka_conf_set (conn->temp_conf , " sasl.password" , conn->password .c_str (), errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
257+ ldout (conn->cct , 20 ) << " Kafka connect: successfully configured SASL_PLAINTEXT" << dendl;
258+
259+ if (conn->mechanism ) {
260+ if (rd_kafka_conf_set (conn->temp_conf , " sasl.mechanism" , conn->mechanism ->c_str (), errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
261+ ldout (conn->cct , 20 ) << " Kafka connect: successfully configured SASL mechanism" << dendl;
262+ } else {
263+ if (rd_kafka_conf_set (conn->temp_conf , " sasl.mechanism" , " PLAIN" , errstr, sizeof (errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
264+ ldout (conn->cct , 20 ) << " Kafka connect: using default SASL mechanism" << dendl;
265+ }
242266 }
243267
244268 // set the global callback for delivery success/fail
@@ -286,9 +310,10 @@ connection_ptr_t create_new_connection(const std::string& broker, CephContext* c
286310 bool verify_ssl,
287311 boost::optional<const std::string&> ca_location,
288312 const std::string& user,
289- const std::string& password) {
313+ const std::string& password,
314+ boost::optional<const std::string&> mechanism) {
290315 // create connection state
291- connection_ptr_t conn (new connection_t (cct, broker, use_ssl, verify_ssl, ca_location, user, password));
316+ connection_ptr_t conn (new connection_t (cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism ));
292317 return create_connection (conn);
293318}
294319
@@ -529,7 +554,8 @@ class Manager {
529554 connection_ptr_t connect (const std::string& url,
530555 bool use_ssl,
531556 bool verify_ssl,
532- boost::optional<const std::string&> ca_location) {
557+ boost::optional<const std::string&> ca_location,
558+ boost::optional<const std::string&> mechanism) {
533559 if (stopped) {
534560 // TODO: increment counter
535561 ldout (cct, 1 ) << " Kafka connect: manager is stopped" << dendl;
@@ -548,7 +574,7 @@ class Manager {
548574 // this should be validated by the regex in parse_url()
549575 ceph_assert (user.empty () == password.empty ());
550576
551- if (!user.empty () && !use_ssl) {
577+ if (!user.empty () && !use_ssl && ! g_conf (). get_val < bool >( " rgw_allow_notification_secrets_in_cleartext " ) ) {
552578 ldout (cct, 1 ) << " Kafka connect: user/password are only allowed over secure connection" << dendl;
553579 return nullptr ;
554580 }
@@ -568,7 +594,7 @@ class Manager {
568594 ldout (cct, 1 ) << " Kafka connect: max connections exceeded" << dendl;
569595 return nullptr ;
570596 }
571- const auto conn = create_new_connection (broker, cct, use_ssl, verify_ssl, ca_location, user, password);
597+ const auto conn = create_new_connection (broker, cct, use_ssl, verify_ssl, ca_location, user, password, mechanism );
572598 // create_new_connection must always return a connection object
573599 // even if error occurred during creation.
574600 // in such a case the creation will be retried in the main thread
@@ -671,9 +697,10 @@ void shutdown() {
671697}
672698
673699connection_ptr_t connect (const std::string& url, bool use_ssl, bool verify_ssl,
674- boost::optional<const std::string&> ca_location) {
700+ boost::optional<const std::string&> ca_location,
701+ boost::optional<const std::string&> mechanism) {
675702 if (!s_manager) return nullptr ;
676- return s_manager->connect (url, use_ssl, verify_ssl, ca_location);
703+ return s_manager->connect (url, use_ssl, verify_ssl, ca_location, mechanism );
677704}
678705
679706int publish (connection_ptr_t & conn,
0 commit comments