Skip to content

Commit e84c7fc

Browse files
authored
Merge pull request ceph#60676 from awojno-bloomberg/rgw-admin-json
rgw-admin: Add --format option for bucket sync status Reviewed-by: Adam Emerson <[email protected]>
2 parents 954ca45 + 44c31fa commit e84c7fc

File tree

1 file changed

+184
-46
lines changed

1 file changed

+184
-46
lines changed

src/rgw/rgw_admin.cc

Lines changed: 184 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2553,35 +2553,104 @@ std::ostream& operator<<(std::ostream& out, const indented& h) {
25532553
return out << std::setw(h.w) << h.header << std::setw(1) << ' ';
25542554
}
25552555

2556-
static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* driver, const RGWZone& zone,
2556+
struct bucket_source_sync_info {
2557+
const RGWZone& _source;
2558+
std::string_view error;
2559+
std::map<int,std::string> shards_behind;
2560+
int total_shards;
2561+
std::string_view status;
2562+
rgw_bucket bucket_source;
2563+
2564+
bucket_source_sync_info(const RGWZone& source): _source(source) {}
2565+
2566+
void _print_plaintext(std::ostream& out, int width) const {
2567+
out << indented{width, "source zone"} << _source.id << " (" << _source.name << ")" << std::endl;
2568+
if (!error.empty()) {
2569+
out << indented{width} << error << std::endl;
2570+
return;
2571+
}
2572+
out << indented{width, "source bucket"} << bucket_source << std::endl;
2573+
if (!status.empty()) {
2574+
out << indented{width} << status << std::endl;
2575+
return;
2576+
}
2577+
out << indented{width} << "incremental sync on " << total_shards << " shards\n";
2578+
if (!shards_behind.empty()) {
2579+
out << indented{width} << "bucket is behind on " << shards_behind.size() << " shards\n";
2580+
set<int> shard_ids;
2581+
for (auto const& [shard_id, _] : shards_behind) {
2582+
shard_ids.insert(shard_id);
2583+
}
2584+
out << indented{width} << "behind shards: [" << shard_ids << "]\n";
2585+
} else {
2586+
out << indented{width} << "bucket is caught up with source\n";
2587+
}
2588+
}
2589+
2590+
void _print_formatter(std::ostream& out, Formatter* formatter) const {
2591+
formatter->open_object_section("source");
2592+
formatter->dump_string("source_zone", _source.id);
2593+
formatter->dump_string("source_name", _source.name);
2594+
2595+
if (!error.empty()) {
2596+
formatter->dump_string("error", error);
2597+
formatter->close_section();
2598+
formatter->flush(out);
2599+
return;
2600+
}
2601+
2602+
formatter->dump_string("source_bucket", bucket_source.name);
2603+
formatter->dump_string("source_bucket_id", bucket_source.bucket_id);
2604+
2605+
if (!status.empty()) {
2606+
formatter->dump_string("status", status);
2607+
formatter->close_section();
2608+
formatter->flush(out);
2609+
return;
2610+
}
2611+
2612+
formatter->dump_int("total_shards", total_shards);
2613+
formatter->open_array_section("behind_shards");
2614+
for (auto const& [id, marker] : shards_behind) {
2615+
formatter->open_object_section("shard");
2616+
formatter->dump_int("shard_id", id);
2617+
formatter->dump_string("shard_marker", marker);
2618+
formatter->close_section();
2619+
}
2620+
formatter->close_section();
2621+
formatter->close_section();
2622+
formatter->flush(out);
2623+
}
2624+
};
2625+
2626+
static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* driver,
2627+
const RGWZone& zone,
25572628
const RGWZone& source, RGWRESTConn *conn,
25582629
const RGWBucketInfo& bucket_info,
25592630
rgw_sync_bucket_pipe pipe,
2560-
int width, std::ostream& out)
2631+
bucket_source_sync_info& source_sync_info)
25612632
{
2562-
out << indented{width, "source zone"} << source.id << " (" << source.name << ")" << std::endl;
2563-
25642633
// syncing from this zone?
25652634
if (!driver->svc()->zone->zone_syncs_from(zone, source)) {
2566-
out << indented{width} << "does not sync from zone\n";
2635+
source_sync_info.error = "does not sync from zone";
25672636
return 0;
25682637
}
25692638

25702639
if (!pipe.source.bucket) {
2571-
ldpp_dout(dpp, -1) << __func__ << "(): missing source bucket" << dendl;
2640+
source_sync_info.error = fmt::format("{} (): missing source bucket", __func__);
25722641
return -EINVAL;
25732642
}
25742643

25752644
std::unique_ptr<rgw::sal::Bucket> source_bucket;
25762645
int r = init_bucket(*pipe.source.bucket, &source_bucket);
25772646
if (r < 0) {
2578-
ldpp_dout(dpp, -1) << "failed to read source bucket info: " << cpp_strerror(r) << dendl;
2647+
source_sync_info.error = fmt::format("failed to read source bucket info: {}", cpp_strerror(r));
25792648
return r;
25802649
}
25812650

2582-
out << indented{width, "source bucket"} << source_bucket->get_key() << std::endl;
2583-
pipe.source.bucket = source_bucket->get_key();
2651+
source_sync_info.bucket_source = source_bucket->get_key();
25842652

2653+
pipe.source.bucket = source_bucket->get_key();
25852654
pipe.dest.bucket = bucket_info.bucket;
25862655

25872656
uint64_t gen = 0;
@@ -2592,15 +2661,15 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra
25922661
r = rgw_read_bucket_full_sync_status(dpp, driver, pipe, &full_status, null_yield);
25932662
if (r >= 0) {
25942663
if (full_status.state == BucketSyncState::Init) {
2595-
out << indented{width} << "init: bucket sync has not started\n";
2664+
source_sync_info.status = "init: bucket sync has not started";
25962665
return 0;
25972666
}
25982667
if (full_status.state == BucketSyncState::Stopped) {
2599-
out << indented{width} << "stopped: bucket sync is disabled\n";
2668+
source_sync_info.status = "stopped: bucket sync is disabled";
26002669
return 0;
26012670
}
26022671
if (full_status.state == BucketSyncState::Full) {
2603-
out << indented{width} << "full sync: " << full_status.full.count << " objects completed\n";
2672+
source_sync_info.status = fmt::format("full sync: {} objects completed", full_status.full.count);
26042673
return 0;
26052674
}
26062675
gen = full_status.incremental_gen;
@@ -2609,68 +2678,63 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra
26092678
// no full status, but there may be per-shard status from before upgrade
26102679
const auto& logs = source_bucket->get_info().layout.logs;
26112680
if (logs.empty()) {
2612-
out << indented{width} << "init: bucket sync has not started\n";
2681+
source_sync_info.status = "init: bucket sync has not started";
26132682
return 0;
26142683
}
26152684
const auto& log = logs.front();
26162685
if (log.gen > 0) {
26172686
// this isn't the backward-compatible case, so we just haven't started yet
2618-
out << indented{width} << "init: bucket sync has not started\n";
2687+
source_sync_info.status = "init: bucket sync has not started";
26192688
return 0;
26202689
}
26212690
if (log.layout.type != rgw::BucketLogType::InIndex) {
2622-
ldpp_dout(dpp, -1) << "unrecognized log layout type " << log.layout.type << dendl;
2691+
source_sync_info.error = fmt::format("unrecognized log layout type {}", to_string(log.layout.type));
26232692
return -EINVAL;
26242693
}
26252694
// use shard count from our log gen=0
26262695
shard_status.resize(rgw::num_shards(log.layout.in_index));
26272696
} else {
2628-
lderr(driver->ctx()) << "failed to read bucket full sync status: " << cpp_strerror(r) << dendl;
2697+
source_sync_info.error = fmt::format("failed to read bucket full sync status: {}", cpp_strerror(r));
26292698
return r;
26302699
}
26312700

26322701
r = rgw_read_bucket_inc_sync_status(dpp, driver, pipe, gen, &shard_status);
26332702
if (r < 0) {
2634-
lderr(driver->ctx()) << "failed to read bucket incremental sync status: " << cpp_strerror(r) << dendl;
2703+
source_sync_info.error = fmt::format("failed to read bucket incremental sync status: {}", cpp_strerror(r));
26352704
return r;
26362705
}
26372706

26382707
const int total_shards = shard_status.size();
2639-
2640-
out << indented{width} << "incremental sync on " << total_shards << " shards\n";
2708+
source_sync_info.total_shards = total_shards;
26412709

26422710
rgw_bucket_index_marker_info remote_info;
26432711
BucketIndexShardsManager remote_markers;
26442712
r = rgw_read_remote_bilog_info(dpp, conn, source_bucket->get_key(),
26452713
remote_info, remote_markers, null_yield);
26462714
if (r < 0) {
2647-
ldpp_dout(dpp, -1) << "failed to read remote log: " << cpp_strerror(r) << dendl;
2715+
source_sync_info.error = fmt::format("failed to read remote log: {}", cpp_strerror(r));
26482716
return r;
26492717
}
26502718

2651-
std::set<int> shards_behind;
2719+
std::map<int, std::string> shards_behind;
26522720
for (const auto& r : remote_markers.get()) {
26532721
auto shard_id = r.first;
26542722
if (r.second.empty()) {
26552723
continue; // empty bucket index shard
26562724
}
26572725
if (shard_id >= total_shards) {
26582726
// unexpected shard id. we don't have status for it, so we're behind
2659-
shards_behind.insert(shard_id);
2727+
shards_behind[shard_id] = r.second;
26602728
continue;
26612729
}
26622730
auto& m = shard_status[shard_id];
26632731
const auto pos = BucketIndexShardsManager::get_shard_marker(m.inc_marker.position);
26642732
if (pos < r.second) {
2665-
shards_behind.insert(shard_id);
2733+
shards_behind[shard_id] = r.second;
26662734
}
26672735
}
2668-
if (!shards_behind.empty()) {
2669-
out << indented{width} << "bucket is behind on " << shards_behind.size() << " shards\n";
2670-
out << indented{width} << "behind shards: [" << shards_behind << "]\n";
2671-
} else {
2672-
out << indented{width} << "bucket is caught up with source\n";
2673-
}
2736+
2737+
source_sync_info.shards_behind = std::move(shards_behind);
26742738
return 0;
26752739
}
26762740

@@ -2881,33 +2945,90 @@ static int bucket_sync_info(rgw::sal::Driver* driver, const RGWBucketInfo& info,
28812945
return 0;
28822946
}
28832947

2948+
struct bucket_sync_status_info {
2949+
std::vector<bucket_source_sync_info> source_status_info;
2950+
rgw::sal::Zone* _zone;
2951+
const rgw::sal::ZoneGroup* _zonegroup;
2952+
const RGWBucketInfo& _bucket_info;
2953+
const int width = 15;
2954+
std::string error;
2955+
2956+
bucket_sync_status_info(const RGWBucketInfo& bucket_info): _bucket_info(bucket_info) {}
2957+
2958+
void print(std::ostream& out, bool use_formatter, Formatter* formatter) {
2959+
if (use_formatter) {
2960+
_print_formatter(out, formatter);
2961+
} else {
2962+
_print_plaintext(out);
2963+
}
2964+
}
2965+
2966+
void _print_plaintext(std::ostream& out) {
2967+
out << indented{width, "realm"} << _zone->get_realm_id() << " (" << _zone->get_realm_name() << ")" << std::endl;
2968+
out << indented{width, "zonegroup"} << _zonegroup->get_id() << " (" << _zonegroup->get_name() << ")" << std::endl;
2969+
out << indented{width, "zone"} << _zone->get_id() << " (" << _zone->get_name() << ")" << std::endl;
2970+
out << indented{width, "bucket"} << _bucket_info.bucket << std::endl;
2971+
out << indented{width, "current time"}
2972+
<< to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms) << "\n\n";
2973+
2974+
if (!error.empty()){
2975+
out << error << std::endl;
2976+
}
2977+
2978+
for (const auto &info : source_status_info) {
2979+
info._print_plaintext(out, width);
2980+
}
2981+
}
2982+
2983+
void _print_formatter(std::ostream& out, Formatter* formatter) {
2984+
formatter->open_object_section("test");
2985+
formatter->dump_string("realm", _zone->get_realm_id());
2986+
formatter->dump_string("realm_name", _zone->get_realm_name());
2987+
formatter->dump_string("zonegroup", _zonegroup->get_id());
2988+
formatter->dump_string("zonegroup_name", _zonegroup->get_name());
2989+
formatter->dump_string("zone", _zone->get_id());
2990+
formatter->dump_string("zone_name", _zone->get_name());
2991+
formatter->dump_string("bucket", _bucket_info.bucket.name);
2992+
formatter->dump_string("bucket_instance_id", _bucket_info.bucket.bucket_id);
2993+
formatter->dump_string("current_time", to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms));
2994+
2995+
if (!error.empty()) {
2996+
formatter->dump_string("error", error);
2997+
}
2998+
2999+
formatter->open_array_section("sources");
3000+
for (const auto &info : source_status_info) {
3001+
info._print_formatter(out, formatter);
3002+
}
3003+
formatter->close_section();
3004+
3005+
formatter->close_section();
3006+
formatter->flush(out);
3007+
}
3008+
3009+
};
3010+
28843011
static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& info,
28853012
const rgw_zone_id& source_zone_id,
28863013
std::optional<rgw_bucket>& opt_source_bucket,
2887-
std::ostream& out)
3014+
bucket_sync_status_info& bucket_sync_info)
28883015
{
28893016
const rgw::sal::ZoneGroup& zonegroup = driver->get_zone()->get_zonegroup();
28903017
rgw::sal::Zone* zone = driver->get_zone();
2891-
constexpr int width = 15;
2892-
2893-
out << indented{width, "realm"} << zone->get_realm_id() << " (" << zone->get_realm_name() << ")\n";
2894-
out << indented{width, "zonegroup"} << zonegroup.get_id() << " (" << zonegroup.get_name() << ")\n";
2895-
out << indented{width, "zone"} << zone->get_id() << " (" << zone->get_name() << ")\n";
2896-
out << indented{width, "bucket"} << info.bucket << "\n";
2897-
out << indented{width, "current time"}
2898-
<< to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms) << "\n\n";
28993018

3019+
bucket_sync_info._zone = zone;
3020+
bucket_sync_info._zonegroup = &zonegroup;
29003021

29013022
if (!static_cast<rgw::sal::RadosStore*>(driver)->ctl()->bucket->bucket_imports_data(info.bucket, null_yield, dpp())) {
2902-
out << "Sync is disabled for bucket " << info.bucket.name << " or bucket has no sync sources" << std::endl;
3023+
bucket_sync_info.error = fmt::format("Sync is disabled for bucket {} or bucket has no sync sources", info.bucket.name);
29033024
return 0;
29043025
}
29053026

29063027
RGWBucketSyncPolicyHandlerRef handler;
29073028

29083029
int r = driver->get_sync_policy_handler(dpp(), std::nullopt, info.bucket, &handler, null_yield);
29093030
if (r < 0) {
2910-
ldpp_dout(dpp(), -1) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl;
3031+
bucket_sync_info.error = fmt::format("ERROR: failed to get policy handler for bucket ({}): r={}: {}", info.bucket.name, r, cpp_strerror(-r));
29113032
return r;
29123033
}
29133034

@@ -2920,13 +3041,12 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
29203041
std::unique_ptr<rgw::sal::Zone> zone;
29213042
int ret = driver->get_zone()->get_zonegroup().get_zone_by_id(source_zone_id.id, &zone);
29223043
if (ret < 0) {
2923-
ldpp_dout(dpp(), -1) << "Source zone not found in zonegroup "
2924-
<< zonegroup.get_name() << dendl;
3044+
bucket_sync_info.error = fmt::format("Source zone not found in zonegroup {}", zonegroup.get_name());
29253045
return -EINVAL;
29263046
}
29273047
auto c = zone_conn_map.find(source_zone_id);
29283048
if (c == zone_conn_map.end()) {
2929-
ldpp_dout(dpp(), -1) << "No connection to zone " << zone->get_name() << dendl;
3049+
bucket_sync_info.error = fmt::format("No connection to zone {}", zone->get_name());
29303050
return -EINVAL;
29313051
}
29323052
zone_ids.insert(source_zone_id);
@@ -2957,10 +3077,15 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
29573077
continue;
29583078
}
29593079
if (pipe.source.zone.value_or(rgw_zone_id()) == z->second.id) {
2960-
bucket_source_sync_status(dpp(), static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone(), z->second,
3080+
bucket_source_sync_info source_sync_info(z->second);
3081+
auto ret = bucket_source_sync_status(dpp(), static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone(), z->second,
29613082
c->second,
29623083
info, pipe,
2963-
width, out);
3084+
source_sync_info);
3085+
3086+
if (ret == 0) {
3087+
bucket_sync_info.source_status_info.emplace_back(std::move(source_sync_info));
3088+
}
29643089
}
29653090
}
29663091
}
@@ -3488,6 +3613,7 @@ int main(int argc, const char **argv)
34883613
list<string> tags_rm;
34893614
int placement_inline_data = true;
34903615
bool placement_inline_data_specified = false;
3616+
bool format_arg_passed = false;
34913617

34923618
int64_t max_objects = -1;
34933619
int64_t max_size = -1;
@@ -3867,6 +3993,7 @@ int main(int argc, const char **argv)
38673993
new_bucket_name = val;
38683994
} else if (ceph_argparse_witharg(args, i, &val, "--format", (char*)NULL)) {
38693995
format = val;
3996+
format_arg_passed = true;
38703997
} else if (ceph_argparse_witharg(args, i, &val, "--categories", (char*)NULL)) {
38713998
string cat_str = val;
38723999
list<string> cat_list;
@@ -9901,7 +10028,18 @@ int main(int argc, const char **argv)
990110028
if (ret < 0) {
990210029
return -ret;
990310030
}
9904-
bucket_sync_status(driver, bucket->get_info(), source_zone, opt_source_bucket, std::cout);
10031+
10032+
auto bucket_info = bucket->get_info();
10033+
bucket_sync_status_info bucket_sync_info(bucket_info);
10034+
10035+
ret = bucket_sync_status(driver, bucket_info, source_zone,
10036+
opt_source_bucket, bucket_sync_info);
10037+
10038+
if (ret == 0) {
10039+
bucket_sync_info.print(std::cout, format_arg_passed, formatter.get());
10040+
} else {
10041+
cerr << "failed to get bucket sync status. see logs for more info" << std::endl;
10042+
}
990510043
}
990610044

990710045
if (opt_cmd == OPT::BUCKET_SYNC_MARKERS) {

0 commit comments

Comments
 (0)