1212#include < boost/algorithm/string/split.hpp>
1313#include < boost/algorithm/string.hpp>
1414#include < boost/algorithm/string/predicate.hpp>
15+ #include < boost/asio/spawn.hpp>
1516#include < boost/variant.hpp>
1617
1718#include " include/scope_guard.h"
@@ -303,13 +304,13 @@ static bool obj_has_expired(
303304 return (timediff >= cmp);
304305}
305306
306- static bool pass_object_lock_check (rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp)
307+ static bool pass_object_lock_check (rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y )
307308{
308309 if (!obj->get_bucket ()->get_info ().obj_lock_enabled ()) {
309310 return true ;
310311 }
311312 std::unique_ptr<rgw::sal::Object::ReadOp> read_op = obj->get_read_op ();
312- int ret = read_op->prepare (null_yield , dpp);
313+ int ret = read_op->prepare (y , dpp);
313314 if (ret < 0 ) {
314315 if (ret == -ENOENT) {
315316 return true ;
@@ -697,7 +698,7 @@ class LCOpAction {
697698public:
698699 virtual ~LCOpAction () {}
699700
700- virtual bool check (lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) {
701+ virtual bool check (lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y ) {
701702 return false ;
702703 }
703704
@@ -716,7 +717,7 @@ class LCOpAction {
716717 return true ;
717718 }
718719
719- virtual int process (lc_op_ctx& oc) {
720+ virtual int process (lc_op_ctx& oc, optional_yield y ) {
720721 return 0 ;
721722 }
722723
@@ -726,7 +727,7 @@ class LCOpAction {
726727class LCOpFilter {
727728public:
728729virtual ~LCOpFilter () {}
729- virtual bool check (const DoutPrefixProvider *dpp, lc_op_ctx& oc) {
730+ virtual bool check (const DoutPrefixProvider *dpp, lc_op_ctx& oc, optional_yield y ) {
730731 return false ;
731732 }
732733}; /* LCOpFilter */
@@ -756,7 +757,7 @@ class LCOpRule {
756757 void build ();
757758 void update ();
758759 int process (rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
759- WorkQ* wq);
760+ WorkQ* wq, optional_yield y );
760761}; /* LCOpRule */
761762
762763using WorkItem =
@@ -771,23 +772,23 @@ class WorkQ : public Thread
771772{
772773public:
773774 using unique_lock = std::unique_lock<std::mutex>;
774- using work_f = std::function<void (RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
775- using dequeue_result = std::variant< void *, WorkItem>;
775+ using work_f = std::function<void (RGWLC::LCWorker*, WorkQ*, WorkItem&, optional_yield )>;
776+ using dequeue_result = std::list< WorkItem>;
776777
777778 static constexpr uint32_t FLAG_NONE = 0x0000 ;
778779 static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001 ;
779780 static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002 ;
780781 static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004 ;
781782
782783private:
783- const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
784+ const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi, optional_yield ) {};
784785 RGWLC::LCWorker* wk;
785786 uint32_t qmax;
786787 int ix;
787788 std::mutex mtx;
788789 std::condition_variable cv;
789790 uint32_t flags;
790- vector <WorkItem> items;
791+ std::list <WorkItem> items;
791792 work_f f;
792793
793794public:
@@ -829,7 +830,7 @@ class WorkQ : public Thread
829830 }
830831
831832private:
832- dequeue_result dequeue () {
833+ dequeue_result dequeue (size_t max_items= 1 ) {
833834 unique_lock uniq (mtx);
834835 while ((!wk->get_lc ()->going_down ()) &&
835836 (items.size () == 0 )) {
@@ -841,25 +842,39 @@ class WorkQ : public Thread
841842 cv.wait_for (uniq, 200ms);
842843 }
843844 if (items.size () > 0 ) {
844- auto item = items.back ();
845- items.pop_back ();
845+ size_t split_size = std::min (max_items, items.size ());
846+ dequeue_result result;
847+ result.splice (result.begin (), items, items.begin (), std::next (items.begin (), split_size));
846848 if (flags & FLAG_EWAIT_SYNC) {
847849 flags &= ~FLAG_EWAIT_SYNC;
848850 cv.notify_one ();
849851 }
850- return {item} ;
852+ return result ;
851853 }
852- return nullptr ;
854+ return dequeue_result{} ;
853855 }
854856
855857 void * entry () override {
856858 while (!wk->get_lc ()->going_down ()) {
857- auto item = dequeue ();
858- if (item.index () == 0 ) {
859- /* going down */
860- break ;
859+ boost::asio::io_context context;
860+ for (auto & item : items) {
861+ if (item.index () != 0 ) {
862+ boost::asio::spawn (context, [&](boost::asio::yield_context yield) {
863+ try {
864+ optional_yield y (yield);
865+ f (wk, this , item, y);
866+ } catch (const std::exception& e) {
867+ ldpp_dout (wk->dpp , 0 ) << " Coroutine error: " << e.what () << dendl;
868+ }
869+ });
870+ }
871+ }
872+ try {
873+ context.run ();
874+ } catch (const std::system_error& e) {
875+ ldpp_dout (wk->dpp , 0 ) << " ERROR: WorkQ context run returned error r="
876+ << -e.code ().value () << dendl;
861877 }
862- f (wk, this , std::get<WorkItem>(item));
863878 }
864879 return nullptr ;
865880 }
@@ -1039,11 +1054,11 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
10391054 return 0 ;
10401055} /* RGWLC::handle_multipart_expiration */
10411056
1042- static int read_obj_tags (const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl)
1057+ static int read_obj_tags (const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl, optional_yield y )
10431058{
10441059 std::unique_ptr<rgw::sal::Object::ReadOp> rop = obj->get_read_op ();
10451060
1046- return rop->get_attr (dpp, RGW_ATTR_TAGS, tags_bl, null_yield );
1061+ return rop->get_attr (dpp, RGW_ATTR_TAGS, tags_bl, y );
10471062}
10481063
10491064static bool is_valid_op (const lc_op& op)
@@ -1089,15 +1104,15 @@ static inline bool has_all_tags(const lc_op& rule_action,
10891104 return tag_count == rule_action.obj_tags ->count ();
10901105}
10911106
1092- static int check_tags (const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip)
1107+ static int check_tags (const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip, optional_yield y )
10931108{
10941109 auto & op = oc.op ;
10951110
10961111 if (op.obj_tags != boost::none) {
10971112 *skip = true ;
10981113
10991114 bufferlist tags_bl;
1100- int ret = read_obj_tags (dpp, oc.obj .get (), tags_bl);
1115+ int ret = read_obj_tags (dpp, oc.obj .get (), tags_bl, y );
11011116 if (ret < 0 ) {
11021117 if (ret != -ENODATA) {
11031118 ldpp_dout (oc.dpp , 5 ) << " ERROR: read_obj_tags returned r="
@@ -1129,7 +1144,7 @@ static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip)
11291144
11301145class LCOpFilter_Tags : public LCOpFilter {
11311146public:
1132- bool check (const DoutPrefixProvider *dpp, lc_op_ctx& oc) override {
1147+ bool check (const DoutPrefixProvider *dpp, lc_op_ctx& oc, optional_yield y ) override {
11331148 auto & o = oc.o ;
11341149
11351150 if (o.is_delete_marker ()) {
@@ -1138,7 +1153,7 @@ class LCOpFilter_Tags : public LCOpFilter {
11381153
11391154 bool skip;
11401155
1141- int ret = check_tags (dpp, oc, &skip);
1156+ int ret = check_tags (dpp, oc, &skip, y );
11421157 if (ret < 0 ) {
11431158 if (ret == -ENOENT) {
11441159 return false ;
@@ -1157,7 +1172,7 @@ class LCOpAction_CurrentExpiration : public LCOpAction {
11571172public:
11581173 LCOpAction_CurrentExpiration (op_env& env) {}
11591174
1160- bool check (lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
1175+ bool check (lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y ) override {
11611176 auto & o = oc.o ;
11621177 if (!o.is_current ()) {
11631178 ldpp_dout (dpp, 20 ) << __func__ << " (): key=" << o.key
@@ -1252,7 +1267,7 @@ class LCOpAction_NonCurrentExpiration : public LCOpAction {
12521267 LCOpAction_NonCurrentExpiration (op_env& env)
12531268 {}
12541269
1255- bool check (lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
1270+ bool check (lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y ) override {
12561271 auto & o = oc.o ;
12571272 if (o.is_current ()) {
12581273 ldpp_dout (dpp, 20 ) << __func__ << " (): key=" << o.key
@@ -1305,7 +1320,7 @@ class LCOpAction_DMExpiration : public LCOpAction {
13051320public:
13061321 LCOpAction_DMExpiration (op_env& env) {}
13071322
1308- bool check (lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
1323+ bool check (lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y ) override {
13091324 auto & o = oc.o ;
13101325 if (!o.is_delete_marker ()) {
13111326 ldpp_dout (dpp, 20 ) << __func__ << " (): key=" << o.key
@@ -1360,7 +1375,7 @@ class LCOpAction_Transition : public LCOpAction {
13601375 LCOpAction_Transition (const transition_action& _transition)
13611376 : transition(_transition) {}
13621377
1363- bool check (lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
1378+ bool check (lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp, optional_yield y ) override {
13641379 auto & o = oc.o ;
13651380
13661381 if (o.is_delete_marker ()) {
@@ -1405,7 +1420,7 @@ class LCOpAction_Transition : public LCOpAction {
14051420 return need_to_process;
14061421 }
14071422
1408- int delete_tier_obj (lc_op_ctx& oc) {
1423+ int delete_tier_obj (lc_op_ctx& oc, optional_yield y ) {
14091424 int ret = 0 ;
14101425
14111426 /* If bucket has versioning enabled, create delete_marker for current version
@@ -1479,7 +1494,7 @@ class LCOpAction_Transition : public LCOpAction {
14791494 }
14801495
14811496 if (delete_object) {
1482- ret = delete_tier_obj (oc);
1497+ ret = delete_tier_obj (oc, y );
14831498 if (ret < 0 ) {
14841499 ldpp_dout (oc.dpp , 0 ) << " ERROR: Deleting tier object(" << oc.o .key << " ) failed ret=" << ret << dendl;
14851500 return ret;
@@ -1554,13 +1569,13 @@ class LCOpAction_Transition : public LCOpAction {
15541569 if (!r && oc.tier ->is_tier_type_s3 ()) {
15551570 ldpp_dout (oc.dpp , 30 ) << " Found cloud s3 tier: " << target_placement.storage_class << dendl;
15561571 if (!oc.o .is_current () &&
1557- !pass_object_lock_check (oc.driver , oc.obj .get (), oc.dpp )) {
1572+ !pass_object_lock_check (oc.driver , oc.obj .get (), oc.dpp , y )) {
15581573 /* Skip objects which has object lock enabled. */
15591574 ldpp_dout (oc.dpp , 10 ) << " Object(key:" << oc.o .key << " ) is locked. Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl;
15601575 return 0 ;
15611576 }
15621577
1563- r = transition_obj_to_cloud (oc);
1578+ r = transition_obj_to_cloud (oc, y );
15641579 if (r < 0 ) {
15651580 ldpp_dout (oc.dpp , 0 ) << " ERROR: failed to transition obj(key:" << oc.o .key << " ) to cloud (r=" << r << " )"
15661581 << dendl;
@@ -1692,7 +1707,7 @@ void LCOpRule::update()
16921707
16931708int LCOpRule::process (rgw_bucket_dir_entry& o,
16941709 const DoutPrefixProvider *dpp,
1695- WorkQ* wq)
1710+ WorkQ* wq, optional_yield y )
16961711{
16971712 lc_op_ctx ctx (env, o, next_key_name, num_noncurrent, effective_mtime, dpp, wq);
16981713 shared_ptr<LCOpAction> *selected = nullptr ; // n.b., req'd by sharing
@@ -1701,7 +1716,7 @@ int LCOpRule::process(rgw_bucket_dir_entry& o,
17011716 for (auto & a : actions) {
17021717 real_time action_exp;
17031718
1704- if (a->check (ctx, &action_exp, dpp)) {
1719+ if (a->check (ctx, &action_exp, dpp, y )) {
17051720 if (action_exp > exp) {
17061721 exp = action_exp;
17071722 selected = &a;
@@ -1723,7 +1738,7 @@ int LCOpRule::process(rgw_bucket_dir_entry& o,
17231738
17241739 bool cont = false ;
17251740 for (auto & f : filters) {
1726- if (f->check (dpp, ctx)) {
1741+ if (f->check (dpp, ctx, y )) {
17271742 cont = true ;
17281743 break ;
17291744 }
@@ -1736,7 +1751,7 @@ int LCOpRule::process(rgw_bucket_dir_entry& o,
17361751 return 0 ;
17371752 }
17381753
1739- int r = (*selected)->process (ctx);
1754+ int r = (*selected)->process (ctx, y );
17401755 if (r < 0 ) {
17411756 ldpp_dout (dpp, 0 ) << " ERROR: remove_expired_obj "
17421757 << env.bucket << " :" << o.key
@@ -1822,7 +1837,7 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
18221837 ldpp_dout (wk->get_lc (), 20 )
18231838 << __func__ << " (): key=" << o.key << wq->thr_name ()
18241839 << dendl;
1825- int ret = op_rule.process (o, wk->dpp , wq);
1840+ int ret = op_rule.process (o, wk->dpp , wq, y );
18261841 if (ret < 0 ) {
18271842 ldpp_dout (wk->get_lc (), 20 )
18281843 << " ERROR: orule.process() returned ret=" << ret
0 commit comments