Skip to content

Commit e11480a

Browse files
committed
cls/rgw: add bulk cls_rgw_bi_put_entries() op for reshard
adds a bulk api for reshard to write entries to the target index shard object. this takes care of the bucket stats updates so that rgw's reshard logic doesn't have to worry about it Signed-off-by: Casey Bodley <[email protected]>
1 parent 7ce0b5e commit e11480a

File tree

8 files changed

+248
-0
lines changed

8 files changed

+248
-0
lines changed

src/cls/rgw/cls_rgw.cc

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2880,6 +2880,103 @@ static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist *
28802880
return 0;
28812881
}
28822882

2883+
static int rgw_bi_put_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2884+
{
2885+
rgw_cls_bi_put_entries_op op;
2886+
try {
2887+
auto iter = in->cbegin();
2888+
decode(op, iter);
2889+
} catch (const ceph::buffer::error&) {
2890+
CLS_LOG(0, "ERROR: %s: failed to decode request", __func__);
2891+
return -EINVAL;
2892+
}
2893+
2894+
const size_t limit = cls_get_config(hctx)->osd_max_omap_entries_per_request;
2895+
if (op.entries.size() > limit) {
2896+
int r = -E2BIG;
2897+
CLS_LOG(0, "ERROR: %s: got too many entries (%zu > %zu), returning %d",
2898+
__func__, op.entries.size(), limit, r);
2899+
return r;
2900+
}
2901+
2902+
rgw_bucket_dir_header header;
2903+
int r = read_bucket_header(hctx, &header);
2904+
if (r < 0) {
2905+
CLS_LOG(1, "ERROR: %s: failed to read header", __func__);
2906+
return r;
2907+
}
2908+
2909+
if (op.check_existing) {
2910+
// fetch any existing keys and decrement their stats before overwriting
2911+
std::set<std::string> keys;
2912+
for (const auto& entry : op.entries) {
2913+
keys.insert(entry.idx);
2914+
}
2915+
2916+
std::map<std::string, ceph::buffer::list> vals;
2917+
r = cls_cxx_map_get_vals_by_keys(hctx, keys, &vals);
2918+
if (r < 0) {
2919+
CLS_LOG(0, "ERROR: %s: cls_cxx_map_get_vals_by_keys() returned r=%d",
2920+
__func__, r);
2921+
return r;
2922+
}
2923+
2924+
for (auto& [idx, data] : vals) {
2925+
rgw_cls_bi_entry entry;
2926+
entry.type = bi_type(idx);
2927+
entry.idx = std::move(idx);
2928+
entry.data = std::move(data);
2929+
2930+
cls_rgw_obj_key key;
2931+
RGWObjCategory category;
2932+
rgw_bucket_category_stats stats;
2933+
const bool account = entry.get_info(&key, &category, &stats);
2934+
if (account) {
2935+
auto& dest = header.stats[category];
2936+
dest.total_size -= stats.total_size;
2937+
dest.total_size_rounded -= stats.total_size_rounded;
2938+
dest.num_entries -= stats.num_entries;
2939+
dest.actual_size -= stats.actual_size;
2940+
}
2941+
} // foreach vals
2942+
} // if op.check_existing
2943+
2944+
std::map<std::string, ceph::buffer::list> new_vals;
2945+
2946+
for (auto& entry : op.entries) {
2947+
if (entry.type == BIIndexType::ReshardDeleted) {
2948+
r = cls_cxx_map_remove_key(hctx, entry.idx);
2949+
if (r < 0) {
2950+
CLS_LOG(0, "WARNING: %s: cls_cxx_map_remove_key(%s) returned r=%d",
2951+
__func__, entry.idx.c_str(), r);
2952+
} // not fatal
2953+
continue;
2954+
}
2955+
2956+
cls_rgw_obj_key key;
2957+
RGWObjCategory category;
2958+
rgw_bucket_category_stats stats;
2959+
const bool account = entry.get_info(&key, &category, &stats);
2960+
if (account) {
2961+
auto& dest = header.stats[category];
2962+
dest.total_size += stats.total_size;
2963+
dest.total_size_rounded += stats.total_size_rounded;
2964+
dest.num_entries += stats.num_entries;
2965+
dest.actual_size += stats.actual_size;
2966+
}
2967+
2968+
new_vals.emplace(std::move(entry.idx), std::move(entry.data));
2969+
}
2970+
2971+
r = cls_cxx_map_set_vals(hctx, &new_vals);
2972+
if (r < 0) {
2973+
CLS_LOG(0, "ERROR: %s: cls_cxx_map_set_vals() returned r=%d", __func__, r);
2974+
return r;
2975+
}
2976+
2977+
return write_bucket_header(hctx, &header);
2978+
}
2979+
28832980
/* The plain entries in the bucket index are divided into two regions
28842981
* divided by the special entries that begin with 0x80. Those below
28852982
* ("Low") are ascii entries. Those above ("High") bring in unicode
@@ -4985,6 +5082,7 @@ CLS_INIT(rgw)
49855082
cls_method_handle_t h_rgw_bi_get_op;
49865083
cls_method_handle_t h_rgw_bi_get_vals_op;
49875084
cls_method_handle_t h_rgw_bi_put_op;
5085+
cls_method_handle_t h_rgw_bi_put_entries_op;
49885086
cls_method_handle_t h_rgw_bi_list_op;
49895087
cls_method_handle_t h_rgw_reshard_log_trim_op;
49905088
cls_method_handle_t h_rgw_bi_log_list_op;
@@ -5043,6 +5141,7 @@ CLS_INIT(rgw)
50435141
cls_register_cxx_method(h_class, RGW_BI_GET, CLS_METHOD_RD, rgw_bi_get_op, &h_rgw_bi_get_op);
50445142
cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op);
50455143
cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op);
5144+
cls_register_cxx_method(h_class, RGW_BI_PUT_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_entries, &h_rgw_bi_put_entries_op);
50465145
cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op);
50475146
cls_register_cxx_method(h_class, RGW_RESHARD_LOG_TRIM, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_log_trim_op, &h_rgw_reshard_log_trim_op);
50485147

src/cls/rgw/cls_rgw_client.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,21 @@ void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, const rgw_cls_bi
523523
op.exec(RGW_CLASS, RGW_BI_PUT, in);
524524
}
525525

526+
void cls_rgw_bi_put_entries(librados::ObjectWriteOperation& op,
527+
std::vector<rgw_cls_bi_entry> entries,
528+
bool check_existing)
529+
{
530+
const auto call = rgw_cls_bi_put_entries_op{
531+
.entries = std::move(entries),
532+
.check_existing = check_existing
533+
};
534+
535+
bufferlist in;
536+
encode(call, in);
537+
538+
op.exec(RGW_CLASS, RGW_BI_PUT_ENTRIES, in);
539+
}
540+
526541
/* nb: any entries passed in are replaced with the results of the cls
527542
* call, so caller does not need to clear entries between calls
528543
*/

src/cls/rgw/cls_rgw_client.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,12 @@ int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
389389
std::list<rgw_cls_bi_entry> *entries);
390390
int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry);
391391
void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry);
392+
// Write the given array of index entries and update bucket stats accordingly.
393+
// If existing entries may be overwritten, pass check_existing=true to decrement
394+
// their stats first.
395+
void cls_rgw_bi_put_entries(librados::ObjectWriteOperation& op,
396+
std::vector<rgw_cls_bi_entry> entries,
397+
bool check_existing);
392398
int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
393399
const std::string& name, const std::string& marker, uint32_t max,
394400
std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog = false);

src/cls/rgw/cls_rgw_const.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ constexpr int RGWBIAdvanceAndRetryError = -EFBIG;
3535
#define RGW_BI_GET "bi_get"
3636
#define RGW_BI_GET_VALS "bi_get_vals"
3737
#define RGW_BI_PUT "bi_put"
38+
#define RGW_BI_PUT_ENTRIES "bi_put_entries"
3839
#define RGW_BI_LIST "bi_list"
3940

4041
#define RGW_RESHARD_LOG_TRIM "reshard_log_trim"

src/cls/rgw/cls_rgw_ops.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,3 +580,9 @@ void cls_rgw_get_bucket_resharding_op::generate_test_instances(
580580
void cls_rgw_get_bucket_resharding_op::dump(Formatter *f) const
581581
{
582582
}
583+
584+
void rgw_cls_bi_put_entries_op::dump(Formatter *f) const
585+
{
586+
encode_json("entries", entries, f);
587+
encode_json("check_existing", check_existing, f);
588+
}

src/cls/rgw/cls_rgw_ops.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,35 @@ struct rgw_cls_bi_put_op {
777777
};
778778
WRITE_CLASS_ENCODER(rgw_cls_bi_put_op)
779779

780+
struct rgw_cls_bi_put_entries_op {
781+
std::vector<rgw_cls_bi_entry> entries;
782+
bool check_existing = false;
783+
784+
void encode(ceph::buffer::list& bl) const {
785+
ENCODE_START(1, 1, bl);
786+
encode(entries, bl);
787+
encode(check_existing, bl);
788+
ENCODE_FINISH(bl);
789+
}
790+
791+
void decode(ceph::buffer::list::const_iterator& bl) {
792+
DECODE_START(1, bl);
793+
decode(entries, bl);
794+
decode(check_existing, bl);
795+
DECODE_FINISH(bl);
796+
}
797+
798+
void dump(ceph::Formatter *f) const;
799+
800+
static void generate_test_instances(std::list<rgw_cls_bi_put_entries_op*>& o) {
801+
o.push_back(new rgw_cls_bi_put_entries_op);
802+
o.push_back(new rgw_cls_bi_put_entries_op);
803+
o.back()->entries.push_back({.idx = "entry"});
804+
o.back()->check_existing = true;
805+
}
806+
};
807+
WRITE_CLASS_ENCODER(rgw_cls_bi_put_entries_op)
808+
780809
struct rgw_cls_bi_list_op {
781810
uint32_t max;
782811
std::string name_filter; // limit result to one object and its instances

src/test/cls_rgw/test_cls_rgw.cc

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1479,3 +1479,94 @@ TEST_F(cls_rgw, reshardlog_num)
14791479
index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, 2, obj1, meta);
14801480
reshardlog_entries(ioctx, bucket_oid, 2u);
14811481
}
1482+
1483+
TEST_F(cls_rgw, bi_put_entries)
1484+
{
1485+
const string src_bucket = str_int("bi_put_entries", 0);
1486+
const string dst_bucket = str_int("bi_put_entries", 1);
1487+
1488+
const cls_rgw_obj_key obj1 = str_int("obj", 1);
1489+
const cls_rgw_obj_key obj2 = str_int("obj", 2);
1490+
const cls_rgw_obj_key obj3 = str_int("obj", 3);
1491+
const cls_rgw_obj_key obj4 = str_int("obj", 4);
1492+
const string tag = str_int("tag", 0);
1493+
const string loc = str_int("loc", 0);
1494+
auto meta = rgw_bucket_dir_entry_meta{
1495+
.category = RGWObjCategory::Main, .size = 8192};
1496+
1497+
// prepare src_bucket and add two objects
1498+
{
1499+
ObjectWriteOperation op;
1500+
cls_rgw_bucket_init_index2(op);
1501+
ASSERT_EQ(0, ioctx.operate(src_bucket, &op));
1502+
1503+
index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj1, loc);
1504+
index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 1, obj1, meta);
1505+
1506+
index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj2, loc);
1507+
index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 2, obj2, meta);
1508+
1509+
test_stats(ioctx, src_bucket, RGWObjCategory::Main, 2, 16384);
1510+
}
1511+
1512+
// prepare dst_bucket and copy the bi entries
1513+
{
1514+
ObjectWriteOperation op;
1515+
cls_rgw_bucket_init_index2(op);
1516+
ASSERT_EQ(0, ioctx.operate(dst_bucket, &op));
1517+
}
1518+
{
1519+
list<rgw_cls_bi_entry> src_entries;
1520+
bool truncated{false};
1521+
ASSERT_EQ(0, cls_rgw_bi_list(ioctx, src_bucket, "", "", 128,
1522+
&src_entries, &truncated));
1523+
ASSERT_EQ(2u, src_entries.size());
1524+
1525+
ObjectWriteOperation op;
1526+
cls_rgw_bi_put_entries(op, {src_entries.begin(), src_entries.end()}, true);
1527+
ASSERT_EQ(0, ioctx.operate(dst_bucket, &op));
1528+
1529+
test_stats(ioctx, dst_bucket, RGWObjCategory::Main, 2, 16384);
1530+
}
1531+
1532+
{
1533+
// start reshard on src_bucket
1534+
set_reshard_status(ioctx, src_bucket, cls_rgw_reshard_status::IN_LOGRECORD);
1535+
1536+
// delete obj1 and log a ReshardDeleted entry
1537+
index_prepare(ioctx, src_bucket, CLS_RGW_OP_DEL, tag, obj1, loc);
1538+
index_complete(ioctx, src_bucket, CLS_RGW_OP_DEL, tag, 3, obj1, meta);
1539+
1540+
// overwrite obj2 and record its reshardlog entry
1541+
index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj2, loc);
1542+
index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 4, obj2, meta);
1543+
1544+
// add two more objects
1545+
index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj3, loc);
1546+
index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 5, obj3, meta);
1547+
1548+
index_prepare(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, obj4, loc);
1549+
index_complete(ioctx, src_bucket, CLS_RGW_OP_ADD, tag, 6, obj4, meta);
1550+
1551+
test_stats(ioctx, src_bucket, RGWObjCategory::Main, 3, 24576);
1552+
}
1553+
1554+
// copy the reshardlog entries from src_bucket to dst_bucket
1555+
{
1556+
list<rgw_cls_bi_entry> src_entries;
1557+
bool truncated{false};
1558+
const bool reshardlog = true;
1559+
ASSERT_EQ(0, cls_rgw_bi_list(ioctx, src_bucket, "", "", 128,
1560+
&src_entries, &truncated, reshardlog));
1561+
ASSERT_EQ(4u, src_entries.size());
1562+
1563+
const auto& entry = src_entries.front();
1564+
EXPECT_EQ(BIIndexType::ReshardDeleted, entry.type);
1565+
1566+
ObjectWriteOperation op;
1567+
cls_rgw_bi_put_entries(op, {src_entries.begin(), src_entries.end()}, true);
1568+
ASSERT_EQ(0, ioctx.operate(dst_bucket, &op));
1569+
1570+
test_stats(ioctx, dst_bucket, RGWObjCategory::Main, 3, 24576);
1571+
}
1572+
}

src/tools/ceph-dencoder/rgw_types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ TYPE(rgw_cls_bi_get_ret)
9999
TYPE(rgw_cls_bi_list_op)
100100
TYPE(rgw_cls_bi_list_ret)
101101
TYPE(rgw_cls_bi_put_op)
102+
TYPE(rgw_cls_bi_put_entries_op)
102103
TYPE(rgw_cls_obj_check_attrs_prefix)
103104
TYPE(rgw_cls_obj_remove_op)
104105
TYPE(rgw_cls_obj_store_pg_ver_op)

0 commit comments

Comments
 (0)