Skip to content

Commit fe24b4c

Browse files
authored
Merge pull request ceph#60670 from cbodley/wip-cls-rgw-shard-io
rgw/rados: add async algorithms for concurrent index operations Reviewed-by: Adam Emerson <[email protected]>
2 parents 60cee4c + 09d084a commit fe24b4c

35 files changed

+3828
-1042
lines changed

src/cls/rgw/cls_rgw_client.cc

Lines changed: 33 additions & 309 deletions
Large diffs are not rendered by default.

src/cls/rgw/cls_rgw_client.h

Lines changed: 22 additions & 270 deletions
Original file line numberDiff line numberDiff line change
@@ -266,98 +266,8 @@ class BucketIndexShardsManager {
266266
void cls_rgw_bucket_init_index(librados::ObjectWriteOperation& o);
267267
void cls_rgw_bucket_init_index2(librados::ObjectWriteOperation& o);
268268

269-
class CLSRGWConcurrentIO {
270-
protected:
271-
librados::IoCtx& io_ctx;
272-
273-
// map of shard # to oid; the shards that are remaining to be processed
274-
std::map<int, std::string>& objs_container;
275-
// iterator to work through objs_container
276-
std::map<int, std::string>::iterator iter;
277-
278-
uint32_t max_aio;
279-
BucketIndexAioManager manager;
280-
281-
virtual int issue_op(int shard_id, const std::string& oid) = 0;
282-
283-
virtual void cleanup() {}
284-
virtual int valid_ret_code() { return 0; }
285-
// Return true if multiple rounds of OPs might be needed, this happens when
286-
// OP needs to be re-send until a certain code is returned.
287-
virtual bool need_multiple_rounds() { return false; }
288-
// Add a new object to the end of the container.
289-
virtual void add_object(int shard, const std::string& oid) {}
290-
virtual void reset_container(std::map<int, std::string>& objs) {}
291-
292-
public:
293-
294-
CLSRGWConcurrentIO(librados::IoCtx& ioc,
295-
std::map<int, std::string>& _objs_container,
296-
uint32_t _max_aio) :
297-
io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio)
298-
{}
299-
300-
virtual ~CLSRGWConcurrentIO() {}
301-
302-
int operator()();
303-
}; // class CLSRGWConcurrentIO
304-
305-
306-
class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
307-
protected:
308-
int issue_op(int shard_id, const std::string& oid) override;
309-
int valid_ret_code() override { return -EEXIST; }
310-
void cleanup() override;
311-
public:
312-
CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc,
313-
std::map<int, std::string>& _bucket_objs,
314-
uint32_t _max_aio) :
315-
CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
316-
virtual ~CLSRGWIssueBucketIndexInit() override {}
317-
};
318-
319-
320-
class CLSRGWIssueBucketIndexInit2 : public CLSRGWConcurrentIO {
321-
protected:
322-
int issue_op(int shard_id, const std::string& oid) override;
323-
int valid_ret_code() override { return -EEXIST; }
324-
void cleanup() override;
325-
public:
326-
CLSRGWIssueBucketIndexInit2(librados::IoCtx& ioc,
327-
std::map<int, std::string>& _bucket_objs,
328-
uint32_t _max_aio) :
329-
CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
330-
virtual ~CLSRGWIssueBucketIndexInit2() override {}
331-
};
332-
333-
334-
class CLSRGWIssueBucketIndexClean : public CLSRGWConcurrentIO {
335-
protected:
336-
int issue_op(int shard_id, const std::string& oid) override;
337-
int valid_ret_code() override {
338-
return -ENOENT;
339-
}
340-
341-
public:
342-
CLSRGWIssueBucketIndexClean(librados::IoCtx& ioc,
343-
std::map<int, std::string>& _bucket_objs,
344-
uint32_t _max_aio) :
345-
CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio)
346-
{}
347-
virtual ~CLSRGWIssueBucketIndexClean() override {}
348-
};
349-
350-
351-
class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
352-
uint64_t tag_timeout;
353-
protected:
354-
int issue_op(int shard_id, const std::string& oid) override;
355-
public:
356-
CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, std::map<int, std::string>& _bucket_objs,
357-
uint32_t _max_aio, uint64_t _tag_timeout) :
358-
CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {}
359-
virtual ~CLSRGWIssueSetTagTimeout() override {}
360-
};
269+
void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& op,
270+
uint64_t timeout);
361271

362272
void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
363273
bool absolute,
@@ -424,54 +334,6 @@ int cls_rgw_usage_log_trim(librados::IoCtx& io_ctx, const std::string& oid, cons
424334
uint64_t start_epoch, uint64_t end_epoch);
425335
#endif
426336

427-
428-
/**
429-
* Std::list the bucket with the starting object and filter prefix.
430-
* NOTE: this method do listing requests for each bucket index shards identified by
431-
* the keys of the *list_results* std::map, which means the std::map should be populated
432-
* by the caller to fill with each bucket index object id.
433-
*
434-
* io_ctx - IO context for rados.
435-
* start_obj - marker for the listing.
436-
* filter_prefix - filter prefix.
437-
* num_entries - number of entries to request for each object (note the total
438-
* amount of entries returned depends on the number of shardings).
439-
* list_results - the std::list results keyed by bucket index object id.
440-
* max_aio - the maximum number of AIO (for throttling).
441-
*
442-
* Return 0 on success, a failure code otherwise.
443-
*/
444-
445-
class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
446-
cls_rgw_obj_key start_obj;
447-
std::string filter_prefix;
448-
std::string delimiter;
449-
uint32_t num_entries;
450-
bool list_versions;
451-
std::map<int, rgw_cls_list_ret>& result; // request_id -> return value
452-
453-
protected:
454-
int issue_op(int shard_id, const std::string& oid) override;
455-
void reset_container(std::map<int, std::string>& objs) override;
456-
457-
public:
458-
CLSRGWIssueBucketList(librados::IoCtx& io_ctx,
459-
const cls_rgw_obj_key& _start_obj,
460-
const std::string& _filter_prefix,
461-
const std::string& _delimiter,
462-
uint32_t _num_entries,
463-
bool _list_versions,
464-
std::map<int, std::string>& oids, // shard_id -> shard_oid
465-
// shard_id -> return value
466-
std::map<int, rgw_cls_list_ret>& list_results,
467-
uint32_t max_aio) :
468-
CLSRGWConcurrentIO(io_ctx, oids, max_aio),
469-
start_obj(_start_obj), filter_prefix(_filter_prefix), delimiter(_delimiter),
470-
num_entries(_num_entries), list_versions(_list_versions),
471-
result(list_results)
472-
{}
473-
};
474-
475337
void cls_rgw_bucket_list_op(librados::ObjectReadOperation& op,
476338
const cls_rgw_obj_key& start_obj,
477339
const std::string& filter_prefix,
@@ -484,133 +346,20 @@ void cls_rgw_bilog_list(librados::ObjectReadOperation& op,
484346
const std::string& marker, uint32_t max,
485347
cls_rgw_bi_log_list_ret *pdata, int *ret = nullptr);
486348

487-
class CLSRGWIssueBILogList : public CLSRGWConcurrentIO {
488-
std::map<int, cls_rgw_bi_log_list_ret>& result;
489-
BucketIndexShardsManager& marker_mgr;
490-
uint32_t max;
491-
protected:
492-
int issue_op(int shard_id, const std::string& oid) override;
493-
public:
494-
CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max,
495-
std::map<int, std::string>& oids,
496-
std::map<int, cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
497-
CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists),
498-
marker_mgr(_marker_mgr), max(_max) {}
499-
virtual ~CLSRGWIssueBILogList() override {}
500-
};
501-
502349
void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op,
503350
const std::string& start_marker,
504351
const std::string& end_marker);
505352

506-
class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO {
507-
BucketIndexShardsManager& start_marker_mgr;
508-
BucketIndexShardsManager& end_marker_mgr;
509-
protected:
510-
int issue_op(int shard_id, const std::string& oid) override;
511-
// Trim until -ENODATA is returned.
512-
int valid_ret_code() override { return -ENODATA; }
513-
bool need_multiple_rounds() override { return true; }
514-
void add_object(int shard, const std::string& oid) override { objs_container[shard] = oid; }
515-
void reset_container(std::map<int, std::string>& objs) override {
516-
objs_container.swap(objs);
517-
iter = objs_container.begin();
518-
objs.clear();
519-
}
520-
public:
521-
CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr,
522-
BucketIndexShardsManager& _end_marker_mgr, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
523-
CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio),
524-
start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {}
525-
virtual ~CLSRGWIssueBILogTrim() override {}
526-
};
527-
528-
class CLSRGWIssueReshardLogTrim : public CLSRGWConcurrentIO {
529-
protected:
530-
int issue_op(int shard_id, const std::string& oid) override;
531-
// Trim until -ENODATA is returned.
532-
int valid_ret_code() override { return -ENODATA; }
533-
bool need_multiple_rounds() override { return true; }
534-
void add_object(int shard, const std::string& oid) override { objs_container[shard] = oid; }
535-
void reset_container(std::map<int, std::string>& objs) override {
536-
objs_container.swap(objs);
537-
iter = objs_container.begin();
538-
objs.clear();
539-
}
540-
public:
541-
CLSRGWIssueReshardLogTrim(librados::IoCtx& io_ctx, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
542-
CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
543-
};
353+
void cls_rgw_bucket_check_index(librados::ObjectReadOperation& op,
354+
bufferlist& out);
355+
// decode the response; may throw buffer::error
356+
void cls_rgw_bucket_check_index_decode(const bufferlist& out,
357+
rgw_cls_check_index_ret& result);
544358

545-
/**
546-
* Check the bucket index.
547-
*
548-
* io_ctx - IO context for rados.
549-
* bucket_objs_ret - check result for all shards.
550-
* max_aio - the maximum number of AIO (for throttling).
551-
*
552-
* Return 0 on success, a failure code otherwise.
553-
*/
554-
class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /*<std::map<std::string, rgw_cls_check_index_ret> >*/ {
555-
std::map<int, rgw_cls_check_index_ret>& result;
556-
protected:
557-
int issue_op(int shard_id, const std::string& oid) override;
558-
public:
559-
CLSRGWIssueBucketCheck(librados::IoCtx& ioc, std::map<int, std::string>& oids,
560-
std::map<int, rgw_cls_check_index_ret>& bucket_objs_ret,
561-
uint32_t _max_aio) :
562-
CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {}
563-
virtual ~CLSRGWIssueBucketCheck() override {}
564-
};
359+
void cls_rgw_bucket_rebuild_index(librados::ObjectWriteOperation& op);
565360

566-
class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
567-
protected:
568-
int issue_op(int shard_id, const std::string& oid) override;
569-
public:
570-
CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, std::map<int, std::string>& bucket_objs,
571-
uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {}
572-
virtual ~CLSRGWIssueBucketRebuild() override {}
573-
};
574-
575-
class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO {
576-
std::map<int, rgw_cls_list_ret>& result;
577-
protected:
578-
int issue_op(int shard_id, const std::string& oid) override;
579-
public:
580-
CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, std::map<int, std::string>& oids, std::map<int, rgw_cls_list_ret>& dir_headers,
581-
uint32_t max_aio) :
582-
CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
583-
virtual ~CLSRGWIssueGetDirHeader() override {}
584-
};
585-
586-
class CLSRGWIssueSetBucketResharding : public CLSRGWConcurrentIO {
587-
cls_rgw_bucket_instance_entry entry;
588-
protected:
589-
int issue_op(int shard_id, const std::string& oid) override;
590-
public:
591-
CLSRGWIssueSetBucketResharding(librados::IoCtx& ioc, std::map<int, std::string>& _bucket_objs,
592-
const cls_rgw_bucket_instance_entry& _entry,
593-
uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), entry(_entry) {}
594-
virtual ~CLSRGWIssueSetBucketResharding() override {}
595-
};
596-
597-
class CLSRGWIssueResyncBucketBILog : public CLSRGWConcurrentIO {
598-
protected:
599-
int issue_op(int shard_id, const std::string& oid);
600-
public:
601-
CLSRGWIssueResyncBucketBILog(librados::IoCtx& io_ctx, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
602-
CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
603-
virtual ~CLSRGWIssueResyncBucketBILog() override {}
604-
};
605-
606-
class CLSRGWIssueBucketBILogStop : public CLSRGWConcurrentIO {
607-
protected:
608-
int issue_op(int shard_id, const std::string& oid);
609-
public:
610-
CLSRGWIssueBucketBILogStop(librados::IoCtx& io_ctx, std::map<int, std::string>& _bucket_objs, uint32_t max_aio) :
611-
CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
612-
virtual ~CLSRGWIssueBucketBILogStop() override {}
613-
};
361+
void cls_rgw_bilog_start(librados::ObjectWriteOperation& op);
362+
void cls_rgw_bilog_stop(librados::ObjectWriteOperation& op);
614363

615364
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, const std::string& oid,
616365
boost::intrusive_ptr<RGWGetDirHeader_CB> cb);
@@ -684,12 +433,15 @@ int cls_rgw_reshard_get(librados::IoCtx& io_ctx, const std::string& oid, cls_rgw
684433
// cls_rgw in the T+4 (X) release.
685434
void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err);
686435

687-
// these overloads which call io_ctx.operate() should not be called in the rgw.
688-
// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()
689-
#ifndef CLS_CLIENT_HIDE_IOCTX
690-
int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid,
691-
const cls_rgw_bucket_instance_entry& entry);
692-
int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid);
693-
int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const std::string& oid,
694-
cls_rgw_bucket_instance_entry *entry);
695-
#endif
436+
void cls_rgw_set_bucket_resharding(librados::ObjectWriteOperation& op,
437+
cls_rgw_reshard_status status);
438+
void cls_rgw_clear_bucket_resharding(librados::ObjectWriteOperation& op);
439+
void cls_rgw_get_bucket_resharding(librados::ObjectReadOperation& op,
440+
bufferlist& out);
441+
// decode the entry; may throw buffer::error
442+
void cls_rgw_get_bucket_resharding_decode(const bufferlist& out,
443+
cls_rgw_bucket_instance_entry& entry);
444+
445+
// Try to remove all reshard log entries from the bucket index. Return success
446+
// if any entries were removed, and -ENODATA once they're all gone.
447+
void cls_rgw_bucket_reshard_log_trim(librados::ObjectWriteOperation& op);

src/cls/rgw/cls_rgw_ops.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77

88
struct rgw_cls_tag_timeout_op
99
{
10-
uint64_t tag_timeout;
11-
12-
rgw_cls_tag_timeout_op() : tag_timeout(0) {}
10+
uint64_t tag_timeout = 0;
1311

1412
void encode(ceph::buffer::list &bl) const {
1513
ENCODE_START(1, 1, bl);

src/rgw/driver/posix/rgw_sal_posix.cc

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2468,7 +2468,7 @@ int POSIXBucket::set_acl(const DoutPrefixProvider* dpp,
24682468
return write_attrs(dpp, y);
24692469
}
24702470

2471-
int POSIXBucket::read_stats(const DoutPrefixProvider *dpp,
2471+
int POSIXBucket::read_stats(const DoutPrefixProvider *dpp, optional_yield y,
24722472
const bucket_index_layout_generation& idx_layout,
24732473
int shard_id, std::string* bucket_ver, std::string* master_ver,
24742474
std::map<RGWObjCategory, RGWStorageStats>& stats,
@@ -2477,14 +2477,14 @@ int POSIXBucket::read_stats(const DoutPrefixProvider *dpp,
24772477
auto& main = stats[RGWObjCategory::Main];
24782478

24792479
// TODO: bucket stats shouldn't have to list all objects
2480-
return dir->for_each(dpp, [this, dpp, &main] (const char* name) {
2480+
return dir->for_each(dpp, [this, dpp, y, &main] (const char* name) {
24812481
if (name[0] == '.') {
24822482
/* Skip dotfiles */
24832483
return 0;
24842484
}
24852485

24862486
std::unique_ptr<FSEnt> dent;
2487-
int ret = dir->get_ent(dpp, null_yield, name, std::string(), dent);
2487+
int ret = dir->get_ent(dpp, y, name, std::string(), dent);
24882488
if (ret < 0) {
24892489
ret = errno;
24902490
ldpp_dout(dpp, 0) << "ERROR: could not get ent for object " << name << ": "
@@ -2617,17 +2617,19 @@ int POSIXBucket::remove_objs_from_index(const DoutPrefixProvider *dpp, std::list
26172617
return 0;
26182618
}
26192619

2620-
int POSIXBucket::check_index(const DoutPrefixProvider *dpp, std::map<RGWObjCategory, RGWStorageStats>& existing_stats, std::map<RGWObjCategory, RGWStorageStats>& calculated_stats)
2620+
int POSIXBucket::check_index(const DoutPrefixProvider *dpp, optional_yield y,
2621+
std::map<RGWObjCategory, RGWStorageStats>& existing_stats,
2622+
std::map<RGWObjCategory, RGWStorageStats>& calculated_stats)
26212623
{
26222624
return 0;
26232625
}
26242626

2625-
int POSIXBucket::rebuild_index(const DoutPrefixProvider *dpp)
2627+
int POSIXBucket::rebuild_index(const DoutPrefixProvider *dpp, optional_yield y)
26262628
{
26272629
return 0;
26282630
}
26292631

2630-
int POSIXBucket::set_tag_timeout(const DoutPrefixProvider *dpp, uint64_t timeout)
2632+
int POSIXBucket::set_tag_timeout(const DoutPrefixProvider *dpp, optional_yield y, uint64_t timeout)
26312633
{
26322634
return 0;
26332635
}
@@ -2907,6 +2909,12 @@ int POSIXObject::list_parts(const DoutPrefixProvider* dpp, CephContext* cct,
29072909
return -EOPNOTSUPP;
29082910
}
29092911

2912+
bool POSIXObject::is_sync_completed(const DoutPrefixProvider* dpp, optional_yield y,
2913+
const ceph::real_time& obj_mtime)
2914+
{
2915+
return false;
2916+
}
2917+
29102918
int POSIXObject::load_obj_state(const DoutPrefixProvider* dpp, optional_yield y, bool follow_olh)
29112919
{
29122920
int ret = stat(dpp);

0 commit comments

Comments
 (0)