6 changes: 4 additions & 2 deletions proto/redpanda/core/admin/v2/shadow_link.proto
@@ -438,7 +438,8 @@ message PlainConfig {
// PLAIN username
string username = 1;
// Password
string password = 2 [(google.api.field_behavior) = INPUT_ONLY];
string password = 2
[(google.api.field_behavior) = INPUT_ONLY, debug_redact = true];
// Indicates that the password has been set
bool password_set = 3 [(google.api.field_behavior) = OUTPUT_ONLY];
// Timestamp of when the password was last set - only valid if password_set
@@ -451,7 +452,8 @@ message ScramConfig {
// SCRAM username
string username = 1;
// Password
string password = 2 [(google.api.field_behavior) = INPUT_ONLY];
string password = 2
[(google.api.field_behavior) = INPUT_ONLY, debug_redact = true];
// Indicates that the password has been set
bool password_set = 3 [(google.api.field_behavior) = OUTPUT_ONLY];
// Timestamp of when the password was last set - only valid if password_set
3 changes: 2 additions & 1 deletion proto/redpanda/core/common/v1/tls.proto
@@ -53,7 +53,8 @@ message TLSPEMSettings {
string ca = 1;
// Key and Cert are optional but if one is provided, then both must be
// The key
string key = 2 [(google.api.field_behavior) = INPUT_ONLY];
string key = 2
[(google.api.field_behavior) = INPUT_ONLY, debug_redact = true];
// The SHA-256 of the key, in base64 format
string key_fingerprint = 3 [(google.api.field_behavior) = OUTPUT_ONLY];
// The cert
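Both proto changes attach the standard debug_redact field option (defined on google.protobuf.FieldOptions) to secret fields, so debug and text-format dumps of these messages redact the values. As a rough illustration only, not code from this PR and assuming a protobuf release that ships the option, the flag can be read back through the C++ reflection API:

#include <google/protobuf/descriptor.h>
#include <iostream>

// Illustrative helper: list every field of a message type that is marked
// debug_redact = true and therefore must never be printed in logs.
void print_redacted_fields(const google::protobuf::Descriptor* desc) {
    for (int i = 0; i < desc->field_count(); ++i) {
        const google::protobuf::FieldDescriptor* field = desc->field(i);
        if (field->options().debug_redact()) {
            std::cout << field->full_name() << '\n';
        }
    }
}

// Hypothetical usage against one of the messages touched above:
//   print_redacted_fields(redpanda::core::admin::v2::PlainConfig::descriptor());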
13 changes: 13 additions & 0 deletions src/v/cluster_link/link.cc
@@ -210,6 +210,7 @@ void link::update_config(
config,
revision);
chunked_vector<::model::topic> new_topics_to_replicate;
chunked_vector<::model::topic> topics_no_longer_mirroring;
for (const auto& [topic, m] : config.state.mirror_topics) {
if (
!_config.state.mirror_topics.contains(topic)
@@ -218,6 +219,11 @@ void link::update_config(
new_topics_to_replicate.push_back(topic);
}
}
for (const auto& [topic, _] : _config.state.mirror_topics) {
if (!config.state.mirror_topics.contains(topic)) {
topics_no_longer_mirroring.push_back(topic);
}
}
_config = std::move(config);
maybe_update_connection_configuration();

@@ -244,6 +250,13 @@ void link::update_config(
_config.name);
_replication_mgr.stop_replicators();
} else {
for (const auto& topic : topics_no_longer_mirroring) {
vlog(
cllog.debug,
"Topic {} is no longer mirrored, stopping its replicators",
topic);
_replication_mgr.stop_replicators(topic);
}
for (const auto& [topic, _] : _config.state.mirror_topics) {
if (requires_active_replicators(topic)) {
continue;
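The link.cc change compares the old and new mirror-topic sets when the link configuration is updated, collects the topics that are no longer mirrored, and stops only those topics' replicators instead of leaving them running. A minimal, self-contained sketch of that removed-key detection, using standard containers in place of Redpanda's chunked types:

#include <set>
#include <string>
#include <vector>

// Return the topics present in the old mirror set but missing from the new
// one; these are the topics whose replicators should be stopped.
std::vector<std::string> topics_no_longer_mirrored(
  const std::set<std::string>& old_topics,
  const std::set<std::string>& new_topics) {
    std::vector<std::string> removed;
    for (const auto& topic : old_topics) {
        if (!new_topics.contains(topic)) {
            removed.push_back(topic);
        }
    }
    return removed;
}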
12 changes: 11 additions & 1 deletion src/v/redpanda/admin/services/shadow_link/converter.cc
@@ -1017,6 +1017,13 @@ chunked_vector<shadow_topic> create_shadow_topics(
return model_to_shadow_topic(p.first, p.second, status_report);
});

std::ranges::sort(
shadow_topics.begin(),
shadow_topics.end(),
[](const shadow_topic& a, const shadow_topic& b) {
return a.get_name() < b.get_name();
});

return shadow_topics;
}

@@ -1215,7 +1222,10 @@ chunked_vector<topic_partition_information> status_to_partition_information(
info.set_high_watermark(report.shadow_partition_high_watermark);
resp.emplace_back(std::move(info));
}

std::ranges::sort(
resp,
std::ranges::less{},
&topic_partition_information::get_partition_id);
return resp;
}
} // namespace
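Both converter.cc additions make the admin API return results in a deterministic order: shadow topics are sorted by name with a lambda comparator, and partition information is sorted by partition id using the projection form of std::ranges::sort. A small standalone sketch of the projection variant, with an illustrative struct standing in for topic_partition_information:

#include <algorithm>
#include <functional>
#include <vector>

struct partition_info {
    int partition_id{};
    int get_partition_id() const { return partition_id; }
};

void sort_by_partition_id(std::vector<partition_info>& infos) {
    // std::ranges::less compares the projected values, i.e. the result of
    // invoking get_partition_id() on each element; this is equivalent to a
    // lambda comparator on that member.
    std::ranges::sort(
      infos, std::ranges::less{}, &partition_info::get_partition_id);
}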
10 changes: 4 additions & 6 deletions src/v/redpanda/admin/services/shadow_link/shadow_link.cc
@@ -32,7 +32,6 @@ shadow_link_service_impl::shadow_link_service_impl(
ss::future<proto::admin::create_shadow_link_response>
shadow_link_service_impl::create_shadow_link(
serde::pb::rpc::context, proto::admin::create_shadow_link_request req) {
vlog(sllog.trace, "create_shadow_link: {}", req);
auto redirect_node = redirect_to(model::controller_ntp);
if (redirect_node) {
vlog(
@@ -45,7 +44,7 @@ shadow_link_service_impl::create_shadow_link(
*redirect_node)
.create_shadow_link(serde::pb::rpc::context{}, std::move(req));
}

vlog(sllog.info, "create_shadow_link: {}", req);
auto md = convert_create_to_metadata(std::move(req));
auto get_resp = _service->local().get_cluster_link(md.name);
if (get_resp.has_value()) {
@@ -65,7 +64,6 @@ shadow_link_service_impl::create_shadow_link(
ss::future<proto::admin::delete_shadow_link_response>
shadow_link_service_impl::delete_shadow_link(
serde::pb::rpc::context ctx, proto::admin::delete_shadow_link_request req) {
vlog(sllog.trace, "delete_shadow_link: {}", req);
auto redirect_node = redirect_to(model::controller_ntp);
if (redirect_node) {
vlog(
@@ -78,7 +76,7 @@ shadow_link_service_impl::delete_shadow_link(
*redirect_node)
.delete_shadow_link(ctx, std::move(req));
}

vlog(sllog.info, "delete_shadow_link: {}", req);
handle_error(
co_await _service->local().delete_cluster_link(
cluster_link::model::name_t{req.get_name()}, req.get_force()));
@@ -147,7 +145,6 @@ shadow_link_service_impl::list_shadow_links(
ss::future<proto::admin::update_shadow_link_response>
shadow_link_service_impl::update_shadow_link(
serde::pb::rpc::context ctx, proto::admin::update_shadow_link_request req) {
vlog(sllog.trace, "update_shadow_link: {}", req);
auto redirect_node = redirect_to(model::controller_ntp);
if (redirect_node) {
vlog(
@@ -161,6 +158,7 @@ shadow_link_service_impl::update_shadow_link(
.update_shadow_link(ctx, std::move(req));
}

vlog(sllog.info, "update_shadow_link: {}", req);
auto link_name = cluster_link::model::name_t{
req.get_shadow_link().get_name()};

@@ -187,7 +185,6 @@ shadow_link_service_impl::update_shadow_link(
ss::future<proto::admin::fail_over_response>
shadow_link_service_impl::fail_over(
serde::pb::rpc::context ctx, proto::admin::fail_over_request req) {
vlog(sllog.trace, "fail_over_request: {}", req);
auto redirect_node = redirect_to(model::controller_ntp);
if (redirect_node) {
vlog(
@@ -200,6 +197,7 @@ shadow_link_service_impl::fail_over(
*redirect_node)
.fail_over(ctx, std::move(req));
}
vlog(sllog.info, "fail_over_request: {}", req);
auto link_name = cluster_link::model::name_t{req.get_name()};
auto current_link = handle_error(
_service->local().get_cluster_link(link_name));
57 changes: 41 additions & 16 deletions tests/rptest/tests/cluster_linking_e2e_test.py
@@ -1787,7 +1787,7 @@ def test_topic_delete(self, source_cluster_spec):
topic = TopicSpec(name="source-topic", partition_count=5, replication_factor=3)

self.source_default_client().create_topic(topic)
shadow_link = self.create_link("test-link")
self.create_link("test-link")

self.target_cluster.service.wait_until(
lambda: self.topic_partitions_exists_in_target(topic),
@@ -1806,29 +1806,54 @@ def test_topic_delete(self, source_cluster_spec):
):
target_client.delete_topic(topic.name)

shadow_link.configurations.topic_metadata_sync_options.auto_create_shadow_topic_filters.extend(
[
shadow_link_pb2.NameFilter(
pattern_type=shadow_link_pb2.PATTERN_TYPE_LITERAL,
filter_type=shadow_link_pb2.FILTER_TYPE_EXCLUDE,
name=topic.name,
),
]
)
update_mask: google.protobuf.field_mask_pb2.FieldMask = google.protobuf.field_mask_pb2.FieldMask(
paths=[
"configurations.topic_metadata_sync_options.auto_create_shadow_topic_filters"
]
)
self.update_link(shadow_link=shadow_link, update_mask=update_mask)
def update_link_config(include: bool) -> None:
shadow_link = self.get_link("test-link")
shadow_link.configurations.topic_metadata_sync_options.ClearField(
"auto_create_shadow_topic_filters"
)
filter_type = (
shadow_link_pb2.FILTER_TYPE_INCLUDE
if include
else shadow_link_pb2.FILTER_TYPE_EXCLUDE
)
shadow_link.configurations.topic_metadata_sync_options.auto_create_shadow_topic_filters.extend(
[
shadow_link_pb2.NameFilter(
pattern_type=shadow_link_pb2.PATTERN_TYPE_LITERAL,
filter_type=filter_type,
name=topic.name,
),
]
)
update_mask: google.protobuf.field_mask_pb2.FieldMask = (
google.protobuf.field_mask_pb2.FieldMask(
paths=["configurations.topic_metadata_sync_options"]
)
)
self.update_link(shadow_link=shadow_link, update_mask=update_mask)

# Update the link to exclude the topic from autocreation filters
update_link_config(include=False)
# Now the topic should be deletable, as it is not in the autocreate filters
target_client.delete_topic(topic.name)
link_state = self.get_link("test-link")
assert len(link_state.status.shadow_topics) == 0, (
"Expected empty shadow_topic list. "
f"Instead got {link_state.status.shadow_topics}"
)
# Re-add the topic to the autocreation filters
update_link_config(include=True)
# Verify that the shadow topic is re-created
self.target_cluster.service.wait_until(
lambda: self.topic_partitions_exists_in_target(topic),
timeout_sec=30,
backoff_sec=1,
err_msg=f"Topic {topic.name} not found in target cluster after re-adding to autocreation filters",
)

# Replicate more data to ensure replication still works
with self.producer_consumer(topic=topic.name, msg_size=128, msg_cnt=200000):
self.verify()

@cluster(num_nodes=7)
def test_replication_with_transactions(self):