Skip to content

Commit 9302fbb

Browse files
committed
rgw: track initiator of reshard queue entries
The logic for managing the reshard queue (log) can vary depending on whether the entry was added by an admin or by dynamic resharding. For example, if it's a reshard reduction, dynamic resharding won't overwrite the queue entry so as not to disrupt the reduction wait period. On the other hand, an admin should be able to overwrite the entry at will. So we now track the initiator of each entry on the queue. This adds another field to that at-rest data structure, and it updates the logic to make use of it. Signed-off-by: J. Eric Ivancich <[email protected]>
1 parent f8f95ab commit 9302fbb

File tree

6 files changed

+77
-24
lines changed

6 files changed

+77
-24
lines changed

src/cls/rgw/cls_rgw_types.cc

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,19 @@ void rgw_usage_log_entry::generate_test_instances(list<rgw_usage_log_entry *> &o
814814
o.push_back(new rgw_usage_log_entry);
815815
}
816816

817+
std::string to_string(cls_rgw_reshard_initiator i) {
818+
switch (i) {
819+
case cls_rgw_reshard_initiator::Unknown:
820+
return "unknown";
821+
case cls_rgw_reshard_initiator::Admin:
822+
return "administrator";
823+
case cls_rgw_reshard_initiator::Dynamic:
824+
return "dynamic resharding";
825+
default:
826+
return "error";
827+
}
828+
}
829+
817830
void cls_rgw_reshard_entry::generate_key(const string& tenant, const string& bucket_name, string *key)
818831
{
819832
*key = tenant + ":" + bucket_name;
@@ -827,12 +840,13 @@ void cls_rgw_reshard_entry::get_key(string *key) const
827840
void cls_rgw_reshard_entry::dump(Formatter *f) const
828841
{
829842
utime_t ut(time);
830-
encode_json("time",ut, f);
843+
encode_json("time", ut, f);
831844
encode_json("tenant", tenant, f);
832845
encode_json("bucket_name", bucket_name, f);
833846
encode_json("bucket_id", bucket_id, f);
834847
encode_json("old_num_shards", old_num_shards, f);
835848
encode_json("tentative_new_num_shards", new_num_shards, f);
849+
encode_json("initiator", to_string(initiator), f);
836850
}
837851

838852
void cls_rgw_reshard_entry::generate_test_instances(list<cls_rgw_reshard_entry*>& ls)

src/cls/rgw/cls_rgw_types.h

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,25 +1325,40 @@ struct cls_rgw_lc_entry {
13251325
};
13261326
WRITE_CLASS_ENCODER(cls_rgw_lc_entry);
13271327

1328+
1329+
// used to track the initiator of a reshard entry on the reshard queue (log)
1330+
enum class cls_rgw_reshard_initiator : uint8_t {
1331+
Unknown = 0,
1332+
Admin = 1,
1333+
Dynamic = 2,
1334+
};
1335+
std::string to_string(cls_rgw_reshard_initiator i);
1336+
inline std::ostream& operator<<(std::ostream& out, cls_rgw_reshard_initiator i) {
1337+
return out << to_string(i);
1338+
}
1339+
1340+
13281341
struct cls_rgw_reshard_entry
13291342
{
13301343
ceph::real_time time;
13311344
std::string tenant;
13321345
std::string bucket_name;
13331346
std::string bucket_id;
1334-
uint32_t old_num_shards{0};
1335-
uint32_t new_num_shards{0};
1347+
uint32_t old_num_shards {0};
1348+
uint32_t new_num_shards {0};
1349+
cls_rgw_reshard_initiator initiator {cls_rgw_reshard_initiator::Unknown};
13361350

13371351
cls_rgw_reshard_entry() {}
13381352

13391353
void encode(ceph::buffer::list& bl) const {
1340-
ENCODE_START(2, 1, bl);
1354+
ENCODE_START(3, 1, bl);
13411355
encode(time, bl);
13421356
encode(tenant, bl);
13431357
encode(bucket_name, bl);
13441358
encode(bucket_id, bl);
13451359
encode(old_num_shards, bl);
13461360
encode(new_num_shards, bl);
1361+
encode(initiator, bl);
13471362
ENCODE_FINISH(bl);
13481363
}
13491364

@@ -1359,6 +1374,11 @@ struct cls_rgw_reshard_entry
13591374
}
13601375
decode(old_num_shards, bl);
13611376
decode(new_num_shards, bl);
1377+
if (struct_v >= 3) {
1378+
decode(initiator, bl);
1379+
} else {
1380+
initiator = cls_rgw_reshard_initiator::Unknown;
1381+
}
13621382
DECODE_FINISH(bl);
13631383
}
13641384

src/rgw/driver/rados/rgw_rados.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10424,6 +10424,7 @@ int RGWRados::add_bucket_to_reshard(const DoutPrefixProvider *dpp,
1042410424
entry.bucket_id = bucket_info.bucket.bucket_id;
1042510425
entry.old_num_shards = num_source_shards;
1042610426
entry.new_num_shards = new_num_shards;
10427+
entry.initiator = cls_rgw_reshard_initiator::Dynamic;
1042710428

1042810429
return reshard.add(dpp, entry, y);
1042910430
}

src/rgw/driver/rados/rgw_reshard.cc

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,8 +1032,11 @@ int RGWBucketReshard::get_status(const DoutPrefixProvider *dpp, list<cls_rgw_buc
10321032
int RGWBucketReshard::execute(int num_shards,
10331033
ReshardFaultInjector& fault,
10341034
int max_op_entries,
1035-
const DoutPrefixProvider *dpp, optional_yield y,
1036-
bool verbose, ostream *out,
1035+
const cls_rgw_reshard_initiator initiator,
1036+
const DoutPrefixProvider *dpp,
1037+
optional_yield y,
1038+
bool verbose,
1039+
ostream *out,
10371040
Formatter *formatter,
10381041
RGWReshard* reshard_log)
10391042
{
@@ -1046,7 +1049,7 @@ int RGWBucketReshard::execute(int num_shards,
10461049
auto unlock = make_scope_guard([this] { reshard_lock.unlock(); });
10471050

10481051
if (reshard_log) {
1049-
ret = reshard_log->update(dpp, bucket_info, y);
1052+
ret = reshard_log->update(dpp, bucket_info, initiator, y);
10501053
if (ret < 0) {
10511054
return ret;
10521055
}
@@ -1134,19 +1137,23 @@ int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry,
11341137

11351138
librados::ObjectWriteOperation op;
11361139

1137-
// if we're reducing, we don't want to overwrite an existing entry
1138-
// in order to not interfere with the reshard reduction wait period
1139-
const bool create_only = entry.new_num_shards < entry.old_num_shards;
1140+
// if this is dynamic resharding and we're reducing, we don't want
1141+
// to overwrite an existing entry in order to not interfere with the
1142+
// reshard reduction wait period
1143+
const bool create_only =
1144+
entry.initiator == cls_rgw_reshard_initiator::Dynamic &&
1145+
entry.new_num_shards < entry.old_num_shards;
11401146

11411147
cls_rgw_reshard_add(op, entry, create_only);
11421148

11431149
int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, y);
11441150
if (create_only && ret == -EEXIST) {
1145-
ldpp_dout(dpp, 20) << "INFO: did not write reshard queue entry for oid=" <<
1151+
ldpp_dout(dpp, 20) <<
1152+
"INFO: did not write reshard queue entry for oid=" <<
11461153
logshard_oid << " tenant=" << entry.tenant << " bucket=" <<
11471154
entry.bucket_name <<
1148-
", because it's a reshard reduction and an entry for that bucket already exists" <<
1149-
dendl;
1155+
", because it's a dynamic reshard reduction and an entry for that "
1156+
"bucket already exists" << dendl;
11501157
// this is not an error so just fall through
11511158
} else if (ret < 0) {
11521159
ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" <<
@@ -1157,12 +1164,16 @@ int RGWReshard::add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry,
11571164
return 0;
11581165
}
11591166

1160-
int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, optional_yield y)
1167+
int RGWReshard::update(const DoutPrefixProvider *dpp,
1168+
const RGWBucketInfo& bucket_info,
1169+
const cls_rgw_reshard_initiator initiator,
1170+
optional_yield y)
11611171
{
11621172
cls_rgw_reshard_entry entry;
11631173
entry.bucket_name = bucket_info.bucket.name;
11641174
entry.bucket_id = bucket_info.bucket.bucket_id;
11651175
entry.tenant = bucket_info.bucket.tenant;
1176+
entry.initiator = initiator;
11661177

11671178
int ret = get(dpp, entry);
11681179
if (ret < 0) {
@@ -1171,7 +1182,7 @@ int RGWReshard::update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucke
11711182

11721183
ret = add(dpp, entry, y);
11731184
if (ret < 0) {
1174-
ldpp_dout(dpp, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
1185+
ldpp_dout(dpp, 0) << __func__ << ": Error in updating entry bucket " << entry.bucket_name << ": " <<
11751186
cpp_strerror(-ret) << dendl;
11761187
}
11771188

@@ -1354,9 +1365,12 @@ int RGWReshard::process_entry(const cls_rgw_reshard_entry& entry,
13541365
}
13551366
}
13561367

1357-
// if reshard reduction, perform extra sanity checks in part to
1358-
// prevent chasing constantly changing entry count
1359-
if (entry.new_num_shards < entry.old_num_shards) {
1368+
// if *dynamic* reshard reduction, perform extra sanity checks in
1369+
// part to prevent chasing constantly changing entry count. If
1370+
// *admin*-initiated (or unknown-initiated) reshard reduction, skip
1371+
// this step and proceed.
1372+
if (entry.initiator == cls_rgw_reshard_initiator::Dynamic &&
1373+
entry.new_num_shards < entry.old_num_shards) {
13601374
const bool may_reduce =
13611375
store->ctx()->_conf.get_val<bool>("rgw_dynamic_resharding_may_reduce");
13621376
if (! may_reduce) {
@@ -1446,8 +1460,8 @@ int RGWReshard::process_entry(const cls_rgw_reshard_entry& entry,
14461460
RGWBucketReshard br(store, bucket_info, bucket_attrs, nullptr);
14471461

14481462
ReshardFaultInjector f; // no fault injected
1449-
ret = br.execute(entry.new_num_shards, f, max_entries, dpp, y,
1450-
false, nullptr, nullptr, this);
1463+
ret = br.execute(entry.new_num_shards, f, max_entries, entry.initiator,
1464+
dpp, y, false, nullptr, nullptr, this);
14511465
if (ret < 0) {
14521466
ldpp_dout(dpp, 0) << __func__ <<
14531467
": Error during resharding bucket " << entry.bucket_name << ":" <<
@@ -1491,7 +1505,7 @@ int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvid
14911505
continue;
14921506
}
14931507

1494-
for(auto& entry: entries) { // logshard entries
1508+
for(auto& entry : entries) { // logshard entries
14951509
process_entry(entry, max_entries, dpp, y);
14961510

14971511
Clock::time_point now = Clock::now();

src/rgw/driver/rados/rgw_reshard.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ class RGWBucketReshard {
100100
const std::map<std::string, bufferlist>& _bucket_attrs,
101101
RGWBucketReshardLock* _outer_reshard_lock);
102102
int execute(int num_shards, ReshardFaultInjector& f,
103-
int max_op_entries, const DoutPrefixProvider *dpp, optional_yield y,
103+
int max_op_entries, const cls_rgw_reshard_initiator initiator,
104+
const DoutPrefixProvider *dpp, optional_yield y,
104105
bool verbose = false, std::ostream *out = nullptr,
105106
ceph::Formatter *formatter = nullptr,
106107
RGWReshard *reshard_log = nullptr);
@@ -222,7 +223,7 @@ class RGWReshard {
222223
public:
223224
RGWReshard(rgw::sal::RadosStore* _store, bool _verbose = false, std::ostream *_out = nullptr, Formatter *_formatter = nullptr);
224225
int add(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry, optional_yield y);
225-
int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, optional_yield y);
226+
int update(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_reshard_initiator initiator, optional_yield y);
226227
int get(const DoutPrefixProvider *dpp, cls_rgw_reshard_entry& entry);
227228
int remove(const DoutPrefixProvider *dpp, const cls_rgw_reshard_entry& entry, optional_yield y);
228229
int list(const DoutPrefixProvider *dpp, int logshard_num, std::string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);

src/rgw/rgw_admin.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8346,7 +8346,9 @@ int main(int argc, const char **argv)
83468346
} else if (inject_delay_at) {
83478347
fault.inject(*inject_delay_at, InjectDelay{inject_delay, dpp()});
83488348
}
8349-
ret = br.execute(num_shards, fault, max_entries, dpp(), null_yield,
8349+
ret = br.execute(num_shards, fault, max_entries,
8350+
cls_rgw_reshard_initiator::Admin,
8351+
dpp(), null_yield,
83508352
verbose, &cout, formatter.get());
83518353
return -ret;
83528354
}
@@ -8374,6 +8376,7 @@ int main(int argc, const char **argv)
83748376
entry.bucket_id = bucket->get_info().bucket.bucket_id;
83758377
entry.old_num_shards = num_source_shards;
83768378
entry.new_num_shards = num_shards;
8379+
entry.initiator = cls_rgw_reshard_initiator::Admin;
83778380

83788381
return reshard.add(dpp(), entry, null_yield);
83798382
}

0 commit comments

Comments
 (0)