Skip to content

Commit e4fd504

Browse files
committed
rgw/rados: index operations use async_reads/writes()
replace the classes derived from CLSRGWConcurrentIO with classes derived from Reader/Writer/RevertibleWriter and use the async algorithms Signed-off-by: Casey Bodley <[email protected]>
1 parent 2c6f777 commit e4fd504

File tree

9 files changed

+849
-170
lines changed

9 files changed

+849
-170
lines changed

src/cls/rgw/cls_rgw_client.cc

Lines changed: 77 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -230,17 +230,22 @@ static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
230230
return manager->aio_operate(io_ctx, shard_id, oid, &op);
231231
}
232232

233+
void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op,
234+
uint64_t timeout)
235+
{
236+
const auto call = rgw_cls_tag_timeout_op{.tag_timeout = timeout};
237+
bufferlist in;
238+
encode(call, in);
239+
op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
240+
}
241+
233242
static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
234243
const int shard_id,
235244
const string& oid,
236245
uint64_t timeout,
237246
BucketIndexAioManager *manager) {
238-
bufferlist in;
239-
rgw_cls_tag_timeout_op call;
240-
call.tag_timeout = timeout;
241-
encode(call, in);
242247
ObjectWriteOperation op;
243-
op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
248+
cls_rgw_bucket_set_tag_timeout(op, timeout);
244249
return manager->aio_operate(io_ctx, shard_id, oid, &op);
245250
}
246251

@@ -725,11 +730,16 @@ int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
725730
return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
726731
}
727732

733+
void cls_rgw_bucket_reshard_log_trim(librados::ObjectWriteOperation& op)
734+
{
735+
bufferlist in;
736+
op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
737+
}
738+
728739
static bool issue_reshard_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
729740
BucketIndexAioManager *manager) {
730-
bufferlist in;
731741
ObjectWriteOperation op;
732-
op.exec(RGW_CLASS, RGW_RESHARD_LOG_TRIM, in);
742+
cls_rgw_bucket_reshard_log_trim(op);
733743
return manager->aio_operate(io_ctx, shard_id, oid, &op);
734744
}
735745

@@ -738,6 +748,20 @@ int CLSRGWIssueReshardLogTrim::issue_op(int shard_id, const string& oid)
738748
return issue_reshard_log_trim(io_ctx, oid, shard_id, &manager);
739749
}
740750

751+
void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op,
752+
bufferlist& out)
753+
{
754+
bufferlist in;
755+
op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, &out, nullptr);
756+
}
757+
758+
void cls_rgw_bucket_check_index_decode(const bufferlist& out,
759+
rgw_cls_check_index_ret& result)
760+
{
761+
auto p = out.cbegin();
762+
decode(result, p);
763+
}
764+
741765
static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
742766
rgw_cls_check_index_ret *pdata) {
743767
bufferlist in;
@@ -752,11 +776,16 @@ int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
752776
return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
753777
}
754778

779+
void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op)
780+
{
781+
bufferlist in;
782+
op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
783+
}
784+
755785
static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid,
756786
BucketIndexAioManager *manager) {
757-
bufferlist in;
758787
librados::ObjectWriteOperation op;
759-
op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
788+
cls_rgw_bucket_rebuild_index(op);
760789
return manager->aio_operate(io_ctx, shard_id, oid, &op);
761790
}
762791

@@ -786,11 +815,16 @@ int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid)
786815
0, false, &manager, &result[shard_id]);
787816
}
788817

789-
static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
818+
void cls_rgw_bilog_start(ObjectWriteOperation& op)
790819
{
791820
bufferlist in;
792-
librados::ObjectWriteOperation op;
793821
op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
822+
}
823+
824+
static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
825+
{
826+
librados::ObjectWriteOperation op;
827+
cls_rgw_bilog_start(op);
794828
return manager->aio_operate(io_ctx, shard_id, oid, &op);
795829
}
796830

@@ -799,11 +833,16 @@ int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid
799833
return issue_resync_bi_log(io_ctx, shard_id, oid, &manager);
800834
}
801835

802-
static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
836+
void cls_rgw_bilog_stop(ObjectWriteOperation& op)
803837
{
804838
bufferlist in;
805-
librados::ObjectWriteOperation op;
806839
op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
840+
}
841+
842+
static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
843+
{
844+
librados::ObjectWriteOperation op;
845+
cls_rgw_bilog_stop(op);
807846
return manager->aio_operate(io_ctx, shard_id, oid, &op);
808847
}
809848

@@ -1214,49 +1253,31 @@ void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_re
12141253
op.exec(RGW_CLASS, RGW_RESHARD_REMOVE, in);
12151254
}
12161255

1217-
int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
1218-
const cls_rgw_bucket_instance_entry& entry)
1219-
{
1220-
bufferlist in, out;
1221-
cls_rgw_set_bucket_resharding_op call;
1222-
call.entry = entry;
1223-
encode(call, in);
1224-
librados::ObjectWriteOperation op;
1225-
op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
1226-
return io_ctx.operate(oid, &op);
1227-
}
1228-
1229-
int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const string& oid)
1256+
void cls_rgw_clear_bucket_resharding(librados::ObjectWriteOperation& op)
12301257
{
1231-
bufferlist in, out;
1258+
bufferlist in;
12321259
cls_rgw_clear_bucket_resharding_op call;
12331260
encode(call, in);
1234-
librados::ObjectWriteOperation op;
12351261
op.exec(RGW_CLASS, RGW_CLEAR_BUCKET_RESHARDING, in);
1236-
return io_ctx.operate(oid, &op);
12371262
}
12381263

1239-
int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
1240-
cls_rgw_bucket_instance_entry *entry)
1264+
void cls_rgw_get_bucket_resharding(librados::ObjectReadOperation& op,
1265+
bufferlist& out)
12411266
{
1242-
bufferlist in, out;
1267+
bufferlist in;
12431268
cls_rgw_get_bucket_resharding_op call;
12441269
encode(call, in);
1245-
int r= io_ctx.exec(oid, RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, out);
1246-
if (r < 0)
1247-
return r;
1270+
op.exec(RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, &out, nullptr);
1271+
}
12481272

1273+
void cls_rgw_get_bucket_resharding_decode(const bufferlist& out,
1274+
cls_rgw_bucket_instance_entry& entry)
1275+
{
12491276
cls_rgw_get_bucket_resharding_ret op_ret;
12501277
auto iter = out.cbegin();
1251-
try {
1252-
decode(op_ret, iter);
1253-
} catch (ceph::buffer::error& err) {
1254-
return -EIO;
1255-
}
1256-
1257-
*entry = op_ret.new_instance;
1278+
decode(op_ret, iter);
12581279

1259-
return 0;
1280+
entry = std::move(op_ret.new_instance);
12601281
}
12611282

12621283
void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err)
@@ -1268,17 +1289,24 @@ void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err)
12681289
op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in);
12691290
}
12701291

1271-
static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
1272-
const int shard_id, const string& oid,
1273-
const cls_rgw_bucket_instance_entry& entry,
1274-
BucketIndexAioManager *manager) {
1292+
void cls_rgw_set_bucket_resharding(librados::ObjectWriteOperation& op,
1293+
cls_rgw_reshard_status status)
1294+
{
12751295
bufferlist in;
12761296
cls_rgw_set_bucket_resharding_op call;
1277-
call.entry = entry;
1297+
call.entry.reshard_status = status;
12781298
encode(call, in);
1279-
librados::ObjectWriteOperation op;
1299+
12801300
op.assert_exists(); // the shard must exist; if not fail rather than recreate
12811301
op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
1302+
}
1303+
1304+
static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
1305+
const int shard_id, const string& oid,
1306+
const cls_rgw_bucket_instance_entry& entry,
1307+
BucketIndexAioManager *manager) {
1308+
librados::ObjectWriteOperation op;
1309+
cls_rgw_set_bucket_resharding(op, entry.reshard_status);
12821310
return manager->aio_operate(io_ctx, shard_id, oid, &op);
12831311
}
12841312

src/cls/rgw/cls_rgw_client.h

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,8 @@ class CLSRGWConcurrentIO {
302302
int operator()();
303303
}; // class CLSRGWConcurrentIO
304304

305+
void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op,
306+
uint64_t timeout);
305307

306308
class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
307309
protected:
@@ -542,6 +544,12 @@ class CLSRGWIssueReshardLogTrim : public CLSRGWConcurrentIO {
542544
CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
543545
};
544546

547+
void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op,
548+
bufferlist& out);
549+
// decode the response; may throw buffer::error
550+
void cls_rgw_bucket_check_index_decode(const bufferlist& out,
551+
rgw_cls_check_index_ret& result);
552+
545553
/**
546554
* Check the bucket index.
547555
*
@@ -563,6 +571,8 @@ class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /*<std::map<std::string
563571
virtual ~CLSRGWIssueBucketCheck() override {}
564572
};
565573

574+
void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op);
575+
566576
class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
567577
protected:
568578
int issue_op(int shard_id, const std::string& oid) override;
@@ -594,6 +604,9 @@ class CLSRGWIssueSetBucketResharding : public CLSRGWConcurrentIO {
594604
virtual ~CLSRGWIssueSetBucketResharding() override {}
595605
};
596606

607+
void cls_rgw_bilog_start(librados::ObjectWriteOperation& op);
608+
void cls_rgw_bilog_stop(librados::ObjectWriteOperation& op);
609+
597610
class CLSRGWIssueResyncBucketBILog : public CLSRGWConcurrentIO {
598611
protected:
599612
int issue_op(int shard_id, const std::string& oid);
@@ -684,12 +697,15 @@ int cls_rgw_reshard_get(librados::IoCtx& io_ctx, const std::string& oid, cls_rgw
684697
// cls_rgw in the T+4 (X) release.
685698
void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err);
686699

687-
// these overloads which call io_ctx.operate() should not be called in the rgw.
688-
// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()
689-
#ifndef CLS_CLIENT_HIDE_IOCTX
690-
int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid,
691-
const cls_rgw_bucket_instance_entry& entry);
692-
int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid);
693-
int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid,
694-
cls_rgw_bucket_instance_entry *entry);
695-
#endif
700+
void cls_rgw_set_bucket_resharding(librados::ObjectWriteOperation& op,
701+
cls_rgw_reshard_status status);
702+
void cls_rgw_clear_bucket_resharding(librados::ObjectWriteOperation& op);
703+
void cls_rgw_get_bucket_resharding(librados::ObjectReadOperation& op,
704+
bufferlist& out);
705+
// decode the entry; may throw buffer::error
706+
void cls_rgw_get_bucket_resharding_decode(const bufferlist& out,
707+
cls_rgw_bucket_instance_entry& entry);
708+
709+
// Try to remove all reshard log entries from the bucket index. Return success
710+
// if any entries were removed, and -ENODATA once they're all gone.
711+
void cls_rgw_bucket_reshard_log_trim(librados::ObjectWriteOperation& op);

src/cls/rgw/cls_rgw_ops.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77

88
struct rgw_cls_tag_timeout_op
99
{
10-
uint64_t tag_timeout;
11-
12-
rgw_cls_tag_timeout_op() : tag_timeout(0) {}
10+
uint64_t tag_timeout = 0;
1311

1412
void encode(ceph::buffer::list &bl) const {
1513
ENCODE_START(1, 1, bl);

0 commit comments

Comments
 (0)