Skip to content

Commit 7299783

Browse files
reshard: guarantee no duplicated index entries exist before starting
reshard

Duplicated index entries can remain after a reshard fails, which can lead to redundant copies in a new reshard process. Moreover, if a duplicated entry is a delete operation and the same entry is written again before a new reshard begins, the entry in the destination index may be deleted wrongly. Duplicated index entries should therefore be cleared automatically after a reshard fails and before a new reshard starts. For convenience, rgw-admin can also list and purge reshard logs manually. Signed-off-by: Mingyuan Liang <[email protected]>
1 parent 8f68b3f commit 7299783

File tree

8 files changed

+230
-12
lines changed

8 files changed

+230
-12
lines changed

src/cls/rgw/cls_rgw.cc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3822,6 +3822,48 @@ static int rgw_bi_log_stop(cls_method_context_t hctx, bufferlist *in, bufferlist
38223822
return write_bucket_header(hctx, &header);
38233823
}
38243824

3825+
static int rgw_reshard_log_trim_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3826+
{
3827+
string key_begin(1, BI_PREFIX_CHAR);
3828+
key_begin.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX]);
3829+
3830+
string key_end;
3831+
key_end = BI_PREFIX_CHAR;
3832+
key_end.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX + 1]);
3833+
3834+
// list a single key to detect whether the range is empty
3835+
const size_t max_entries = 1;
3836+
std::set<std::string> keys;
3837+
bool more = false;
3838+
3839+
int rc = cls_cxx_map_get_keys(hctx, key_begin, max_entries, &keys, &more);
3840+
if (rc < 0) {
3841+
CLS_LOG(1, "ERROR: cls_cxx_map_get_keys failed rc=%d", rc);
3842+
return rc;
3843+
}
3844+
3845+
if (keys.empty()) {
3846+
CLS_LOG(20, "range is empty key_begin=%s", key_begin.c_str());
3847+
return -ENODATA;
3848+
}
3849+
3850+
const std::string& first_key = *keys.begin();
3851+
if (key_end < first_key) {
3852+
CLS_LOG(20, "listed key %s past key_end=%s", first_key.c_str(), key_end.c_str());
3853+
return -ENODATA;
3854+
}
3855+
3856+
CLS_LOG(20, "listed key %s, removing through %s",
3857+
first_key.c_str(), key_end.c_str());
3858+
3859+
rc = cls_cxx_map_remove_range(hctx, first_key, key_end);
3860+
if (rc < 0) {
3861+
CLS_LOG(1, "ERROR: cls_cxx_map_remove_range failed rc=%d", rc);
3862+
return rc;
3863+
}
3864+
return 0;
3865+
}
3866+
38253867
static void usage_record_prefix_by_time(uint64_t epoch, string& key)
38263868
{
38273869
char buf[32];
@@ -4975,6 +5017,7 @@ CLS_INIT(rgw)
49755017
cls_method_handle_t h_rgw_bi_get_vals_op;
49765018
cls_method_handle_t h_rgw_bi_put_op;
49775019
cls_method_handle_t h_rgw_bi_list_op;
5020+
cls_method_handle_t h_rgw_reshard_log_trim_op;
49785021
cls_method_handle_t h_rgw_bi_log_list_op;
49795022
cls_method_handle_t h_rgw_bi_log_trim_op;
49805023
cls_method_handle_t h_rgw_bi_log_resync_op;
@@ -5032,6 +5075,7 @@ CLS_INIT(rgw)
50325075
cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op);
50335076
cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op);
50345077
cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op);
5078+
cls_register_cxx_method(h_class, RGW_RESHARD_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_log_trim_op, &h_rgw_reshard_log_trim_op);
50355079

50365080
cls_register_cxx_method(h_class, RGW_BI_LOG_LIST, CLS_METHOD_RD, rgw_bi_log_list, &h_rgw_bi_log_list_op);
50375081
cls_register_cxx_method(h_class, RGW_BI_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_log_trim, &h_rgw_bi_log_trim_op);

src/cls/rgw/cls_rgw_client.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,19 @@ int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
733733
return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
734734
}
735735

736+
static bool issue_reshard_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
737+
BucketIndexAioManager *manager) {
738+
bufferlist in;
739+
ObjectWriteOperation op;
740+
op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
741+
return manager->aio_operate(io_ctx, shard_id, oid, &op);
742+
}
743+
744+
int CLSRGWIssueReshardLogTrim::issue_op(int shard_id, const string& oid)
745+
{
746+
return issue_reshard_log_trim(io_ctx, oid, shard_id, &manager);
747+
}
748+
736749
static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
737750
rgw_cls_check_index_ret *pdata) {
738751
bufferlist in;

src/cls/rgw/cls_rgw_client.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,23 @@ class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO {
522522
virtual ~CLSRGWIssueBILogTrim() override {}
523523
};
524524

525+
class CLSRGWIssueReshardLogTrim : public CLSRGWConcurrentIO {
526+
protected:
527+
int issue_op(int shard_id, const std::string& oid) override;
528+
// Trim until -ENODATA is returned.
529+
int valid_ret_code() override { return -ENODATA; }
530+
bool need_multiple_rounds() override { return true; }
531+
void add_object(int shard, const std::string& oid) override { objs_container[shard] = oid; }
532+
void reset_container(std::map<int, std::string>& objs) override {
533+
objs_container.swap(objs);
534+
iter = objs_container.begin();
535+
objs.clear();
536+
}
537+
public:
538+
CLSRGWIssueReshardLogTrim(librados::IoCtx& io_ctx, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
539+
CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
540+
};
541+
525542
/**
526543
* Check the bucket index.
527544
*

src/cls/rgw/cls_rgw_const.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
3737
#define RGW_BI_PUT "bi_put"
3838
#define RGW_BI_LIST "bi_list"
3939

40+
#define RGW_RESHARD_LOG_TRIM "reshard_log_trim"
41+
4042
#define RGW_BI_LOG_LIST "bi_log_list"
4143
#define RGW_BI_LOG_TRIM "bi_log_trim"
4244
#define RGW_DIR_SUGGEST_CHANGES "dir_suggest_changes"

src/rgw/driver/rados/rgw_rados.cc

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7720,8 +7720,8 @@ int RGWRados::reshard_failed_while_logrecord(RGWBucketInfo& bucket_info,
77207720
} else {
77217721
ldpp_dout(dpp,20) << __func__ << ": reshard lock success, " <<
77227722
"that means the reshard has failed for bucekt " << bucket_info.bucket.bucket_id << dendl;
7723-
// clear the RESHARD_IN_PROGRESS status after reshard failed, also set bucket instance
7724-
// status to CLS_RGW_RESHARD_NONE
7723+
// clear the RESHARD_IN_PROGRESS status after reshard failed, set bucket instance status
7724+
// to CLS_RGW_RESHARD_NONE, also clear the reshard log entries
77257725
ret = RGWBucketReshard::clear_resharding(this->driver, bucket_info, bucket_attrs, dpp, y);
77267726
reshard_lock.unlock();
77277727
if (ret < 0) {
@@ -9351,6 +9351,18 @@ int RGWRados::bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs)
93519351
return 0;
93529352
}
93539353

9354+
int RGWRados::trim_reshard_log_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y)
9355+
{
9356+
librados::IoCtx index_pool;
9357+
map<int, string> bucket_objs;
9358+
9359+
int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
9360+
if (r < 0) {
9361+
return r;
9362+
}
9363+
return CLSRGWIssueReshardLogTrim(index_pool, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
9364+
}
9365+
93549366
int RGWRados::gc_operate(const DoutPrefixProvider *dpp, string& oid, librados::ObjectWriteOperation *op, optional_yield y)
93559367
{
93569368
return rgw_rados_operate(dpp, gc_pool_ctx, oid, op, y);

src/rgw/driver/rados/rgw_rados.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1540,6 +1540,8 @@ class RGWRados
15401540
std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y);
15411541
int bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs);
15421542

1543+
int trim_reshard_log_entries(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y);
1544+
15431545
int cls_obj_usage_log_add(const DoutPrefixProvider *dpp, const std::string& oid, rgw_usage_log_info& info, optional_yield y);
15441546
int cls_obj_usage_log_read(const DoutPrefixProvider *dpp, const std::string& oid, const std::string& user, const std::string& bucket, uint64_t start_epoch,
15451547
uint64_t end_epoch, uint32_t max_entries, std::string& read_iter,

src/rgw/driver/rados/rgw_reshard.cc

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,8 @@ static int init_target_index(rgw::sal::RadosStore* store,
436436
{
437437
int ret = store->svc()->bi->init_index(dpp, bucket_info, index, true);
438438
if (ret == -EOPNOTSUPP) {
439-
ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not supported logrecord" << dendl;
439+
ldpp_dout(dpp, 0) << "WARNING: " << "init_index() does not supported logrecord, "
440+
<< "falling back to block reshard mode." << dendl;
440441
support_logrecord = false;
441442
ret = store->svc()->bi->init_index(dpp, bucket_info, index, false);
442443
}
@@ -571,6 +572,25 @@ static int init_target_layout(rgw::sal::RadosStore* store,
571572
store->svc()->bi->clean_index(dpp, bucket_info, target);
572573
return ret;
573574
}
575+
576+
// trim the reshard log entries to guarantee that any existing log entries are cleared,
577+
// if there are no reshard log entries, this is a no-op that costs little time
578+
if (support_logrecord) {
579+
ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield);
580+
if (ret == -EOPNOTSUPP) {
581+
// not an error, logrecord is not supported, change to block reshard
582+
ldpp_dout(dpp, 0) << "WARNING: " << "trim_reshard_log_entries() does not supported"
583+
<< " logrecord, falling back to block reshard mode." << dendl;
584+
bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
585+
support_logrecord = false;
586+
return 0;
587+
}
588+
if (ret < 0) {
589+
ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to trim reshard log entries: "
590+
<< cpp_strerror(ret) << dendl;
591+
return ret;
592+
}
593+
}
574594
return 0;
575595
} // init_target_layout
576596

@@ -591,6 +611,13 @@ static int revert_target_layout(rgw::sal::RadosStore* store,
591611
"target index with: " << cpp_strerror(ret) << dendl;
592612
ret = 0; // non-fatal error
593613
}
614+
// trim the reshard log entries written in logrecord state
615+
ret = store->getRados()->trim_reshard_log_entries(dpp, bucket_info, null_yield);
616+
if (ret < 0) {
617+
ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to trim "
618+
"reshard log entries: " << cpp_strerror(ret) << dendl;
619+
ret = 0; // non-fatal error
620+
}
594621

595622
// retry in case of racing writes to the bucket instance metadata
596623
static constexpr auto max_retries = 10;
@@ -677,7 +704,9 @@ static int init_reshard(rgw::sal::RadosStore* store,
677704
}
678705
if (ret == -EOPNOTSUPP) {
679706
ldpp_dout(dpp, 0) << "WARNING: " << "set_resharding_status()"
680-
<< " doesn't support logrecords" << dendl;
707+
<< " doesn't support logrecords,"
708+
<< " fallback to blocking mode." << dendl;
709+
bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
681710
support_logrecord = false;
682711
}
683712
}
@@ -1251,15 +1280,24 @@ int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& curr
12511280
if (ret < 0) {
12521281
return ret;
12531282
}
1254-
}
12551283

1256-
// block the client op and complete the resharding
1257-
ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
1258-
int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
1259-
formatter, bucket_info.layout.resharding, dpp, y);
1260-
if (ret < 0) {
1261-
ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
1262-
return ret;
1284+
// block the client op and complete the resharding
1285+
ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
1286+
ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
1287+
formatter, bucket_info.layout.resharding, dpp, y);
1288+
if (ret < 0) {
1289+
ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
1290+
return ret;
1291+
}
1292+
} else {
1293+
// setting InProgress state, but doing InLogrecord state
1294+
ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
1295+
int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
1296+
formatter, rgw::BucketReshardState::InLogrecord, dpp, y);
1297+
if (ret < 0) {
1298+
ldpp_dout(dpp, 0) << __func__ << ": failed in logrecord state of reshard ret = " << ret << dendl;
1299+
return ret;
1300+
}
12631301
}
12641302
return 0;
12651303
} // RGWBucketReshard::do_reshard

src/rgw/rgw_admin.cc

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,8 @@ void usage()
317317
cout << " reshard cancel cancel resharding a bucket\n";
318318
cout << " reshard stale-instances list list stale-instances from bucket resharding\n";
319319
cout << " reshard stale-instances delete cleanup stale-instances from bucket resharding\n";
320+
cout << " reshardlog list list bucket resharding log\n";
321+
cout << " reshardlog purge trim bucket resharding log\n";
320322
cout << " sync error list list sync error\n";
321323
cout << " sync error trim trim sync error\n";
322324
cout << " mfa create create a new MFA TOTP token\n";
@@ -863,6 +865,8 @@ enum class OPT {
863865
MFA_RESYNC,
864866
RESHARD_STALE_INSTANCES_LIST,
865867
RESHARD_STALE_INSTANCES_DELETE,
868+
RESHARDLOG_LIST,
869+
RESHARDLOG_PURGE,
866870
PUBSUB_TOPIC_LIST,
867871
PUBSUB_TOPIC_GET,
868872
PUBSUB_TOPIC_RM,
@@ -1112,6 +1116,8 @@ static SimpleCmd::Commands all_cmds = {
11121116
{ "reshard stale list", OPT::RESHARD_STALE_INSTANCES_LIST },
11131117
{ "reshard stale-instances delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
11141118
{ "reshard stale delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
1119+
{ "reshardlog list", OPT::RESHARDLOG_LIST},
1120+
{ "reshardlog purge", OPT::RESHARDLOG_PURGE},
11151121
{ "topic list", OPT::PUBSUB_TOPIC_LIST },
11161122
{ "topic get", OPT::PUBSUB_TOPIC_GET },
11171123
{ "topic rm", OPT::PUBSUB_TOPIC_RM },
@@ -11039,6 +11045,90 @@ int main(int argc, const char **argv)
1103911045
}
1104011046
}
1104111047

11048+
if (opt_cmd == OPT::RESHARDLOG_LIST) {
11049+
if (bucket_name.empty()) {
11050+
cerr << "ERROR: bucket not specified" << std::endl;
11051+
return EINVAL;
11052+
}
11053+
int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
11054+
if (ret < 0) {
11055+
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
11056+
return -ret;
11057+
}
11058+
11059+
list<rgw_cls_bi_entry> entries;
11060+
bool is_truncated;
11061+
if (max_entries < 0)
11062+
max_entries = 1000;
11063+
11064+
const auto& index = bucket->get_info().layout.current_index;
11065+
if (index.layout.type == rgw::BucketIndexType::Indexless) {
11066+
cerr << "ERROR: indexless bucket has no index to purge" << std::endl;
11067+
return EINVAL;
11068+
}
11069+
11070+
int max_shards = rgw::num_shards(index);
11071+
11072+
formatter->open_array_section("entries");
11073+
int i = (specified_shard_id ? shard_id : 0);
11074+
for (; i < max_shards; i++) {
11075+
formatter->open_object_section("shard");
11076+
encode_json("shard_id", i, formatter.get());
11077+
formatter->open_array_section("single shard entries");
11078+
RGWRados::BucketShard bs(static_cast<rgw::sal::RadosStore*>(driver)->getRados());
11079+
int ret = bs.init(dpp(), bucket->get_info(), index, i, null_yield);
11080+
if (ret < 0) {
11081+
cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << i << "): " << cpp_strerror(-ret) << std::endl;
11082+
return -ret;
11083+
}
11084+
11085+
marker.clear();
11086+
do {
11087+
entries.clear();
11088+
ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bi_list(bs, "", marker, max_entries,
11089+
&entries, &is_truncated,
11090+
true, null_yield);
11091+
if (ret < 0) {
11092+
cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
11093+
return -ret;
11094+
}
11095+
11096+
list<rgw_cls_bi_entry>::iterator iter;
11097+
for (iter = entries.begin(); iter != entries.end(); ++iter) {
11098+
rgw_cls_bi_entry& entry = *iter;
11099+
formatter->dump_string("idx", entry.idx);
11100+
marker = entry.idx;
11101+
}
11102+
formatter->flush(cout);
11103+
} while (is_truncated);
11104+
formatter->close_section();
11105+
formatter->close_section();
11106+
formatter->flush(cout);
11107+
11108+
if (specified_shard_id)
11109+
break;
11110+
}
11111+
formatter->close_section();
11112+
formatter->flush(cout);
11113+
}
11114+
11115+
if (opt_cmd == OPT::RESHARDLOG_PURGE) {
11116+
if (bucket_name.empty()) {
11117+
cerr << "ERROR: bucket not specified" << std::endl;
11118+
return EINVAL;
11119+
}
11120+
int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket);
11121+
if (ret < 0) {
11122+
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
11123+
return -ret;
11124+
}
11125+
ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->trim_reshard_log_entries(dpp(), bucket->get_info(), null_yield);
11126+
if (ret < 0) {
11127+
cerr << "ERROR: trim_reshard_log_entries(): " << cpp_strerror(-ret) << std::endl;
11128+
return -ret;
11129+
}
11130+
}
11131+
1104211132
if (opt_cmd == OPT::PUBSUB_NOTIFICATION_LIST) {
1104311133
if (bucket_name.empty()) {
1104411134
cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;

0 commit comments

Comments
 (0)