@@ -804,121 +804,64 @@ struct complete_op_data {
804804 }
805805};
806806
807- class RGWIndexCompletionThread : public RGWRadosThread , public DoutPrefixProvider {
808- RGWRados *store;
809-
810- uint64_t interval_msec () override {
811- return 0 ;
812- }
813-
814- list<complete_op_data *> completions;
815-
816- ceph::mutex completions_lock =
817- ceph::make_mutex (" RGWIndexCompletionThread::completions_lock" );
818- public:
819- RGWIndexCompletionThread (RGWRados *_store)
820- : RGWRadosThread(_store, " index-complete" ), store(_store) {}
821-
822- int process (const DoutPrefixProvider *dpp) override ;
823-
824- void add_completion (complete_op_data *completion) {
825- {
826- std::lock_guard l{completions_lock};
827- completions.push_back (completion);
828- }
829-
830- signal ();
831- }
832-
833- CephContext *get_cct () const override { return store->ctx (); }
834- unsigned get_subsys () const { return dout_subsys; }
835- std::ostream& gen_prefix (std::ostream& out) const { return out << " rgw index completion thread: " ; }
836- };
837-
838- int RGWIndexCompletionThread::process (const DoutPrefixProvider *dpp)
839- {
840- list<complete_op_data *> comps;
841-
842- {
843- std::lock_guard l{completions_lock};
844- completions.swap (comps);
845- }
846-
847- for (auto c : comps) {
848- std::unique_ptr<complete_op_data> up{c};
807+ class RGWIndexCompletionManager {
808+ RGWRados* const store;
809+ const int num_shards;
810+ ceph::containers::tiny_vector<ceph::mutex> locks;
811+ std::vector<set<complete_op_data*>> completions;
812+ std::vector<complete_op_data*> retry_completions;
849813
850- if ( going_down ()) {
851- continue ;
852- }
853- ldpp_dout ( this , 20 ) << __func__ << " (): handling completion for key= " << c-> key << dendl ;
814+ std::condition_variable cond;
815+ std::mutex retry_completions_lock ;
816+ bool _stop{ false };
817+ std::thread retry_thread ;
854818
855- RGWRados::BucketShard bs (store);
856- RGWBucketInfo bucket_info;
819+ std::atomic<int > cur_shard {0 };
857820
858- int r = bs.init (c->obj .bucket , c->obj , &bucket_info, this );
859- if (r < 0 ) {
860- ldpp_dout (this , 0 ) << " ERROR: " << __func__ << " (): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
861- /* not much to do */
862- continue ;
821+ void process ();
822+
823+ void add_completion (complete_op_data *completion);
824+
825+ void stop () {
826+ if (retry_thread.joinable ()) {
827+ _stop = true ;
828+ cond.notify_all ();
829+ retry_thread.join ();
863830 }
864831
865- r = store->guard_reshard (this , &bs, c->obj , bucket_info,
866- [&](RGWRados::BucketShard *bs) -> int {
867- librados::ObjectWriteOperation o;
868- cls_rgw_guard_bucket_resharding (o, -ERR_BUSY_RESHARDING);
869- cls_rgw_bucket_complete_op (o, c->op , c->tag , c->ver , c->key , c->dir_meta , &c->remove_objs ,
870- c->log_op , c->bilog_op , &c->zones_trace );
871- return bs->bucket_obj .operate (this , &o, null_yield);
872- });
873- if (r < 0 ) {
874- ldpp_dout (this , 0 ) << " ERROR: " << __func__ << " (): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
875- /* ignoring error, can't do anything about it */
876- continue ;
877- }
878- r = store->svc .datalog_rados ->add_entry (this , bucket_info, bs.shard_id );
879- if (r < 0 ) {
880- ldpp_dout (this , -1 ) << " ERROR: failed writing data log" << dendl;
832+ for (int i = 0 ; i < num_shards; ++i) {
833+ std::lock_guard l{locks[i]};
834+ for (auto c : completions[i]) {
835+ c->stop ();
836+ }
881837 }
838+ completions.clear ();
839+ }
840+
841+ int next_shard () {
842+ int result = cur_shard % num_shards;
843+ cur_shard++;
844+ return result;
882845 }
883-
884- return 0 ;
885- }
886-
887- class RGWIndexCompletionManager {
888- RGWRados *store{nullptr };
889- ceph::containers::tiny_vector<ceph::mutex> locks;
890- vector<set<complete_op_data *> > completions;
891-
892- RGWIndexCompletionThread *completion_thread{nullptr };
893-
894- int num_shards;
895-
896- std::atomic<int > cur_shard {0 };
897-
898846
899847public:
900848 RGWIndexCompletionManager (RGWRados *_store) :
901849 store (_store),
850+ num_shards (store->ctx ()->_conf->rgw_thread_pool_size),
902851 locks{ceph::make_lock_container<ceph::mutex>(
903- store-> ctx ()-> _conf -> rgw_thread_pool_size ,
852+ num_shards ,
904853 [](const size_t i) {
905854 return ceph::make_mutex (" RGWIndexCompletionManager::lock::" +
906855 std::to_string (i));
907- })}
908- {
909- num_shards = store-> ctx ()-> _conf -> rgw_thread_pool_size ;
910- completions. resize (num_shards);
911- }
856+ })},
857+ completions (num_shards),
858+ retry_thread(&RGWIndexCompletionManager::process, this )
859+ {}
860+
912861 ~RGWIndexCompletionManager () {
913862 stop ();
914863 }
915864
916- int next_shard () {
917- int result = cur_shard % num_shards;
918- cur_shard++;
919- return result;
920- }
921-
922865 void create_completion (const rgw_obj& obj,
923866 RGWModifyOp op, string& tag,
924867 rgw_bucket_entry_ver& ver,
@@ -928,36 +871,17 @@ class RGWIndexCompletionManager {
928871 uint16_t bilog_op,
929872 rgw_zone_set *zones_trace,
930873 complete_op_data **result);
931- bool handle_completion (completion_t cb, complete_op_data *arg);
932874
933- int start (const DoutPrefixProvider *dpp) {
934- completion_thread = new RGWIndexCompletionThread (store);
935- int ret = completion_thread->init (dpp);
936- if (ret < 0 ) {
937- return ret;
938- }
939- completion_thread->start ();
940- return 0 ;
941- }
942- void stop () {
943- if (completion_thread) {
944- completion_thread->stop ();
945- delete completion_thread;
946- }
875+ bool handle_completion (completion_t cb, complete_op_data *arg);
947876
948- for (int i = 0 ; i < num_shards; ++i) {
949- std::lock_guard l{locks[i]};
950- for (auto c : completions[i]) {
951- c->stop ();
952- }
953- }
954- completions.clear ();
877+ CephContext* ctx () {
878+ return store->ctx ();
955879 }
956880};
957881
958882static void obj_complete_cb (completion_t cb, void *arg)
959883{
960- complete_op_data *completion = ( complete_op_data *) arg;
884+ complete_op_data *completion = reinterpret_cast < complete_op_data*>( arg) ;
961885 completion->lock .lock ();
962886 if (completion->stopped ) {
963887 completion->lock .unlock (); /* can drop lock, no one else is referencing us */
@@ -971,6 +895,57 @@ static void obj_complete_cb(completion_t cb, void *arg)
971895 }
972896}
973897
898+ void RGWIndexCompletionManager::process ()
899+ {
900+ DoutPrefix dpp (store->ctx (), dout_subsys, " rgw index completion thread: " );
901+ while (!_stop) {
902+ std::vector<complete_op_data*> comps;
903+
904+ {
905+ std::unique_lock l{retry_completions_lock};
906+ cond.wait (l, [this ](){return _stop || !retry_completions.empty ();});
907+ if (_stop) {
908+ return ;
909+ }
910+ retry_completions.swap (comps);
911+ }
912+
913+ for (auto c : comps) {
914+ std::unique_ptr<complete_op_data> up{c};
915+
916+ ldpp_dout (&dpp, 20 ) << __func__ << " (): handling completion for key=" << c->key << dendl;
917+
918+ RGWRados::BucketShard bs (store);
919+ RGWBucketInfo bucket_info;
920+
921+ int r = bs.init (c->obj .bucket , c->obj , &bucket_info, &dpp);
922+ if (r < 0 ) {
923+ ldpp_dout (&dpp, 0 ) << " ERROR: " << __func__ << " (): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
924+ /* not much to do */
925+ continue ;
926+ }
927+
928+ r = store->guard_reshard (&dpp, &bs, c->obj , bucket_info,
929+ [&](RGWRados::BucketShard *bs) -> int {
930+ librados::ObjectWriteOperation o;
931+ cls_rgw_guard_bucket_resharding (o, -ERR_BUSY_RESHARDING);
932+ cls_rgw_bucket_complete_op (o, c->op , c->tag , c->ver , c->key , c->dir_meta , &c->remove_objs ,
933+ c->log_op , c->bilog_op , &c->zones_trace );
934+ return bs->bucket_obj .operate (&dpp, &o, null_yield);
935+ });
936+ if (r < 0 ) {
937+ ldpp_dout (&dpp, 0 ) << " ERROR: " << __func__ << " (): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
938+ /* ignoring error, can't do anything about it */
939+ continue ;
940+ }
941+
942+ r = store->svc .datalog_rados ->add_entry (&dpp, bucket_info, bs.shard_id );
943+ if (r < 0 ) {
944+ ldpp_dout (&dpp, -1 ) << " ERROR: failed writing data log" << dendl;
945+ }
946+ }
947+ }
948+ }
974949
975950void RGWIndexCompletionManager::create_completion (const rgw_obj& obj,
976951 RGWModifyOp op, string& tag,
@@ -1014,7 +989,16 @@ void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
1014989 entry->rados_completion = librados::Rados::aio_create_completion (entry, obj_complete_cb);
1015990
1016991 std::lock_guard l{locks[shard_id]};
1017- completions[shard_id].insert (entry);
992+ const auto ok = completions[shard_id].insert (entry).second ;
993+ ceph_assert (ok);
994+ }
995+
996+ void RGWIndexCompletionManager::add_completion (complete_op_data *completion) {
997+ {
998+ std::lock_guard l{retry_completions_lock};
999+ retry_completions.push_back (completion);
1000+ }
1001+ cond.notify_all ();
10181002}
10191003
10201004bool RGWIndexCompletionManager::handle_completion (completion_t cb, complete_op_data *arg)
@@ -1027,6 +1011,7 @@ bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_d
10271011
10281012 auto iter = comps.find (arg);
10291013 if (iter == comps.end ()) {
1014+ ldout (arg->manager ->ctx (), 0 ) << __func__ << " (): cannot find completion for obj=" << arg->key << dendl;
10301015 return true ;
10311016 }
10321017
@@ -1035,9 +1020,13 @@ bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_d
10351020
10361021 int r = rados_aio_get_return_value (cb);
10371022 if (r != -ERR_BUSY_RESHARDING) {
1023+ ldout (arg->manager ->ctx (), 20 ) << __func__ << " (): completion " <<
1024+ (r == 0 ? " ok" : " failed with " + to_string (r)) <<
1025+ " for obj=" << arg->key << dendl;
10381026 return true ;
10391027 }
1040- completion_thread->add_completion (arg);
1028+ add_completion (arg);
1029+ ldout (arg->manager ->ctx (), 20 ) << __func__ << " (): async completion added for obj=" << arg->key << dendl;
10411030 return false ;
10421031}
10431032
@@ -1359,10 +1348,6 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp)
13591348 }
13601349
13611350 index_completion_manager = new RGWIndexCompletionManager (this );
1362- ret = index_completion_manager->start (dpp);
1363- if (ret < 0 ) {
1364- return ret;
1365- }
13661351 ret = rgw::notify::init (cct, store, dpp);
13671352 if (ret < 0 ) {
13681353 ldpp_dout (dpp, 1 ) << " ERROR: failed to initialize notification manager" << dendl;
0 commit comments