Skip to content

Commit 51fd551

Browse files
Merge pull request #28708 from michael-redpanda/sl/support-plain
Add support for PLAIN authn for Shadow Linking
2 parents 310cbb7 + 8acce17 commit 51fd551

File tree

10 files changed

+346
-71
lines changed

10 files changed

+346
-71
lines changed

proto/redpanda/core/admin/v2/shadow_link.proto

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -422,14 +422,30 @@ message SecuritySettingsSyncOptions {
422422
// ACL filters
423423
repeated ACLFilter acl_filters = 4;
424424
}
425-
// Authentication config. Currently only supporting SASL/SCRAM,
426-
// however made as a oneof for expansion
425+
// Authentication config. Supports:
426+
// * SASL/SCRAM
427+
// * SASL/PLAIN
427428
message AuthenticationConfiguration {
428429
oneof authentication {
429430
// SASL/SCRAM configuration
430431
ScramConfig scram_configuration = 1;
432+
// SASL/PLAIN configuration
433+
PlainConfig plain_configuration = 2;
431434
}
432435
}
436+
// PLAIN settings
437+
message PlainConfig {
438+
// PLAIN username
439+
string username = 1;
440+
// Password
441+
string password = 2 [(google.api.field_behavior) = INPUT_ONLY];
442+
// Indicates that the password has been set
443+
bool password_set = 3 [(google.api.field_behavior) = OUTPUT_ONLY];
444+
// Timestamp of when the password was last set - only valid if password_set
445+
// is true
446+
google.protobuf.Timestamp password_set_at = 4
447+
[(google.api.field_behavior) = OUTPUT_ONLY];
448+
}
433449
// SCRAM settings
434450
message ScramConfig {
435451
// SCRAM username

src/go/rpk/gen/protocomments/admin/v2/comments.pb.go

Lines changed: 14 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/v/cluster/cluster_link/frontend.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -981,8 +981,8 @@ errc frontend::validator::validate_connection_config(
981981
}
982982

983983
if (
984-
c.mechanism != "SCRAM-SHA-256"
985-
&& c.mechanism != "SCRAM-SHA-512") {
984+
c.mechanism != "SCRAM-SHA-256" && c.mechanism != "SCRAM-SHA-512"
985+
&& c.mechanism != "PLAIN") {
986986
vlog(
987987
cluster::clusterlog.warn,
988988
"Unsupported SCRAM mechanism: {}",

src/v/kafka/client/broker.cc

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "net/connection.h"
1616
#include "random/generators.h"
1717
#include "security/oidc_authenticator.h"
18+
#include "security/plain_authenticator.h"
1819
#include "security/scram_authenticator.h"
1920
#include "thirdparty/c-ares/ares.h"
2021
#include "utils/backoff_policy.h"
@@ -320,7 +321,8 @@ ss::future<> remote_broker::do_authenticate() {
320321
if (
321322
mechanism != security::scram_sha256_authenticator::name
322323
&& mechanism != security::scram_sha512_authenticator::name
323-
&& mechanism != security::oidc::sasl_authenticator::name) {
324+
&& mechanism != security::oidc::sasl_authenticator::name
325+
&& mechanism != security::plain_authenticator::name) {
324326
throw broker_error{
325327
_node_id,
326328
error_code::sasl_authentication_failed,
@@ -348,11 +350,17 @@ ss::future<> remote_broker::do_authenticate() {
348350

349351
if (mechanism == security::scram_sha256_authenticator::name) {
350352
co_await do_authenticate_scram256(username, password);
351-
352353
} else if (mechanism == security::scram_sha512_authenticator::name) {
353354
co_await do_authenticate_scram512(username, password);
354355
} else if (mechanism == security::oidc::sasl_authenticator::name) {
355356
co_await do_authenticate_oauthbearer(password);
357+
} else if (mechanism == security::plain_authenticator::name) {
358+
co_await do_authenticate_plain(username, password);
359+
} else {
360+
throw broker_error{
361+
_node_id,
362+
error_code::sasl_authentication_failed,
363+
fmt_with_ctx(ssx::sformat, "Unknown mechanism: {}", mechanism)};
356364
}
357365
}
358366

@@ -520,6 +528,27 @@ ss::future<> remote_broker::do_authenticate_oauthbearer(ss::sstring token) {
520528
}
521529
}
522530

531+
ss::future<> remote_broker::do_authenticate_plain(
532+
ss::sstring username, ss::sstring password) {
533+
sasl_authenticate_request req;
534+
std::string bytes;
535+
// 2 - number of null characters in the PLAIN auth message
536+
bytes.reserve(2 + username.size() + password.size());
537+
bytes.push_back('\0');
538+
bytes.append(username.cbegin(), username.cend());
539+
bytes.push_back('\0');
540+
bytes.append(password.cbegin(), password.cend());
541+
req.data.auth_bytes = bytes::from_string(std::move(bytes));
542+
auto res = co_await do_dispatch(
543+
std::move(req), get_sasl_authenticate_request_version());
544+
if (res.data.errored()) {
545+
throw broker_error{
546+
_node_id,
547+
res.data.error_code,
548+
res.data.error_message.value_or("<no error message>")};
549+
}
550+
}
551+
523552
namespace {
524553
template<typename ReqT>
525554
api_version get_auth_request_version(

src/v/kafka/client/broker.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ class remote_broker
191191
ss::future<>
192192
do_authenticate_scram512(ss::sstring username, ss::sstring password);
193193
ss::future<> do_authenticate_oauthbearer(ss::sstring token);
194+
ss::future<>
195+
do_authenticate_plain(ss::sstring username, ss::sstring password);
194196
ss::future<security::server_first_message>
195197
send_scram_client_first(const security::client_first_message& client_first);
196198
ss::future<security::server_final_message>

src/v/redpanda/admin/services/shadow_link/converter.cc

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ using proto::admin::authentication_configuration;
3030
using proto::admin::consumer_offset_sync_options;
3131
using proto::admin::create_shadow_link_request;
3232
using proto::admin::name_filter;
33+
using proto::admin::plain_config;
3334
using proto::admin::schema_registry_sync_options;
3435
using proto::admin::schema_registry_sync_options_shadow_schema_registry_topic;
3536
using proto::admin::scram_config;
@@ -407,6 +408,19 @@ create_authn_settings(const authentication_configuration& authn_config) {
407408
scram_mechanism_to_string(scram.get_scram_mechanism())};
408409
return creds;
409410
},
411+
[](const plain_config& plain)
412+
-> cluster_link::model::connection_config::authn_variant {
413+
if (plain.get_username().empty() || plain.get_password().empty()) {
414+
throw std::invalid_argument(
415+
"When setting PLAIN configuration, must provide username and "
416+
"password");
417+
}
418+
cluster_link::model::scram_credentials creds;
419+
creds.username = plain.get_username();
420+
creds.password = plain.get_password();
421+
creds.mechanism = "PLAIN";
422+
return creds;
423+
},
410424
[](std::monostate)
411425
-> cluster_link::model::connection_config::authn_variant {
412426
throw std::invalid_argument(
@@ -524,26 +538,38 @@ authentication_configuration create_authentication_configuration(
524538
authn,
525539
[](const cluster_link::model::scram_credentials& scram)
526540
-> authentication_configuration {
527-
scram_config scram_proto;
528-
scram_proto.set_username(ss::sstring{scram.username});
529-
scram_proto.set_password_set(true);
530-
scram_proto.set_password_set_at(
531-
absl::FromChrono(
532-
model::to_time_point(scram.password_last_updated)));
533-
scram_proto.set_scram_mechanism(
534-
proto::admin::scram_mechanism::unspecified);
535-
if (scram.mechanism == "SCRAM-SHA-256") {
536-
scram_proto.set_scram_mechanism(
537-
proto::admin::scram_mechanism::scram_sha_256);
538-
} else if (scram.mechanism == "SCRAM-SHA-512") {
539-
scram_proto.set_scram_mechanism(
540-
proto::admin::scram_mechanism::scram_sha_512);
541+
authentication_configuration authn;
542+
if (scram.mechanism == "PLAIN") {
543+
plain_config plain_proto;
544+
plain_proto.set_username(ss::sstring{scram.username});
545+
plain_proto.set_password_set(true);
546+
plain_proto.set_password_set_at(
547+
absl::FromChrono(
548+
model::to_time_point(scram.password_last_updated)));
549+
authn.set_plain_configuration(std::move(plain_proto));
541550
} else {
542-
throw std::invalid_argument(
543-
ssx::sformat("Unknown SCRAM mechanism: {}", scram.mechanism));
551+
scram_config scram_proto;
552+
scram_proto.set_username(ss::sstring{scram.username});
553+
scram_proto.set_password_set(true);
554+
scram_proto.set_password_set_at(
555+
absl::FromChrono(
556+
model::to_time_point(scram.password_last_updated)));
557+
scram_proto.set_scram_mechanism(
558+
proto::admin::scram_mechanism::unspecified);
559+
if (scram.mechanism == "SCRAM-SHA-256") {
560+
scram_proto.set_scram_mechanism(
561+
proto::admin::scram_mechanism::scram_sha_256);
562+
} else if (scram.mechanism == "SCRAM-SHA-512") {
563+
scram_proto.set_scram_mechanism(
564+
proto::admin::scram_mechanism::scram_sha_512);
565+
} else {
566+
throw std::invalid_argument(
567+
ssx::sformat(
568+
"Unknown SCRAM mechanism: {}", scram.mechanism));
569+
}
570+
571+
authn.set_scram_configuration(std::move(scram_proto));
544572
}
545-
authentication_configuration authn;
546-
authn.set_scram_configuration(std::move(scram_proto));
547573
return authn;
548574
});
549575
}

src/v/redpanda/admin/services/shadow_link/tests/converter_test.cc

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,53 @@ TEST(converter_test, create_with_authn_config_scram_unspecified) {
247247
serde::pb::rpc::invalid_argument_exception);
248248
}
249249

250+
TEST(converter_test, create_with_plain) {
251+
const auto name = "test-link";
252+
const auto username = "test-user";
253+
const auto password = "test-password";
254+
255+
proto::admin::shadow_link shadow_link;
256+
proto::admin::create_shadow_link_request req;
257+
proto::admin::shadow_link_configurations shadow_link_configurations;
258+
proto::admin::shadow_link_client_options shadow_link_client_options;
259+
proto::admin::authentication_configuration authn_config;
260+
proto::admin::plain_config plain_config;
261+
262+
plain_config.set_username(ss::sstring{username});
263+
plain_config.set_password(ss::sstring{password});
264+
authn_config.set_plain_configuration(std::move(plain_config));
265+
shadow_link_client_options.set_authentication_configuration(
266+
std::move(authn_config));
267+
268+
shadow_link_client_options.set_bootstrap_servers({"localhost:9092"});
269+
shadow_link_configurations.set_client_options(
270+
std::move(shadow_link_client_options));
271+
272+
shadow_link.set_configurations(std::move(shadow_link_configurations));
273+
shadow_link.set_name(ss::sstring{name});
274+
req.set_shadow_link(std::move(shadow_link));
275+
276+
auto now = model::to_time_point(model::timestamp::now());
277+
auto md = admin::convert_create_to_metadata(std::move(req));
278+
279+
ASSERT_TRUE(md.connection.authn_config.has_value());
280+
ASSERT_TRUE(
281+
std::holds_alternative<cluster_link::model::scram_credentials>(
282+
md.connection.authn_config.value()));
283+
const auto& md_authn_config
284+
= std::get<cluster_link::model::scram_credentials>(
285+
md.connection.authn_config.value());
286+
287+
EXPECT_EQ(md_authn_config.username, username);
288+
EXPECT_EQ(md_authn_config.password, password);
289+
EXPECT_EQ(md_authn_config.mechanism, "PLAIN");
290+
auto pwd_updated = model::to_time_point(
291+
md_authn_config.password_last_updated);
292+
// Expect the password updated time to be within 10s
293+
EXPECT_GE(pwd_updated, now - 5s);
294+
EXPECT_LE(pwd_updated, now + 5s);
295+
}
296+
250297
TEST(converter_test, create_with_tls_flag_only) {
251298
const auto name = "test-link";
252299
proto::admin::shadow_link shadow_link;
@@ -745,6 +792,26 @@ TEST(converter_test, metadata_to_shadow_link_authn_invalid_scram) {
745792
admin::metadata_to_shadow_link(std::move(md), {}), std::invalid_argument);
746793
}
747794

795+
TEST(converter_test, metadata_to_shadow_link_authn_plain) {
796+
cluster_link::model::metadata md;
797+
md.connection.authn_config = cluster_link::model::scram_credentials{
798+
.username = "test-user",
799+
.password = "test-password",
800+
.mechanism = "PLAIN"};
801+
802+
auto sl = admin::metadata_to_shadow_link(std::move(md), {});
803+
804+
const auto& client_options = sl.get_configurations().get_client_options();
805+
ASSERT_TRUE(client_options.has_authentication_configuration());
806+
ASSERT_TRUE(client_options.get_authentication_configuration()
807+
.has_plain_configuration());
808+
const auto& plain_config = client_options.get_authentication_configuration()
809+
.get_plain_configuration();
810+
EXPECT_EQ(plain_config.get_username(), "test-user");
811+
EXPECT_TRUE(plain_config.get_password_set());
812+
EXPECT_TRUE(plain_config.get_password().empty());
813+
}
814+
748815
TEST(converter_test, metadata_to_shadow_link_tls_file) {
749816
const auto ca_file = "ca_file.pem";
750817
const auto key_file = "key_file.pem";

0 commit comments

Comments
 (0)