@@ -129,6 +129,8 @@ namespace rgw::dedup {
129129 this ->remote_pause_req = false ;
130130 this ->remote_paused = false ;
131131 this ->remote_restart_req = false ;
132+ this ->bucket_index_throttle .disable ();
133+ this ->metadata_access_throttle .disable ();
132134 }
133135
134136 // ---------------------------------------------------------------------------
@@ -147,6 +149,8 @@ namespace rgw::dedup {
147149 encode (ctl.remote_pause_req , bl);
148150 encode (ctl.remote_paused , bl);
149151 encode (ctl.remote_restart_req , bl);
152+ encode (ctl.bucket_index_throttle , bl);
153+ encode (ctl.metadata_access_throttle , bl);
150154 ENCODE_FINISH (bl);
151155 }
152156
@@ -168,6 +172,8 @@ namespace rgw::dedup {
168172 decode (ctl.remote_pause_req , bl);
169173 decode (ctl.remote_paused , bl);
170174 decode (ctl.remote_restart_req , bl);
175+ decode (ctl.bucket_index_throttle , bl);
176+ decode (ctl.metadata_access_throttle , bl);
171177 DECODE_FINISH (bl);
172178 }
173179
@@ -209,6 +215,13 @@ namespace rgw::dedup {
209215 out << " ::remote_restart_req" ;
210216 }
211217
218+ if (!ctl.bucket_index_throttle .is_disabled ()) {
219+ out << " ::bucket_index_throttle=" << ctl.bucket_index_throttle .get_max_calls_per_second ();
220+ }
221+ if (!ctl.metadata_access_throttle .is_disabled ()) {
222+ out << " ::metadata_throttle=" << ctl.metadata_access_throttle .get_max_calls_per_second ();
223+ }
224+
212225 return out;
213226 }
214227
@@ -534,6 +547,7 @@ namespace rgw::dedup {
534547 librados::IoCtx ioctx = obj.ioctx ;
535548 ldpp_dout (dpp, 20 ) << __func__ << " ::removing tail object: " << raw_obj.oid
536549 << dendl;
550+ d_ctl.metadata_access_throttle .acquire ();
537551 ret = ioctx.remove (raw_obj.oid );
538552 }
539553
@@ -567,6 +581,8 @@ namespace rgw::dedup {
567581 }
568582
569583 ObjectWriteOperation op;
584+ d_ctl.metadata_access_throttle .acquire ();
585+ ldpp_dout (dpp, 20 ) << __func__ << " ::dec ref-count on tail object: " << raw_obj.oid << dendl;
570586 cls_refcount_put (op, ref_tag, true );
571587 rgw::AioResultList completed = aio->get (obj.obj ,
572588 rgw::Aio::librados_op (obj.ioctx , std::move (op), null_yield),
@@ -602,6 +618,7 @@ namespace rgw::dedup {
602618
603619 ObjectWriteOperation op;
604620 cls_refcount_get (op, ref_tag, true );
621+ d_ctl.metadata_access_throttle .acquire ();
605622 ldpp_dout (dpp, 20 ) << __func__ << " ::inc ref-count on tail object: " << raw_obj.oid << dendl;
606623 rgw::AioResultList completed = aio->get (obj.obj ,
607624 rgw::Aio::librados_op (obj.ioctx , std::move (op), null_yield),
@@ -782,6 +799,7 @@ namespace rgw::dedup {
782799 ldpp_dout (dpp, 20 ) << __func__ << " ::ref_tag=" << ref_tag << dendl;
783800 int ret = inc_ref_count_by_manifest (ref_tag, src_oid, src_manifest);
784801 if (ret == 0 ) {
802+ d_ctl.metadata_access_throttle .acquire ();
785803 ldpp_dout (dpp, 20 ) << __func__ << " ::send TGT CLS (Shared_Manifest)" << dendl;
786804 ret = tgt_ioctx.operate (tgt_oid, &tgt_op);
787805 if (unlikely (ret != 0 )) {
@@ -809,6 +827,7 @@ namespace rgw::dedup {
809827 p_stats->set_hash_attrs ++;
810828 }
811829
830+ d_ctl.metadata_access_throttle .acquire ();
812831 ldpp_dout (dpp, 20 ) << __func__ <<" ::send SRC CLS (Shared_Manifest)" << dendl;
813832 ret = src_ioctx.operate (src_oid, &src_op);
814833 if (unlikely (ret != 0 )) {
@@ -1075,6 +1094,7 @@ namespace rgw::dedup {
10751094 return 0 ;
10761095 }
10771096
1097+ d_ctl.metadata_access_throttle .acquire ();
10781098 ret = p_obj->get_obj_attrs (null_yield, dpp);
10791099 if (unlikely (ret < 0 )) {
10801100 p_stats->ingress_failed_get_obj_attrs ++;
@@ -1393,6 +1413,7 @@ namespace rgw::dedup {
13931413 }
13941414 }
13951415
1416+ p_stats->ingress_slabs ++;
13961417 (*p_slab_count)++;
13971418 failure_count = 0 ;
13981419 unsigned slab_rec_count = 0 ;
@@ -1647,6 +1668,7 @@ namespace rgw::dedup {
16471668 const string& oid = oids[current_shard];
16481669 rgw_cls_list_ret result;
16491670 librados::ObjectReadOperation op;
1671+ d_ctl.bucket_index_throttle .acquire ();
16501672 // get bucket-indices of @current_shard
16511673 cls_rgw_bucket_list_op (op, marker, null_prefix, null_delimiter, max_entries,
16521674 list_versions, &result);
@@ -1781,7 +1803,7 @@ namespace rgw::dedup {
17811803 display_table_stat_counters (dpp, p_stats);
17821804
17831805 ldpp_dout (dpp, 10 ) << __func__ << " ::MD5 Loop::" << d_ctl.dedup_type << dendl;
1784- if (d_ctl.dedup_type != dedup_req_type_t ::DEDUP_TYPE_FULL ) {
1806+ if (d_ctl.dedup_type != dedup_req_type_t ::DEDUP_TYPE_EXEC ) {
17851807 for (work_shard_t worker_id = 0 ; worker_id < num_work_shards; worker_id++) {
17861808 remove_slabs (worker_id, md5_shard, slab_count_arr[worker_id]);
17871809 }
@@ -2035,13 +2057,16 @@ namespace rgw::dedup {
20352057 &worker_stats,raw_mem, raw_mem_size);
20362058 if (ret == 0 ) {
20372059 worker_stats.duration = ceph_clock_now () - start_time;
2060+ worker_stats.bidx_throttle_sleep_events = d_ctl.bucket_index_throttle .get_sleep_events ();
2061+ worker_stats.bidx_throttle_sleep_time_usec = d_ctl.bucket_index_throttle .get_sleep_time_usec ();
20382062 d_cluster.mark_work_shard_token_completed (store, worker_id, &worker_stats);
20392063 ldpp_dout (dpp, 10 ) << " stat counters [worker]:\n " << worker_stats << dendl;
20402064 ldpp_dout (dpp, 10 ) << " Shard Process Duration = "
20412065 << worker_stats.duration << dendl;
20422066 }
20432067 // ldpp_dout(dpp, 0) << __func__ << "::sleep for 2 seconds\n" << dendl;
20442068 // std::this_thread::sleep_for(std::chrono::seconds(2));
2069+ // std::this_thread::sleep_forstd::chrono::microseconds(usec_timeout);
20452070 return ret;
20462071 }
20472072
@@ -2059,6 +2084,9 @@ namespace rgw::dedup {
20592084 int ret = objects_dedup_single_md5_shard (&table, md5_shard, &md5_stats, num_work_shards);
20602085 if (ret == 0 ) {
20612086 md5_stats.duration = ceph_clock_now () - start_time;
2087+ md5_stats.md_throttle_sleep_events = d_ctl.metadata_access_throttle .get_sleep_events ();
2088+ md5_stats.md_throttle_sleep_time_usec = d_ctl.metadata_access_throttle .get_sleep_time_usec ();
2089+
20622090 d_cluster.mark_md5_shard_token_completed (store, md5_shard, &md5_stats);
20632091 ldpp_dout (dpp, 10 ) << " stat counters [md5]:\n " << md5_stats << dendl;
20642092 ldpp_dout (dpp, 10 ) << " Shard Process Duration = "
@@ -2248,7 +2276,7 @@ namespace rgw::dedup {
22482276 ldpp_dout (dpp, 10 ) <<__func__ << " ::" << *p_epoch << dendl;
22492277 d_ctl.dedup_type = p_epoch->dedup_type ;
22502278#ifdef FULL_DEDUP_SUPPORT
2251- ceph_assert (d_ctl.dedup_type == dedup_req_type_t ::DEDUP_TYPE_FULL ||
2279+ ceph_assert (d_ctl.dedup_type == dedup_req_type_t ::DEDUP_TYPE_EXEC ||
22522280 d_ctl.dedup_type == dedup_req_type_t ::DEDUP_TYPE_ESTIMATE);
22532281#else
22542282 ceph_assert (d_ctl.dedup_type == dedup_req_type_t ::DEDUP_TYPE_ESTIMATE);
@@ -2291,14 +2319,27 @@ namespace rgw::dedup {
22912319 {
22922320 int ret = 0 ;
22932321 int32_t urgent_msg = URGENT_MSG_NONE;
2322+ auto bl_iter = bl.cbegin ();
22942323 try {
2295- auto bl_iter = bl.cbegin ();
22962324 ceph::decode (urgent_msg, bl_iter);
22972325 } catch (buffer::error& err) {
22982326 ldpp_dout (dpp, 1 ) << __func__ << " ::ERROR: bad urgent_msg" << dendl;
2299- ret = -EINVAL;
2327+ cluster::ack_notify (store, dpp, &d_ctl, notify_id, cookie, -EINVAL);
2328+ return ;
2329+ }
2330+ ldpp_dout (dpp, 5 ) << __func__ << " ::" << get_urgent_msg_names (urgent_msg) << dendl;
2331+
2332+ throttle_msg_t throttle_msg;
2333+ if (urgent_msg == URGENT_MSG_THROTTLE) {
2334+ try {
2335+ decode (throttle_msg, bl_iter);
2336+ ldpp_dout (dpp, 5 ) << __func__ << " ::" << throttle_msg << dendl;
2337+ } catch (buffer::error& err) {
2338+ ldpp_dout (dpp, 1 ) << __func__ << " ::ERROR: bad throttle_msg" << dendl;
2339+ cluster::ack_notify (store, dpp, &d_ctl, notify_id, cookie, -EINVAL);
2340+ return ;
2341+ }
23002342 }
2301- ldpp_dout (dpp, 5 ) << __func__ << " ::-->" << get_urgent_msg_names (urgent_msg) << dendl;
23022343
23032344 // use lock to prevent concurrent pause/resume requests
23042345 std::unique_lock cond_lock (d_cond_mutex); // [------>open lock block
@@ -2359,6 +2400,24 @@ namespace rgw::dedup {
23592400 ldpp_dout (dpp, 5 ) << __func__ << " ::dedup is not paused->nothing to do" << dendl;
23602401 }
23612402 break ;
2403+ case URGENT_MSG_THROTTLE:
2404+ for (auto action : throttle_msg.vec ) {
2405+ if (action.op_type == BUCKET_INDEX_OP) {
2406+ d_ctl.bucket_index_throttle .set_max_calls_per_sec (action.limit );
2407+ }
2408+ else if (action.op_type == METADATA_ACCESS_OP) {
2409+ d_ctl.metadata_access_throttle .set_max_calls_per_sec (action.limit );
2410+ }
2411+ else if (action.op_type == STAT) {
2412+ ldpp_dout (dpp, 10 ) << __func__ << " ::Throttle STAT" << dendl;
2413+ }
2414+ else {
2415+ ldpp_dout (dpp, 1 ) << __func__ << " ::unexpected throttle_msg "
2416+ << action.op_type << dendl;
2417+ ret = -EINVAL;
2418+ }
2419+ }
2420+ break ;
23622421 default :
23632422 ldpp_dout (dpp, 1 ) << __func__ << " ::unexpected urgent_msg: "
23642423 << get_urgent_msg_names (urgent_msg) << dendl;
0 commit comments