Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions proto/redpanda/core/admin/v2/shadow_link.proto
Original file line number Diff line number Diff line change
Expand Up @@ -422,14 +422,30 @@ message SecuritySettingsSyncOptions {
// ACL filters
repeated ACLFilter acl_filters = 4;
}
// Authentication config. Currently only supporting SASL/SCRAM,
// however made as a oneof for expansion
// Authentication config. Supports:
// * SASL/SCRAM
// * SASL/PLAIN
message AuthenticationConfiguration {
oneof authentication {
// SASL/SCRAM configuration
ScramConfig scram_configuration = 1;
// SASL/PLAIN configuration
PlainConfig plain_configuration = 2;
}
}
// PLAIN settings
message PlainConfig {
// PLAIN username
string username = 1;
// Password
string password = 2 [(google.api.field_behavior) = INPUT_ONLY];
// Indicates that the password has been set
bool password_set = 3 [(google.api.field_behavior) = OUTPUT_ONLY];
// Timestamp of when the password was last set - only valid if password_set
// is true
google.protobuf.Timestamp password_set_at = 4
[(google.api.field_behavior) = OUTPUT_ONLY];
}
// SCRAM settings
message ScramConfig {
// SCRAM username
Expand Down
20 changes: 14 additions & 6 deletions src/go/rpk/gen/protocomments/admin/v2/comments.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/v/cluster/cluster_link/frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -981,8 +981,8 @@ errc frontend::validator::validate_connection_config(
}

if (
c.mechanism != "SCRAM-SHA-256"
&& c.mechanism != "SCRAM-SHA-512") {
c.mechanism != "SCRAM-SHA-256" && c.mechanism != "SCRAM-SHA-512"
&& c.mechanism != "PLAIN") {
vlog(
cluster::clusterlog.warn,
"Unsupported SCRAM mechanism: {}",
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log message at line 988 still refers to 'Unsupported SCRAM mechanism' but PLAIN is not a SCRAM mechanism. The message should be updated to 'Unsupported authentication mechanism' to accurately reflect that both SCRAM and PLAIN mechanisms are now supported.

Suggested change
"Unsupported SCRAM mechanism: {}",
"Unsupported authentication mechanism: {}",

Copilot uses AI. Check for mistakes.
Expand Down
33 changes: 31 additions & 2 deletions src/v/kafka/client/broker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "net/connection.h"
#include "random/generators.h"
#include "security/oidc_authenticator.h"
#include "security/plain_authenticator.h"
#include "security/scram_authenticator.h"
#include "thirdparty/c-ares/ares.h"
#include "utils/backoff_policy.h"
Expand Down Expand Up @@ -320,7 +321,8 @@ ss::future<> remote_broker::do_authenticate() {
if (
mechanism != security::scram_sha256_authenticator::name
&& mechanism != security::scram_sha512_authenticator::name
&& mechanism != security::oidc::sasl_authenticator::name) {
&& mechanism != security::oidc::sasl_authenticator::name
&& mechanism != security::plain_authenticator::name) {
throw broker_error{
_node_id,
error_code::sasl_authentication_failed,
Expand Down Expand Up @@ -348,11 +350,17 @@ ss::future<> remote_broker::do_authenticate() {

if (mechanism == security::scram_sha256_authenticator::name) {
co_await do_authenticate_scram256(username, password);

} else if (mechanism == security::scram_sha512_authenticator::name) {
co_await do_authenticate_scram512(username, password);
} else if (mechanism == security::oidc::sasl_authenticator::name) {
co_await do_authenticate_oauthbearer(password);
} else if (mechanism == security::plain_authenticator::name) {
co_await do_authenticate_plain(username, password);
} else {
throw broker_error{
_node_id,
error_code::sasl_authentication_failed,
fmt_with_ctx(ssx::sformat, "Unknown mechanism: {}", mechanism)};
}
Comment on lines 353 to 364
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newly added else-if at line 357-358 makes the earlier check at lines 321-328 redundant. The earlier check validates that mechanism must be one of scram_sha256, scram_sha512, oidc, or plain, but the else block at lines 359-364 now provides the same validation with a better error message. Consider removing the earlier validation block to avoid duplication.

Copilot uses AI. Check for mistakes.
}

Expand Down Expand Up @@ -520,6 +528,27 @@ ss::future<> remote_broker::do_authenticate_oauthbearer(ss::sstring token) {
}
}

ss::future<> remote_broker::do_authenticate_plain(
ss::sstring username, ss::sstring password) {
sasl_authenticate_request req;
std::string bytes;
// 2 - number of null characters in the PLAIN auth message
bytes.reserve(2 + username.size() + password.size());
bytes.push_back('\0');
bytes.append(username.cbegin(), username.cend());
bytes.push_back('\0');
bytes.append(password.cbegin(), password.cend());
req.data.auth_bytes = bytes::from_string(std::move(bytes));
Comment on lines +534 to +541
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick:

Suggested change
std::string bytes;
// 2 - number of null characters in the PLAIN auth message
bytes.reserve(2 + username.size() + password.size());
bytes.push_back('\0');
bytes.append(username.cbegin(), username.cend());
bytes.push_back('\0');
bytes.append(password.cbegin(), password.cend());
req.data.auth_bytes = bytes::from_string(std::move(bytes));
auto bytes = fmt::format("\0{}\0{}", username, password);
req.data.auth_bytes = bytes::from_string(bytes);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I tried this but I think the std::string_view that gets created via the bytes::from_string reports a length of 0 since the string begins with a null character

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You, my bad, you'll need:

    auto bytes = fmt::format("\0{}\0{}", username, password);
    req.data.auth_bytes = bytes::from_string(std::string_view{bytes.begin(), bytes.end()});

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That unfortunately won't work either... end() is start() + size() and in this case size() is 0

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should work if the format string is a std::string_view!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should work if the format string is a std::string_view!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide an example of what you mean please

auto res = co_await do_dispatch(
std::move(req), get_sasl_authenticate_request_version());
if (res.data.errored()) {
throw broker_error{
_node_id,
res.data.error_code,
res.data.error_message.value_or("<no error message>")};
}
}

namespace {
template<typename ReqT>
api_version get_auth_request_version(
Expand Down
2 changes: 2 additions & 0 deletions src/v/kafka/client/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ class remote_broker
ss::future<>
do_authenticate_scram512(ss::sstring username, ss::sstring password);
ss::future<> do_authenticate_oauthbearer(ss::sstring token);
ss::future<>
do_authenticate_plain(ss::sstring username, ss::sstring password);
ss::future<security::server_first_message>
send_scram_client_first(const security::client_first_message& client_first);
ss::future<security::server_final_message>
Expand Down
62 changes: 44 additions & 18 deletions src/v/redpanda/admin/services/shadow_link/converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ using proto::admin::authentication_configuration;
using proto::admin::consumer_offset_sync_options;
using proto::admin::create_shadow_link_request;
using proto::admin::name_filter;
using proto::admin::plain_config;
using proto::admin::schema_registry_sync_options;
using proto::admin::schema_registry_sync_options_shadow_schema_registry_topic;
using proto::admin::scram_config;
Expand Down Expand Up @@ -407,6 +408,19 @@ create_authn_settings(const authentication_configuration& authn_config) {
scram_mechanism_to_string(scram.get_scram_mechanism())};
return creds;
},
[](const plain_config& plain)
-> cluster_link::model::connection_config::authn_variant {
if (plain.get_username().empty() || plain.get_password().empty()) {
throw std::invalid_argument(
"When setting PLAIN configuration, must provide username and "
"password");
}
cluster_link::model::scram_credentials creds;
creds.username = plain.get_username();
creds.password = plain.get_password();
creds.mechanism = "PLAIN";
return creds;
},
[](std::monostate)
-> cluster_link::model::connection_config::authn_variant {
throw std::invalid_argument(
Expand Down Expand Up @@ -524,26 +538,38 @@ authentication_configuration create_authentication_configuration(
authn,
[](const cluster_link::model::scram_credentials& scram)
-> authentication_configuration {
scram_config scram_proto;
scram_proto.set_username(ss::sstring{scram.username});
scram_proto.set_password_set(true);
scram_proto.set_password_set_at(
absl::FromChrono(
model::to_time_point(scram.password_last_updated)));
scram_proto.set_scram_mechanism(
proto::admin::scram_mechanism::unspecified);
if (scram.mechanism == "SCRAM-SHA-256") {
scram_proto.set_scram_mechanism(
proto::admin::scram_mechanism::scram_sha_256);
} else if (scram.mechanism == "SCRAM-SHA-512") {
scram_proto.set_scram_mechanism(
proto::admin::scram_mechanism::scram_sha_512);
authentication_configuration authn;
if (scram.mechanism == "PLAIN") {
plain_config plain_proto;
plain_proto.set_username(ss::sstring{scram.username});
plain_proto.set_password_set(true);
plain_proto.set_password_set_at(
absl::FromChrono(
model::to_time_point(scram.password_last_updated)));
authn.set_plain_configuration(std::move(plain_proto));
} else {
throw std::invalid_argument(
ssx::sformat("Unknown SCRAM mechanism: {}", scram.mechanism));
scram_config scram_proto;
scram_proto.set_username(ss::sstring{scram.username});
scram_proto.set_password_set(true);
scram_proto.set_password_set_at(
absl::FromChrono(
model::to_time_point(scram.password_last_updated)));
scram_proto.set_scram_mechanism(
proto::admin::scram_mechanism::unspecified);
if (scram.mechanism == "SCRAM-SHA-256") {
scram_proto.set_scram_mechanism(
proto::admin::scram_mechanism::scram_sha_256);
} else if (scram.mechanism == "SCRAM-SHA-512") {
scram_proto.set_scram_mechanism(
proto::admin::scram_mechanism::scram_sha_512);
} else {
throw std::invalid_argument(
ssx::sformat(
"Unknown SCRAM mechanism: {}", scram.mechanism));
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message refers to 'SCRAM mechanism' but this error can now be triggered when using PLAIN authentication (which is not a SCRAM mechanism). Consider changing to 'Unknown authentication mechanism: {}' or adding separate validation to distinguish between SCRAM and PLAIN cases.

Suggested change
"Unknown SCRAM mechanism: {}", scram.mechanism));
"Unknown authentication mechanism: {}", scram.mechanism));

Copilot uses AI. Check for mistakes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're gonna do that, it can go in another else on the outer scope. It's unreachable, though.

}

authn.set_scram_configuration(std::move(scram_proto));
}
authentication_configuration authn;
authn.set_scram_configuration(std::move(scram_proto));
return authn;
});
}
Expand Down
67 changes: 67 additions & 0 deletions src/v/redpanda/admin/services/shadow_link/tests/converter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,53 @@ TEST(converter_test, create_with_authn_config_scram_unspecified) {
serde::pb::rpc::invalid_argument_exception);
}

TEST(converter_test, create_with_plain) {
const auto name = "test-link";
const auto username = "test-user";
const auto password = "test-password";

proto::admin::shadow_link shadow_link;
proto::admin::create_shadow_link_request req;
proto::admin::shadow_link_configurations shadow_link_configurations;
proto::admin::shadow_link_client_options shadow_link_client_options;
proto::admin::authentication_configuration authn_config;
proto::admin::plain_config plain_config;

plain_config.set_username(ss::sstring{username});
plain_config.set_password(ss::sstring{password});
authn_config.set_plain_configuration(std::move(plain_config));
shadow_link_client_options.set_authentication_configuration(
std::move(authn_config));

shadow_link_client_options.set_bootstrap_servers({"localhost:9092"});
shadow_link_configurations.set_client_options(
std::move(shadow_link_client_options));

shadow_link.set_configurations(std::move(shadow_link_configurations));
shadow_link.set_name(ss::sstring{name});
req.set_shadow_link(std::move(shadow_link));

auto now = model::to_time_point(model::timestamp::now());
auto md = admin::convert_create_to_metadata(std::move(req));

ASSERT_TRUE(md.connection.authn_config.has_value());
ASSERT_TRUE(
std::holds_alternative<cluster_link::model::scram_credentials>(
md.connection.authn_config.value()));
const auto& md_authn_config
= std::get<cluster_link::model::scram_credentials>(
md.connection.authn_config.value());

EXPECT_EQ(md_authn_config.username, username);
EXPECT_EQ(md_authn_config.password, password);
EXPECT_EQ(md_authn_config.mechanism, "PLAIN");
auto pwd_updated = model::to_time_point(
md_authn_config.password_last_updated);
// Expect the password updated time to be within 10s
EXPECT_GE(pwd_updated, now - 5s);
EXPECT_LE(pwd_updated, now + 5s);
}

TEST(converter_test, create_with_tls_flag_only) {
const auto name = "test-link";
proto::admin::shadow_link shadow_link;
Expand Down Expand Up @@ -745,6 +792,26 @@ TEST(converter_test, metadata_to_shadow_link_authn_invalid_scram) {
admin::metadata_to_shadow_link(std::move(md), {}), std::invalid_argument);
}

TEST(converter_test, metadata_to_shadow_link_authn_plain) {
cluster_link::model::metadata md;
md.connection.authn_config = cluster_link::model::scram_credentials{
.username = "test-user",
.password = "test-password",
.mechanism = "PLAIN"};

auto sl = admin::metadata_to_shadow_link(std::move(md), {});

const auto& client_options = sl.get_configurations().get_client_options();
ASSERT_TRUE(client_options.has_authentication_configuration());
ASSERT_TRUE(client_options.get_authentication_configuration()
.has_plain_configuration());
const auto& plain_config = client_options.get_authentication_configuration()
.get_plain_configuration();
EXPECT_EQ(plain_config.get_username(), "test-user");
EXPECT_TRUE(plain_config.get_password_set());
EXPECT_TRUE(plain_config.get_password().empty());
}

TEST(converter_test, metadata_to_shadow_link_tls_file) {
const auto ca_file = "ca_file.pem";
const auto key_file = "key_file.pem";
Expand Down
Loading
Loading