Skip to content

Commit 44c31fa

Browse files
rgw-admin: Add --format option for bucket sync status
Signed-off-by: Alex Wojno <[email protected]>
1 parent c0451e1 commit 44c31fa

File tree

1 file changed

+184
-46
lines changed

1 file changed

+184
-46
lines changed

src/rgw/rgw_admin.cc

Lines changed: 184 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2549,35 +2549,104 @@ std::ostream& operator<<(std::ostream& out, const indented& h) {
25492549
return out << std::setw(h.w) << h.header << std::setw(1) << ' ';
25502550
}
25512551

2552-
static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* driver, const RGWZone& zone,
2552+
struct bucket_source_sync_info {
2553+
const RGWZone& _source;
2554+
std::string_view error;
2555+
std::map<int,std::string> shards_behind;
2556+
int total_shards;
2557+
std::string_view status;
2558+
rgw_bucket bucket_source;
2559+
2560+
bucket_source_sync_info(const RGWZone& source): _source(source) {}
2561+
2562+
void _print_plaintext(std::ostream& out, int width) const {
2563+
out << indented{width, "source zone"} << _source.id << " (" << _source.name << ")" << std::endl;
2564+
if (!error.empty()) {
2565+
out << indented{width} << error << std::endl;
2566+
return;
2567+
}
2568+
out << indented{width, "source bucket"} << bucket_source << std::endl;
2569+
if (!status.empty()) {
2570+
out << indented{width} << status << std::endl;
2571+
return;
2572+
}
2573+
out << indented{width} << "incremental sync on " << total_shards << " shards\n";
2574+
if (!shards_behind.empty()) {
2575+
out << indented{width} << "bucket is behind on " << shards_behind.size() << " shards\n";
2576+
set<int> shard_ids;
2577+
for (auto const& [shard_id, _] : shards_behind) {
2578+
shard_ids.insert(shard_id);
2579+
}
2580+
out << indented{width} << "behind shards: [" << shard_ids << "]\n";
2581+
} else {
2582+
out << indented{width} << "bucket is caught up with source\n";
2583+
}
2584+
}
2585+
2586+
void _print_formatter(std::ostream& out, Formatter* formatter) const {
2587+
formatter->open_object_section("source");
2588+
formatter->dump_string("source_zone", _source.id);
2589+
formatter->dump_string("source_name", _source.name);
2590+
2591+
if (!error.empty()) {
2592+
formatter->dump_string("error", error);
2593+
formatter->close_section();
2594+
formatter->flush(out);
2595+
return;
2596+
}
2597+
2598+
formatter->dump_string("source_bucket", bucket_source.name);
2599+
formatter->dump_string("source_bucket_id", bucket_source.bucket_id);
2600+
2601+
if (!status.empty()) {
2602+
formatter->dump_string("status", status);
2603+
formatter->close_section();
2604+
formatter->flush(out);
2605+
return;
2606+
}
2607+
2608+
formatter->dump_int("total_shards", total_shards);
2609+
formatter->open_array_section("behind_shards");
2610+
for (auto const& [id, marker] : shards_behind) {
2611+
formatter->open_object_section("shard");
2612+
formatter->dump_int("shard_id", id);
2613+
formatter->dump_string("shard_marker", marker);
2614+
formatter->close_section();
2615+
}
2616+
formatter->close_section();
2617+
formatter->close_section();
2618+
formatter->flush(out);
2619+
}
2620+
};
2621+
2622+
static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* driver,
2623+
const RGWZone& zone,
25532624
const RGWZone& source, RGWRESTConn *conn,
25542625
const RGWBucketInfo& bucket_info,
25552626
rgw_sync_bucket_pipe pipe,
2556-
int width, std::ostream& out)
2627+
bucket_source_sync_info& source_sync_info)
25572628
{
2558-
out << indented{width, "source zone"} << source.id << " (" << source.name << ")" << std::endl;
2559-
25602629
// syncing from this zone?
25612630
if (!driver->svc()->zone->zone_syncs_from(zone, source)) {
2562-
out << indented{width} << "does not sync from zone\n";
2631+
source_sync_info.error = "does not sync from zone";
25632632
return 0;
25642633
}
25652634

25662635
if (!pipe.source.bucket) {
2567-
ldpp_dout(dpp, -1) << __func__ << "(): missing source bucket" << dendl;
2636+
source_sync_info.error = fmt::format("{} (): missing source bucket", __func__);
25682637
return -EINVAL;
25692638
}
25702639

25712640
std::unique_ptr<rgw::sal::Bucket> source_bucket;
25722641
int r = init_bucket(*pipe.source.bucket, &source_bucket);
25732642
if (r < 0) {
2574-
ldpp_dout(dpp, -1) << "failed to read source bucket info: " << cpp_strerror(r) << dendl;
2643+
source_sync_info.error = fmt::format("failed to read source bucket info: {}", cpp_strerror(r));
25752644
return r;
25762645
}
25772646

2578-
out << indented{width, "source bucket"} << source_bucket->get_key() << std::endl;
2579-
pipe.source.bucket = source_bucket->get_key();
2647+
source_sync_info.bucket_source = source_bucket->get_key();
25802648

2649+
pipe.source.bucket = source_bucket->get_key();
25812650
pipe.dest.bucket = bucket_info.bucket;
25822651

25832652
uint64_t gen = 0;
@@ -2588,15 +2657,15 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra
25882657
r = rgw_read_bucket_full_sync_status(dpp, driver, pipe, &full_status, null_yield);
25892658
if (r >= 0) {
25902659
if (full_status.state == BucketSyncState::Init) {
2591-
out << indented{width} << "init: bucket sync has not started\n";
2660+
source_sync_info.status = "init: bucket sync has not started";
25922661
return 0;
25932662
}
25942663
if (full_status.state == BucketSyncState::Stopped) {
2595-
out << indented{width} << "stopped: bucket sync is disabled\n";
2664+
source_sync_info.status = "stopped: bucket sync is disabled";
25962665
return 0;
25972666
}
25982667
if (full_status.state == BucketSyncState::Full) {
2599-
out << indented{width} << "full sync: " << full_status.full.count << " objects completed\n";
2668+
source_sync_info.status = fmt::format("full sync: {} objects completed", full_status.full.count);
26002669
return 0;
26012670
}
26022671
gen = full_status.incremental_gen;
@@ -2605,68 +2674,63 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra
26052674
// no full status, but there may be per-shard status from before upgrade
26062675
const auto& logs = source_bucket->get_info().layout.logs;
26072676
if (logs.empty()) {
2608-
out << indented{width} << "init: bucket sync has not started\n";
2677+
source_sync_info.status = "init: bucket sync has not started";
26092678
return 0;
26102679
}
26112680
const auto& log = logs.front();
26122681
if (log.gen > 0) {
26132682
// this isn't the backward-compatible case, so we just haven't started yet
2614-
out << indented{width} << "init: bucket sync has not started\n";
2683+
source_sync_info.status = "init: bucket sync has not started";
26152684
return 0;
26162685
}
26172686
if (log.layout.type != rgw::BucketLogType::InIndex) {
2618-
ldpp_dout(dpp, -1) << "unrecognized log layout type " << log.layout.type << dendl;
2687+
source_sync_info.error = fmt::format("unrecognized log layout type {}", to_string(log.layout.type));
26192688
return -EINVAL;
26202689
}
26212690
// use shard count from our log gen=0
26222691
shard_status.resize(rgw::num_shards(log.layout.in_index));
26232692
} else {
2624-
lderr(driver->ctx()) << "failed to read bucket full sync status: " << cpp_strerror(r) << dendl;
2693+
source_sync_info.error = fmt::format("failed to read bucket full sync status: {}", cpp_strerror(r));
26252694
return r;
26262695
}
26272696

26282697
r = rgw_read_bucket_inc_sync_status(dpp, driver, pipe, gen, &shard_status);
26292698
if (r < 0) {
2630-
lderr(driver->ctx()) << "failed to read bucket incremental sync status: " << cpp_strerror(r) << dendl;
2699+
source_sync_info.error = fmt::format("failed to read bucket incremental sync status: {}", cpp_strerror(r));
26312700
return r;
26322701
}
26332702

26342703
const int total_shards = shard_status.size();
2635-
2636-
out << indented{width} << "incremental sync on " << total_shards << " shards\n";
2704+
source_sync_info.total_shards = total_shards;
26372705

26382706
rgw_bucket_index_marker_info remote_info;
26392707
BucketIndexShardsManager remote_markers;
26402708
r = rgw_read_remote_bilog_info(dpp, conn, source_bucket->get_key(),
26412709
remote_info, remote_markers, null_yield);
26422710
if (r < 0) {
2643-
ldpp_dout(dpp, -1) << "failed to read remote log: " << cpp_strerror(r) << dendl;
2711+
source_sync_info.error = fmt::format("failed to read remote log: {}", cpp_strerror(r));
26442712
return r;
26452713
}
26462714

2647-
std::set<int> shards_behind;
2715+
std::map<int, std::string> shards_behind;
26482716
for (const auto& r : remote_markers.get()) {
26492717
auto shard_id = r.first;
26502718
if (r.second.empty()) {
26512719
continue; // empty bucket index shard
26522720
}
26532721
if (shard_id >= total_shards) {
26542722
// unexpected shard id. we don't have status for it, so we're behind
2655-
shards_behind.insert(shard_id);
2723+
shards_behind[shard_id] = r.second;
26562724
continue;
26572725
}
26582726
auto& m = shard_status[shard_id];
26592727
const auto pos = BucketIndexShardsManager::get_shard_marker(m.inc_marker.position);
26602728
if (pos < r.second) {
2661-
shards_behind.insert(shard_id);
2729+
shards_behind[shard_id] = r.second;
26622730
}
26632731
}
2664-
if (!shards_behind.empty()) {
2665-
out << indented{width} << "bucket is behind on " << shards_behind.size() << " shards\n";
2666-
out << indented{width} << "behind shards: [" << shards_behind << "]\n";
2667-
} else {
2668-
out << indented{width} << "bucket is caught up with source\n";
2669-
}
2732+
2733+
source_sync_info.shards_behind = std::move(shards_behind);
26702734
return 0;
26712735
}
26722736

@@ -2877,33 +2941,90 @@ static int bucket_sync_info(rgw::sal::Driver* driver, const RGWBucketInfo& info,
28772941
return 0;
28782942
}
28792943

2944+
struct bucket_sync_status_info {
2945+
std::vector<bucket_source_sync_info> source_status_info;
2946+
rgw::sal::Zone* _zone;
2947+
const rgw::sal::ZoneGroup* _zonegroup;
2948+
const RGWBucketInfo& _bucket_info;
2949+
const int width = 15;
2950+
std::string error;
2951+
2952+
bucket_sync_status_info(const RGWBucketInfo& bucket_info): _bucket_info(bucket_info) {}
2953+
2954+
void print(std::ostream& out, bool use_formatter, Formatter* formatter) {
2955+
if (use_formatter) {
2956+
_print_formatter(out, formatter);
2957+
} else {
2958+
_print_plaintext(out);
2959+
}
2960+
}
2961+
2962+
void _print_plaintext(std::ostream& out) {
2963+
out << indented{width, "realm"} << _zone->get_realm_id() << " (" << _zone->get_realm_name() << ")" << std::endl;
2964+
out << indented{width, "zonegroup"} << _zonegroup->get_id() << " (" << _zonegroup->get_name() << ")" << std::endl;
2965+
out << indented{width, "zone"} << _zone->get_id() << " (" << _zone->get_name() << ")" << std::endl;
2966+
out << indented{width, "bucket"} << _bucket_info.bucket << std::endl;
2967+
out << indented{width, "current time"}
2968+
<< to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms) << "\n\n";
2969+
2970+
if (!error.empty()){
2971+
out << error << std::endl;
2972+
}
2973+
2974+
for (const auto &info : source_status_info) {
2975+
info._print_plaintext(out, width);
2976+
}
2977+
}
2978+
2979+
void _print_formatter(std::ostream& out, Formatter* formatter) {
2980+
formatter->open_object_section("test");
2981+
formatter->dump_string("realm", _zone->get_realm_id());
2982+
formatter->dump_string("realm_name", _zone->get_realm_name());
2983+
formatter->dump_string("zonegroup", _zonegroup->get_id());
2984+
formatter->dump_string("zonegroup_name", _zonegroup->get_name());
2985+
formatter->dump_string("zone", _zone->get_id());
2986+
formatter->dump_string("zone_name", _zone->get_name());
2987+
formatter->dump_string("bucket", _bucket_info.bucket.name);
2988+
formatter->dump_string("bucket_instance_id", _bucket_info.bucket.bucket_id);
2989+
formatter->dump_string("current_time", to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms));
2990+
2991+
if (!error.empty()) {
2992+
formatter->dump_string("error", error);
2993+
}
2994+
2995+
formatter->open_array_section("sources");
2996+
for (const auto &info : source_status_info) {
2997+
info._print_formatter(out, formatter);
2998+
}
2999+
formatter->close_section();
3000+
3001+
formatter->close_section();
3002+
formatter->flush(out);
3003+
}
3004+
3005+
};
3006+
28803007
static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& info,
28813008
const rgw_zone_id& source_zone_id,
28823009
std::optional<rgw_bucket>& opt_source_bucket,
2883-
std::ostream& out)
3010+
bucket_sync_status_info& bucket_sync_info)
28843011
{
28853012
const rgw::sal::ZoneGroup& zonegroup = driver->get_zone()->get_zonegroup();
28863013
rgw::sal::Zone* zone = driver->get_zone();
2887-
constexpr int width = 15;
2888-
2889-
out << indented{width, "realm"} << zone->get_realm_id() << " (" << zone->get_realm_name() << ")\n";
2890-
out << indented{width, "zonegroup"} << zonegroup.get_id() << " (" << zonegroup.get_name() << ")\n";
2891-
out << indented{width, "zone"} << zone->get_id() << " (" << zone->get_name() << ")\n";
2892-
out << indented{width, "bucket"} << info.bucket << "\n";
2893-
out << indented{width, "current time"}
2894-
<< to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms) << "\n\n";
28953014

3015+
bucket_sync_info._zone = zone;
3016+
bucket_sync_info._zonegroup = &zonegroup;
28963017

28973018
if (!static_cast<rgw::sal::RadosStore*>(driver)->ctl()->bucket->bucket_imports_data(info.bucket, null_yield, dpp())) {
2898-
out << "Sync is disabled for bucket " << info.bucket.name << " or bucket has no sync sources" << std::endl;
3019+
bucket_sync_info.error = fmt::format("Sync is disabled for bucket {} or bucket has no sync sources", info.bucket.name);
28993020
return 0;
29003021
}
29013022

29023023
RGWBucketSyncPolicyHandlerRef handler;
29033024

29043025
int r = driver->get_sync_policy_handler(dpp(), std::nullopt, info.bucket, &handler, null_yield);
29053026
if (r < 0) {
2906-
ldpp_dout(dpp(), -1) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl;
3027+
bucket_sync_info.error = fmt::format("ERROR: failed to get policy handler for bucket ({}): r={}: {}", info.bucket.name, r, cpp_strerror(-r));
29073028
return r;
29083029
}
29093030

@@ -2916,13 +3037,12 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
29163037
std::unique_ptr<rgw::sal::Zone> zone;
29173038
int ret = driver->get_zone()->get_zonegroup().get_zone_by_id(source_zone_id.id, &zone);
29183039
if (ret < 0) {
2919-
ldpp_dout(dpp(), -1) << "Source zone not found in zonegroup "
2920-
<< zonegroup.get_name() << dendl;
3040+
bucket_sync_info.error = fmt::format("Source zone not found in zonegroup {}", zonegroup.get_name());
29213041
return -EINVAL;
29223042
}
29233043
auto c = zone_conn_map.find(source_zone_id);
29243044
if (c == zone_conn_map.end()) {
2925-
ldpp_dout(dpp(), -1) << "No connection to zone " << zone->get_name() << dendl;
3045+
bucket_sync_info.error = fmt::format("No connection to zone {}", zone->get_name());
29263046
return -EINVAL;
29273047
}
29283048
zone_ids.insert(source_zone_id);
@@ -2953,10 +3073,15 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
29533073
continue;
29543074
}
29553075
if (pipe.source.zone.value_or(rgw_zone_id()) == z->second.id) {
2956-
bucket_source_sync_status(dpp(), static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone(), z->second,
3076+
bucket_source_sync_info source_sync_info(z->second);
3077+
auto ret = bucket_source_sync_status(dpp(), static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone(), z->second,
29573078
c->second,
29583079
info, pipe,
2959-
width, out);
3080+
source_sync_info);
3081+
3082+
if (ret == 0) {
3083+
bucket_sync_info.source_status_info.emplace_back(std::move(source_sync_info));
3084+
}
29603085
}
29613086
}
29623087
}
@@ -3484,6 +3609,7 @@ int main(int argc, const char **argv)
34843609
list<string> tags_rm;
34853610
int placement_inline_data = true;
34863611
bool placement_inline_data_specified = false;
3612+
bool format_arg_passed = false;
34873613

34883614
int64_t max_objects = -1;
34893615
int64_t max_size = -1;
@@ -3863,6 +3989,7 @@ int main(int argc, const char **argv)
38633989
new_bucket_name = val;
38643990
} else if (ceph_argparse_witharg(args, i, &val, "--format", (char*)NULL)) {
38653991
format = val;
3992+
format_arg_passed = true;
38663993
} else if (ceph_argparse_witharg(args, i, &val, "--categories", (char*)NULL)) {
38673994
string cat_str = val;
38683995
list<string> cat_list;
@@ -9845,7 +9972,18 @@ int main(int argc, const char **argv)
98459972
if (ret < 0) {
98469973
return -ret;
98479974
}
9848-
bucket_sync_status(driver, bucket->get_info(), source_zone, opt_source_bucket, std::cout);
9975+
9976+
auto bucket_info = bucket->get_info();
9977+
bucket_sync_status_info bucket_sync_info(bucket_info);
9978+
9979+
ret = bucket_sync_status(driver, bucket_info, source_zone,
9980+
opt_source_bucket, bucket_sync_info);
9981+
9982+
if (ret == 0) {
9983+
bucket_sync_info.print(std::cout, format_arg_passed, formatter.get());
9984+
} else {
9985+
cerr << "failed to get bucket sync status. see logs for more info" << std::endl;
9986+
}
98499987
}
98509988

98519989
if (opt_cmd == OPT::BUCKET_SYNC_MARKERS) {

0 commit comments

Comments
 (0)