Skip to content

Commit 533c820

Browse files
committed
[#28910] xCluster: Add safe time to UI
Summary: Display the xCluster safe time in yb-master xCluster UI page to improve debugging capabilities. New UI output {image uri=https://github.com/user-attachments/assets/bcd96f4f-fa5a-44d5-9b50-2d828c43d277, width=500px} Fixes #28910 Fixes #28806 Jira: DB-18633 Test Plan: Manually tested Reviewers: jhe Reviewed By: jhe Subscribers: ybase Differential Revision: https://phorge.dev.yugabyte.com/D45287
1 parent a7ef68a commit 533c820

10 files changed

+66
-43
lines changed

src/yb/master/master-path-handlers.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3130,8 +3130,7 @@ void MasterPathHandlers::HandleXCluster(
31303130

31313131
output << "<pre class=\"prettyprint\">"
31323132
<< "state: " << inbound_replication_group.state
3133-
<< "\ndisable_stream: " << BoolToString(inbound_replication_group.disable_stream)
3134-
<< "\ntype: "
3133+
<< (inbound_replication_group.disable_stream ? " (DISABLED)" : "") << "\ntype: "
31353134
<< xcluster::ShortReplicationType(inbound_replication_group.replication_type)
31363135
<< "\nmaster_addrs: " << inbound_replication_group.master_addrs;
31373136
if (!inbound_replication_group.db_scoped_info.empty()) {

src/yb/master/xcluster/add_table_to_xcluster_target_task.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ AddTableToXClusterTargetTask::GetXClusterSafeTimeWithoutDdlQueue() {
210210
const auto namespace_id = table_info_->namespace_id();
211211

212212
auto safe_time_res = xcluster_manager_.GetXClusterSafeTimeForNamespace(
213-
epoch_, namespace_id, XClusterSafeTimeFilter::DDL_QUEUE);
213+
namespace_id, XClusterSafeTimeFilter::DDL_QUEUE);
214214
if (!safe_time_res) {
215215
if (!safe_time_res.status().IsNotFound()) {
216216
return safe_time_res.status();

src/yb/master/xcluster/xcluster_manager.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,8 @@ Status XClusterManager::GetXClusterSafeTimeForNamespace(
344344
}
345345

346346
Result<HybridTime> XClusterManager::GetXClusterSafeTimeForNamespace(
347-
const LeaderEpoch& epoch, const NamespaceId& namespace_id,
348-
const XClusterSafeTimeFilter& filter) {
349-
return XClusterTargetManager::GetXClusterSafeTimeForNamespace(epoch, namespace_id, filter);
347+
const NamespaceId& namespace_id, const XClusterSafeTimeFilter& filter) const {
348+
return XClusterTargetManager::GetXClusterSafeTimeForNamespace(namespace_id, filter);
350349
}
351350

352351
Status XClusterManager::RefreshXClusterSafeTimeMap(const LeaderEpoch& epoch) {

src/yb/master/xcluster/xcluster_manager.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ class XClusterManager : public XClusterManagerIf,
109109
GetXClusterSafeTimeForNamespaceResponsePB* resp, rpc::RpcContext* rpc,
110110
const LeaderEpoch& epoch);
111111
Result<HybridTime> GetXClusterSafeTimeForNamespace(
112-
const LeaderEpoch& epoch, const NamespaceId& namespace_id,
113-
const XClusterSafeTimeFilter& filter) override;
112+
const NamespaceId& namespace_id, const XClusterSafeTimeFilter& filter) const override;
114113

115114
Status RefreshXClusterSafeTimeMap(const LeaderEpoch& epoch) override;
116115

src/yb/master/xcluster/xcluster_manager_if.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ class XClusterManagerIf {
5151
virtual Status SetXClusterNamespaceToSafeTimeMap(
5252
const int64_t leader_term, const XClusterNamespaceToSafeTimeMap& safe_time_map) = 0;
5353
virtual Result<HybridTime> GetXClusterSafeTimeForNamespace(
54-
const LeaderEpoch& epoch, const NamespaceId& namespace_id,
55-
const XClusterSafeTimeFilter& filter) = 0;
54+
const NamespaceId& namespace_id, const XClusterSafeTimeFilter& filter) const = 0;
5655
virtual Status MarkIndexBackfillCompleted(
5756
const std::unordered_set<TableId>& index_ids, const LeaderEpoch& epoch) = 0;
5857

src/yb/master/xcluster/xcluster_safe_time_service-test.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class XClusterSafeTimeServiceMocked : public XClusterSafeTimeService {
5858
return OK();
5959
}
6060

61-
XClusterNamespaceToSafeTimeMap GetXClusterNamespaceToSafeTimeMap() override {
61+
XClusterNamespaceToSafeTimeMap GetXClusterNamespaceToSafeTimeMap() const override {
6262
return safe_time_map_;
6363
}
6464

@@ -133,13 +133,13 @@ class XClusterSafeTimeServiceTest : public YBTest {
133133
Result<HybridTime> GetXClusterSafeTimeWithNoFilter(
134134
XClusterSafeTimeServiceMocked& safe_time_service, const NamespaceId& namespace_id) {
135135
return safe_time_service.GetXClusterSafeTimeForNamespace(
136-
dummy_leader_term, namespace_id, XClusterSafeTimeFilter::NONE);
136+
namespace_id, XClusterSafeTimeFilter::NONE);
137137
}
138138

139139
Result<HybridTime> GetXClusterSafeTimeFilterOutDdlQueue(
140140
XClusterSafeTimeServiceMocked& safe_time_service, const NamespaceId& namespace_id) {
141141
return safe_time_service.GetXClusterSafeTimeForNamespace(
142-
dummy_leader_term, namespace_id, XClusterSafeTimeFilter::DDL_QUEUE);
142+
namespace_id, XClusterSafeTimeFilter::DDL_QUEUE);
143143
}
144144
};
145145

src/yb/master/xcluster/xcluster_safe_time_service.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,11 @@ void XClusterSafeTimeService::ProcessTaskPeriodically() {
133133
}
134134
}
135135

136-
Status XClusterSafeTimeService::GetXClusterSafeTimeInfoFromMap(
137-
const LeaderEpoch& epoch, GetXClusterSafeTimeResponsePB* resp) {
136+
Status XClusterSafeTimeService::ComputeAndGetXClusterSafeTimeInfo(
137+
const LeaderEpoch& epoch, GetXClusterSafeTimeResponsePB& resp) {
138138
// Recompute safe times again before fetching maps.
139139
RETURN_NOT_OK(ComputeSafeTime(epoch.leader_term));
140+
140141
const auto& current_safe_time_map = GetXClusterNamespaceToSafeTimeMap();
141142
XClusterNamespaceToSafeTimeMap max_safe_time_map;
142143
{
@@ -147,7 +148,7 @@ Status XClusterSafeTimeService::GetXClusterSafeTimeInfoFromMap(
147148

148149
for (const auto& [namespace_id, safe_time] : current_safe_time_map) {
149150
// First set all the current safe time values.
150-
auto entry = resp->add_namespace_safe_times();
151+
auto entry = resp.add_namespace_safe_times();
151152
entry->set_namespace_id(namespace_id);
152153
entry->set_safe_time_ht(safe_time.ToUint64());
153154
// Safe time lag is calculated as (current time - current safe time).
@@ -177,7 +178,7 @@ Status XClusterSafeTimeService::GetXClusterSafeTimeInfoFromMap(
177178
}
178179

179180
Result<XClusterNamespaceToSafeTimeMap> XClusterSafeTimeService::GetFilteredXClusterSafeTimeMap(
180-
const XClusterSafeTimeFilter& filter) {
181+
const XClusterSafeTimeFilter& filter) const {
181182
switch (filter) {
182183
case XClusterSafeTimeFilter::NONE:
183184
return GetXClusterNamespaceToSafeTimeMap();
@@ -189,8 +190,7 @@ Result<XClusterNamespaceToSafeTimeMap> XClusterSafeTimeService::GetFilteredXClus
189190

190191
// If the filter removes all tables, then this returns master leader's safe time.
191192
Result<HybridTime> XClusterSafeTimeService::GetXClusterSafeTimeForNamespace(
192-
const int64_t leader_term, const NamespaceId& namespace_id,
193-
const XClusterSafeTimeFilter& filter) {
193+
const NamespaceId& namespace_id, const XClusterSafeTimeFilter& filter) const {
194194
SharedLock lock(mutex_);
195195
SCHECK(safe_time_table_ready_, IllegalState, "Safe time table is not ready yet.");
196196

@@ -603,7 +603,7 @@ Result<bool> XClusterSafeTimeService::CreateTableRequired() {
603603
return !producer_tablet_namespace_map_.empty();
604604
}
605605

606-
XClusterNamespaceToSafeTimeMap XClusterSafeTimeService::GetXClusterNamespaceToSafeTimeMap() {
606+
XClusterNamespaceToSafeTimeMap XClusterSafeTimeService::GetXClusterNamespaceToSafeTimeMap() const {
607607
return master_->xcluster_manager()->GetXClusterNamespaceToSafeTimeMap();
608608
}
609609

src/yb/master/xcluster/xcluster_safe_time_service.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,12 @@ class XClusterSafeTimeService {
7373
Result<std::unordered_map<NamespaceId, uint64_t>> GetEstimatedDataLossMicroSec(
7474
const LeaderEpoch& epoch);
7575

76-
Status GetXClusterSafeTimeInfoFromMap(
77-
const LeaderEpoch& epoch, GetXClusterSafeTimeResponsePB* resp);
76+
// Computes the safe time map and fills it in the response.
77+
Status ComputeAndGetXClusterSafeTimeInfo(
78+
const LeaderEpoch& epoch, GetXClusterSafeTimeResponsePB& resp);
7879

7980
Result<HybridTime> GetXClusterSafeTimeForNamespace(
80-
const int64_t leader_term, const NamespaceId& namespace_id,
81-
const XClusterSafeTimeFilter& filter);
81+
const NamespaceId& namespace_id, const XClusterSafeTimeFilter& filter) const;
8282

8383
xcluster::XClusterConsumerClusterMetrics* TEST_GetMetricsForNamespace(
8484
const NamespaceId& namespace_id);
@@ -106,7 +106,7 @@ class XClusterSafeTimeService {
106106

107107
virtual Result<bool> CreateTableRequired() REQUIRES(mutex_);
108108

109-
virtual XClusterNamespaceToSafeTimeMap GetXClusterNamespaceToSafeTimeMap();
109+
virtual XClusterNamespaceToSafeTimeMap GetXClusterNamespaceToSafeTimeMap() const;
110110

111111
virtual Status SetXClusterSafeTime(
112112
const int64_t leader_term, const XClusterNamespaceToSafeTimeMap& new_safe_time_map);
@@ -125,15 +125,15 @@ class XClusterSafeTimeService {
125125
void EnterIdleMode(const std::string& reason);
126126

127127
Result<XClusterNamespaceToSafeTimeMap> GetFilteredXClusterSafeTimeMap(
128-
const XClusterSafeTimeFilter& filter) REQUIRES_SHARED(mutex_);
128+
const XClusterSafeTimeFilter& filter) const REQUIRES_SHARED(mutex_);
129129

130130
Master* const master_;
131131
CatalogManager* const catalog_manager_;
132132

133133
rpc::Poller poller_;
134134
std::optional<boost::asio::io_context::strand> poll_strand_;
135135

136-
std::shared_mutex mutex_;
136+
mutable std::shared_mutex mutex_;
137137
bool safe_time_table_ready_ GUARDED_BY(mutex_) = false;
138138

139139
std::unique_ptr<client::TableHandle> safe_time_table_;

src/yb/master/xcluster/xcluster_target_manager.cc

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ void XClusterTargetManager::CreateXClusterSafeTimeTableAndStartService() {
155155
Status XClusterTargetManager::GetXClusterSafeTime(
156156
GetXClusterSafeTimeResponsePB* resp, const LeaderEpoch& epoch) {
157157
RETURN_NOT_OK_SET_CODE(
158-
safe_time_service_->GetXClusterSafeTimeInfoFromMap(epoch, resp),
158+
safe_time_service_->ComputeAndGetXClusterSafeTimeInfo(epoch, *resp),
159159
MasterError(MasterErrorPB::INTERNAL_ERROR));
160160

161161
// Also fill out the namespace_name for each entry.
@@ -199,16 +199,14 @@ Status XClusterTargetManager::GetXClusterSafeTimeForNamespace(
199199

200200
RETURN_NOT_OK(safe_time_service_->ComputeSafeTime(epoch.leader_term));
201201
auto safe_time_ht =
202-
VERIFY_RESULT(GetXClusterSafeTimeForNamespace(epoch, req->namespace_id(), req->filter()));
202+
VERIFY_RESULT(GetXClusterSafeTimeForNamespace(req->namespace_id(), req->filter()));
203203
resp->set_safe_time_ht(safe_time_ht.ToUint64());
204204
return Status::OK();
205205
}
206206

207207
Result<HybridTime> XClusterTargetManager::GetXClusterSafeTimeForNamespace(
208-
const LeaderEpoch& epoch, const NamespaceId& namespace_id,
209-
const XClusterSafeTimeFilter& filter) {
210-
return safe_time_service_->GetXClusterSafeTimeForNamespace(
211-
epoch.leader_term, namespace_id, filter);
208+
const NamespaceId& namespace_id, const XClusterSafeTimeFilter& filter) const {
209+
return safe_time_service_->GetXClusterSafeTimeForNamespace(namespace_id, filter);
212210
}
213211

214212
Status XClusterTargetManager::RefreshXClusterSafeTimeMap(const LeaderEpoch& epoch) {
@@ -429,9 +427,11 @@ XClusterTargetManager::GetXClusterStatus() const {
429427
const auto cluster_config_pb = VERIFY_RESULT(catalog_manager_.GetClusterConfig());
430428
const auto replication_infos = catalog_manager_.GetAllXClusterUniverseReplicationInfos();
431429

430+
const auto namespace_safe_time = GetXClusterNamespaceToSafeTimeMap();
431+
432432
for (const auto& replication_info : replication_infos) {
433-
auto replication_group_status =
434-
VERIFY_RESULT(GetUniverseReplicationInfo(replication_info, cluster_config_pb));
433+
auto replication_group_status = VERIFY_RESULT(
434+
GetUniverseReplicationInfo(replication_info, cluster_config_pb, namespace_safe_time));
435435

436436
for (auto& [_, tables] : replication_group_status.table_statuses_by_namespace) {
437437
for (auto& table : tables) {
@@ -454,7 +454,8 @@ XClusterTargetManager::GetXClusterStatus() const {
454454

455455
Result<XClusterInboundReplicationGroupStatus> XClusterTargetManager::GetUniverseReplicationInfo(
456456
const SysUniverseReplicationEntryPB& replication_info_pb,
457-
const SysClusterConfigEntryPB& cluster_config_pb) const {
457+
const SysClusterConfigEntryPB& cluster_config_pb,
458+
const XClusterNamespaceToSafeTimeMap& namespace_safe_time) const {
458459
XClusterInboundReplicationGroupStatus result;
459460
result.replication_group_id =
460461
xcluster::ReplicationGroupId(replication_info_pb.replication_group_id());
@@ -475,10 +476,18 @@ Result<XClusterInboundReplicationGroupStatus> XClusterTargetManager::GetUniverse
475476
for (const auto& namespace_info : replication_info_pb.db_scoped_info().namespace_infos()) {
476477
result.db_scope_namespace_id_map[namespace_info.consumer_namespace_id()] =
477478
namespace_info.producer_namespace_id();
479+
478480
result.db_scoped_info += Format(
479481
"\n namespace: $0\n consumer_namespace_id: $1\n producer_namespace_id: $2",
480482
catalog_manager_.GetNamespaceName(namespace_info.consumer_namespace_id()),
481483
namespace_info.consumer_namespace_id(), namespace_info.producer_namespace_id());
484+
485+
auto safe_time_info = FindOrNull(namespace_safe_time, namespace_info.consumer_namespace_id());
486+
if (safe_time_info) {
487+
result.db_scoped_info += Format(
488+
"\n safe_time: $0",
489+
Timestamp(safe_time_info->GetPhysicalValueMicros()).ToHumanReadableTime());
490+
}
482491
}
483492
}
484493

@@ -566,6 +575,23 @@ Status XClusterTargetManager::PopulateXClusterStatusJson(JsonWriter& jw) const {
566575
}
567576
jw.EndArray();
568577

578+
const auto safe_time_map = GetXClusterNamespaceToSafeTimeMap();
579+
if (!safe_time_map.empty()) {
580+
jw.String("safe_time_map");
581+
jw.StartArray();
582+
for (const auto& [namespace_id, safe_time] : safe_time_map) {
583+
jw.StartObject();
584+
jw.String("consumer_namespace_id");
585+
jw.String(namespace_id);
586+
587+
jw.String("safe_time");
588+
jw.String(safe_time.ToString());
589+
590+
jw.EndObject();
591+
}
592+
jw.EndArray();
593+
}
594+
569595
jw.String("consumer_registry");
570596
jw.Protobuf(cluster_config.consumer_registry());
571597

@@ -602,10 +628,12 @@ Result<XClusterInboundReplicationGroupStatus> XClusterTargetManager::GetUniverse
602628
auto replication_info = catalog_manager_.GetUniverseReplication(replication_group_id);
603629
SCHECK_FORMAT(replication_info, NotFound, "Replication group $0 not found", replication_group_id);
604630

631+
static const XClusterNamespaceToSafeTimeMap empty_namespace_safe_time;
632+
605633
const auto cluster_config = VERIFY_RESULT(catalog_manager_.GetClusterConfig());
606634
// Make pb copy to avoid potential deadlock while calling GetUniverseReplicationInfo.
607635
auto pb = replication_info->LockForRead()->pb;
608-
return GetUniverseReplicationInfo(pb, cluster_config);
636+
return GetUniverseReplicationInfo(pb, cluster_config, empty_namespace_safe_time);
609637
}
610638

611639
Status XClusterTargetManager::ClearXClusterFieldsAfterYsqlDDL(

src/yb/master/xcluster/xcluster_target_manager.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ class TSHeartbeatRequestPB;
2828
class TSHeartbeatResponsePB;
2929

3030
class PostTabletCreateTaskBase;
31-
class XClusterInboundReplicationGroupSetupTaskIf;
3231
class UniverseReplicationInfo;
32+
class XClusterConsumerReplicationStatusPB;
33+
class XClusterInboundReplicationGroupSetupTaskIf;
3334
class XClusterSafeTimeService;
3435
struct XClusterInboundReplicationGroupStatus;
35-
class XClusterConsumerReplicationStatusPB;
3636
struct XClusterSetupUniverseReplicationData;
3737

3838
class XClusterTargetManager {
@@ -54,8 +54,7 @@ class XClusterTargetManager {
5454
GetXClusterSafeTimeForNamespaceResponsePB* resp, const LeaderEpoch& epoch);
5555

5656
Result<HybridTime> GetXClusterSafeTimeForNamespace(
57-
const LeaderEpoch& epoch, const NamespaceId& namespace_id,
58-
const XClusterSafeTimeFilter& filter);
57+
const NamespaceId& namespace_id, const XClusterSafeTimeFilter& filter) const;
5958

6059
Status RefreshXClusterSafeTimeMap(const LeaderEpoch& epoch);
6160

@@ -230,12 +229,12 @@ class XClusterTargetManager {
230229
// table statuses.
231230
Result<XClusterInboundReplicationGroupStatus> GetUniverseReplicationInfo(
232231
const SysUniverseReplicationEntryPB& replication_info_pb,
233-
const SysClusterConfigEntryPB& cluster_config_pb) const;
232+
const SysClusterConfigEntryPB& cluster_config_pb,
233+
const XClusterNamespaceToSafeTimeMap& namespace_safe_time) const;
234234

235235
Status RefreshLocalAutoFlagConfig(const LeaderEpoch& epoch);
236236
Status DoRefreshLocalAutoFlagConfig(const LeaderEpoch& epoch);
237237

238-
239238
// Populate the response with the errors for the given replication group.
240239
Status PopulateReplicationGroupErrors(
241240
const xcluster::ReplicationGroupId& replication_group_id,

0 commit comments

Comments
 (0)