Skip to content

Commit 0863349

Browse files
authored
Merge pull request ceph#56493 from igomon-bloomberg/wip_kafka_sasl_username_password_attrs
rgw/s3-notifications: use user-name/password topic attributes for SASL authentication Reviewed-by: Yuval Lifshitz <[email protected]>
2 parents 1215afa + 334009f commit 0863349

File tree

7 files changed

+81
-15
lines changed

7 files changed

+81
-15
lines changed

doc/radosgw/notifications.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ updating, use the name of an existing topic and different endpoint values).
164164
[&Attributes.entry.13.key=max_retries&Attributes.entry.13.value=<retries number>]
165165
[&Attributes.entry.14.key=retry_sleep_duration&Attributes.entry.14.value=<sleep seconds>]
166166
[&Attributes.entry.15.key=Policy&Attributes.entry.15.value=<policy-JSON-string>]
167+
[&Attributes.entry.16.key=user-name&Attributes.entry.16.value=<user-name-string>]
168+
[&Attributes.entry.17.key=password&Attributes.entry.17.value=<password-string>]
167169

168170
Request parameters:
169171

@@ -252,6 +254,10 @@ Request parameters:
252254
- user/password: This should be provided over HTTPS. If not, the config parameter `rgw_allow_notification_secrets_in_cleartext` must be `true` in order to create topics.
253255
- user/password: This should be provided together with ``use-ssl``. If not, the broker credentials will be sent over insecure transport.
254256
- mechanism: may be provided together with user/password (default: ``PLAIN``). The supported SASL mechanisms are:
257+
- ``user-name``: User name to use when connecting to the Kafka broker. If both this parameter and URI user are provided then this parameter overrides the URI user.
258+
The same security considerations are in place for this parameter as are for user/password.
259+
- ``password``: Password to use when connecting to the Kafka broker. If both this parameter and URI password are provided then this parameter overrides the URI password.
260+
The same security considerations are in place for this parameter as are for user/password.
255261

256262
- PLAIN
257263
- SCRAM-SHA-256

src/rgw/driver/rados/rgw_pubsub_push.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,13 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
303303
cct(_cct),
304304
topic(_topic),
305305
ack_level(get_ack_level(args)) {
306-
if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true),
307-
args.get_optional("ca-location"), args.get_optional("mechanism"))) {
306+
if (!kafka::connect(conn_name, _endpoint,
307+
get_bool(args, "use-ssl", false),
308+
get_bool(args, "verify-ssl", true),
309+
args.get_optional("ca-location"),
310+
args.get_optional("mechanism"),
311+
args.get_optional("user-name"),
312+
args.get_optional("password"))) {
308313
throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
309314
}
310315
}

src/rgw/rgw_kafka.cc

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,9 @@ class Manager {
554554
bool use_ssl,
555555
bool verify_ssl,
556556
boost::optional<const std::string&> ca_location,
557-
boost::optional<const std::string&> mechanism) {
557+
boost::optional<const std::string&> mechanism,
558+
boost::optional<const std::string&> topic_user_name,
559+
boost::optional<const std::string&> topic_password) {
558560
if (stopped) {
559561
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
560562
return false;
@@ -568,6 +570,21 @@ class Manager {
568570
return false;
569571
}
570572

573+
// check if username/password was already supplied via topic attributes
574+
// and if also provided as part of the endpoint URL issue a warning
575+
if (topic_user_name.has_value()) {
576+
if (!user.empty()) {
577+
ldout(cct, 5) << "Kafka connect: username provided via both topic attributes and endpoint URL: using topic attributes" << dendl;
578+
}
579+
user = topic_user_name.get();
580+
}
581+
if (topic_password.has_value()) {
582+
if (!password.empty()) {
583+
ldout(cct, 5) << "Kafka connect: password provided via both topic attributes and endpoint URL: using topic attributes" << dendl;
584+
}
585+
password = topic_password.get();
586+
}
587+
571588
// this should be validated by the regex in parse_url()
572589
ceph_assert(user.empty() == password.empty());
573590

@@ -694,9 +711,11 @@ void shutdown() {
694711

695712
bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
696713
boost::optional<const std::string&> ca_location,
697-
boost::optional<const std::string&> mechanism) {
714+
boost::optional<const std::string&> mechanism,
715+
boost::optional<const std::string&> user_name,
716+
boost::optional<const std::string&> password) {
698717
if (!s_manager) return false;
699-
return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism);
718+
return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
700719
}
701720

702721
int publish(const std::string& conn_name,

src/rgw/rgw_kafka.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,14 @@ bool init(CephContext* cct);
2222
void shutdown();
2323

2424
// connect to a kafka endpoint
25-
bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism);
25+
bool connect(std::string& broker,
26+
const std::string& url,
27+
bool use_ssl,
28+
bool verify_ssl,
29+
boost::optional<const std::string&> ca_location,
30+
boost::optional<const std::string&> mechanism,
31+
boost::optional<const std::string&> user_name,
32+
boost::optional<const std::string&> password);
2633

2734
// publish a message over a connection that was already created
2835
int publish(const std::string& conn_name,

src/rgw/rgw_rest_pubsub.cc

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
3737
// make sure that if user/password are passed inside URL, it is over secure connection
3838
// update rgw_pubsub_dest to indicate that a password is stored in the URL
3939
bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct,
40-
const RGWEnv& env, std::string& message)
40+
const req_info& ri, std::string& message)
4141
{
4242
if (dest.push_endpoint.empty()) {
4343
return true;
@@ -48,11 +48,31 @@ bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct
4848
message = "Malformed URL for push-endpoint";
4949
return false;
5050
}
51+
52+
const auto& args=ri.args;
53+
auto topic_user_name=args.get_optional("user-name");
54+
auto topic_password=args.get_optional("password");
55+
56+
// check if username/password was already supplied via topic attributes
57+
// and if also provided as part of the endpoint URL issue a warning
58+
if (topic_user_name.has_value()) {
59+
if (!user.empty()) {
60+
message = "Username provided via both topic attributes and endpoint URL: using topic attributes";
61+
}
62+
user = topic_user_name.get();
63+
}
64+
if (topic_password.has_value()) {
65+
if (!password.empty()) {
66+
message = "Password provided via both topic attributes and endpoint URL: using topic attributes";
67+
}
68+
password = topic_password.get();
69+
}
70+
5171
// this should be verified inside parse_url()
5272
ceph_assert(user.empty() == password.empty());
5373
if (!user.empty()) {
5474
dest.stored_secret = true;
55-
if (!verify_transport_security(cct, env)) {
75+
if (!verify_transport_security(cct, *ri.env)) {
5676
message = "Topic contains secrets that must be transmitted over a secure transport";
5777
return false;
5878
}
@@ -241,7 +261,7 @@ class RGWPSCreateTopicOp : public RGWOp {
241261
s->info.args.get_int("max_retries", reinterpret_cast<int *>(&dest.max_retries), rgw::notify::DEFAULT_GLOBAL_VALUE);
242262
s->info.args.get_int("retry_sleep_duration", reinterpret_cast<int *>(&dest.retry_sleep_duration), rgw::notify::DEFAULT_GLOBAL_VALUE);
243263

244-
if (!validate_and_update_endpoint_secret(dest, s->cct, *s->info.env, s->err.message)) {
264+
if (!validate_and_update_endpoint_secret(dest, s->cct, s->info, s->err.message)) {
245265
return -EINVAL;
246266
}
247267
// Store topic Policy.
@@ -729,7 +749,7 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
729749
rgw::notify::DEFAULT_GLOBAL_VALUE);
730750
} else if (attribute_name == "push-endpoint") {
731751
dest.push_endpoint = s->info.args.get("AttributeValue");
732-
if (!validate_and_update_endpoint_secret(dest, s->cct, *s->info.env, s->err.message)) {
752+
if (!validate_and_update_endpoint_secret(dest, s->cct, s->info, s->err.message)) {
733753
return -EINVAL;
734754
}
735755
} else if (attribute_name == "Policy") {
@@ -755,7 +775,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
755775
};
756776
static constexpr std::initializer_list<const char*> args = {
757777
"verify-ssl", "use-ssl", "ca-location", "amqp-ack-level",
758-
"amqp-exchange", "kafka-ack-level", "mechanism", "cloudevents"};
778+
"amqp-exchange", "kafka-ack-level", "mechanism", "cloudevents",
779+
"user-name", "password"};
759780
if (std::find(args.begin(), args.end(), attribute_name) != args.end()) {
760781
replace_str(attribute_name, s->info.args.get("AttributeValue"));
761782
return 0;

src/rgw/rgw_rest_s3.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5174,8 +5174,6 @@ void update_attribute_map(const std::string& input, AttributeMap& map) {
51745174
void parse_post_action(const std::string& post_body, req_state* s)
51755175
{
51765176
if (post_body.size() > 0) {
5177-
ldpp_dout(s, 10) << "Content of POST: " << post_body << dendl;
5178-
51795177
if (post_body.find("Action") != string::npos) {
51805178
const boost::char_separator<char> sep("&");
51815179
const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);

src/test/rgw/bucket_notification/test_bn.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4389,7 +4389,7 @@ def test_ps_s3_topic_no_permissions():
43894389
conn2.delete_bucket(bucket_name)
43904390

43914391

4392-
def kafka_security(security_type, mechanism='PLAIN'):
4392+
def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False):
43934393
""" test pushing kafka s3 notification securly to master """
43944394
conn = connection()
43954395
zonegroup = get_config_zonegroup()
@@ -4400,7 +4400,10 @@ def kafka_security(security_type, mechanism='PLAIN'):
44004400
topic_name = bucket_name+'_topic'
44014401
# create s3 topic
44024402
if security_type == 'SASL_SSL':
4403-
endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
4403+
if not use_topic_attrs_for_creds:
4404+
endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
4405+
else:
4406+
endpoint_address = 'kafka://' + kafka_server + ':9094'
44044407
elif security_type == 'SSL':
44054408
endpoint_address = 'kafka://' + kafka_server + ':9093'
44064409
elif security_type == 'SASL_PLAINTEXT':
@@ -4413,6 +4416,8 @@ def kafka_security(security_type, mechanism='PLAIN'):
44134416
elif security_type == 'SASL_SSL':
44144417
KAFKA_DIR = os.environ['KAFKA_DIR']
44154418
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt&mechanism='+mechanism
4419+
if use_topic_attrs_for_creds:
4420+
endpoint_args += '&user-name=alice&password=alice-secret'
44164421
else:
44174422
KAFKA_DIR = os.environ['KAFKA_DIR']
44184423
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt'
@@ -4485,6 +4490,11 @@ def test_ps_s3_notification_push_kafka_security_ssl_sasl():
44854490
kafka_security('SASL_SSL')
44864491

44874492

4493+
@attr('kafka_security_test')
4494+
def test_ps_s3_notification_push_kafka_security_ssl_sasl_attrs():
4495+
kafka_security('SASL_SSL', use_topic_attrs_for_creds=True)
4496+
4497+
44884498
@attr('kafka_security_test')
44894499
def test_ps_s3_notification_push_kafka_security_sasl():
44904500
kafka_security('SASL_PLAINTEXT')

0 commit comments

Comments
 (0)