@@ -1084,6 +1084,19 @@ static inline bool notification_match(reservation_t& res,
10841084 return true ;
10851085}
10861086
1087+
1088+ static inline uint64_t get_target_shard (const DoutPrefixProvider* dpp, const std::string& bucket_name, const std::string& object_key, const uint64_t num_shards) {
1089+ std::hash<std::string> hash_fn;
1090+ std::string hash_key = fmt::format (" {}:{}" , bucket_name, object_key);
1091+ size_t hash = hash_fn (hash_key);
1092+ ldpp_dout (dpp, 20 ) << " INFO: Hash Value (hash) is: " << hash << " . Hash Key: " << bucket_name << " :" << object_key << dendl;
1093+ return hash % num_shards;
1094+ }
1095+
1096+ static inline std::string get_shard_name (const std::string& topic_name, const uint64_t & shard_id) {
1097+ return (shard_id == 0 ) ? topic_name : fmt::format (" {}.{}" , topic_name, shard_id);
1098+ }
1099+
10871100int publish_reserve (const DoutPrefixProvider* dpp,
10881101 const SiteConfig& site,
10891102 const EventTypeList& event_types,
@@ -1145,22 +1158,29 @@ int publish_reserve(const DoutPrefixProvider* dpp,
11451158 }
11461159
11471160 cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
1161+ uint64_t target_shard = 0 ;
11481162 if (topic_cfg.dest .persistent ) {
11491163 // TODO: take default reservation size from conf
11501164 constexpr auto DEFAULT_RESERVATION = 4 * 1024U ; // 4K
11511165 res.size = DEFAULT_RESERVATION;
11521166 librados::ObjectWriteOperation op;
11531167 bufferlist obl;
11541168 int rval;
1155- const auto & queue_name = topic_cfg.dest .persistent_queue ;
1169+ const std::string bucket_name = res.bucket ->get_name ();
1170+ const std::string object_key = res.object_name ? *res.object_name : res.object ->get_name ();
1171+ const uint64_t num_shards = topic_cfg.dest .num_shards ;
1172+ target_shard = get_target_shard (
1173+ dpp, bucket_name, object_key, num_shards);
1174+ const auto shard_name = get_shard_name (topic_cfg.dest .persistent_queue , target_shard);
1175+ ldpp_dout (res.dpp , 1 ) << " INFO: target_shard: " << shard_name << dendl;
11561176 cls_2pc_queue_reserve (op, res.size , 1 , &obl, &rval);
11571177 auto ret = rgw_rados_operate (
1158- res.dpp , res.store ->getRados ()->get_notif_pool_ctx (), queue_name ,
1178+ res.dpp , res.store ->getRados ()->get_notif_pool_ctx (), shard_name ,
11591179 std::move (op), res.yield , librados::OPERATION_RETURNVEC);
11601180 if (ret < 0 ) {
11611181 ldpp_dout (res.dpp , 1 )
11621182 << " ERROR: failed to reserve notification on queue: "
1163- << queue_name << " . error: " << ret << dendl;
1183+ << shard_name << " . error: " << ret << dendl;
11641184 // if no space is left in queue we ask client to slow down
11651185 return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
11661186 }
@@ -1173,7 +1193,7 @@ int publish_reserve(const DoutPrefixProvider* dpp,
11731193 }
11741194 }
11751195
1176- res.topics .emplace_back (topic_filter.s3_id , topic_cfg, res_id, event_type);
1196+ res.topics .emplace_back (topic_filter.s3_id , topic_cfg, res_id, event_type, target_shard );
11771197 }
11781198 }
11791199 return 0 ;
@@ -1209,25 +1229,27 @@ int publish_commit(rgw::sal::Object* obj,
12091229 event_entry.retry_sleep_duration = topic.cfg .dest .retry_sleep_duration ;
12101230 bufferlist bl;
12111231 encode (event_entry, bl);
1212- const auto & queue_name = topic.cfg .dest .persistent_queue ;
1232+ uint64_t target_shard = topic.shard_id ;
1233+ const auto shard_name = get_shard_name (topic.cfg .dest .persistent_queue , target_shard);
1234+ ldpp_dout (res.dpp , 1 ) << " INFO: target_shard: " << shard_name << dendl;
12131235 if (bl.length () > res.size ) {
12141236 // try to make a larger reservation, fail only if this is not possible
12151237 ldpp_dout (dpp, 5 ) << " WARNING: committed size: " << bl.length ()
12161238 << " exceeded reserved size: " << res.size
12171239 <<
1218- " . trying to make a larger reservation on queue:" << queue_name
1240+ " . trying to make a larger reservation on queue:" << shard_name
12191241 << dendl;
12201242 // first cancel the existing reservation
12211243 librados::ObjectWriteOperation op;
12221244 cls_2pc_queue_abort (op, topic.res_id );
12231245 auto ret = rgw_rados_operate (
12241246 dpp, res.store ->getRados ()->get_notif_pool_ctx (),
1225- queue_name , std::move (op),
1247+ shard_name , std::move (op),
12261248 res.yield );
12271249 if (ret < 0 ) {
12281250 ldpp_dout (dpp, 1 ) << " ERROR: failed to abort reservation: "
12291251 << topic.res_id <<
1230- " when trying to make a larger reservation on queue: " << queue_name
1252+ " when trying to make a larger reservation on queue: " << shard_name
12311253 << " . error: " << ret << dendl;
12321254 return ret;
12331255 }
@@ -1238,10 +1260,10 @@ int publish_commit(rgw::sal::Object* obj,
12381260 cls_2pc_queue_reserve (op, bl.length (), 1 , &obl, &rval);
12391261 ret = rgw_rados_operate (
12401262 dpp, res.store ->getRados ()->get_notif_pool_ctx (),
1241- queue_name , std::move (op), res.yield , librados::OPERATION_RETURNVEC);
1263+ shard_name , std::move (op), res.yield , librados::OPERATION_RETURNVEC);
12421264 if (ret < 0 ) {
12431265 ldpp_dout (dpp, 1 ) << " ERROR: failed to reserve extra space on queue: "
1244- << queue_name
1266+ << shard_name
12451267 << " . error: " << ret << dendl;
12461268 return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
12471269 }
@@ -1256,12 +1278,12 @@ int publish_commit(rgw::sal::Object* obj,
12561278 librados::ObjectWriteOperation op;
12571279 cls_2pc_queue_commit (op, bl_data_vec, topic.res_id );
12581280 topic.res_id = cls_2pc_reservation::NO_ID;
1259- auto pcc_arg = make_unique<PublishCommitCompleteArg>(queue_name , dpp->get_cct ());
1281+ auto pcc_arg = make_unique<PublishCommitCompleteArg>(shard_name , dpp->get_cct ());
12601282 aio_completion_ptr completion{librados::Rados::aio_create_completion (pcc_arg.get (), publish_commit_completion)};
12611283 auto & io_ctx = res.store ->getRados ()->get_notif_pool_ctx ();
1262- if (const int ret = io_ctx.aio_operate (queue_name , completion.get (), &op); ret < 0 ) {
1284+ if (const int ret = io_ctx.aio_operate (shard_name , completion.get (), &op); ret < 0 ) {
12631285 ldpp_dout (dpp, 1 ) << " ERROR: failed to commit reservation to queue: "
1264- << queue_name << " . error: " << ret << dendl;
1286+ << shard_name << " . error: " << ret << dendl;
12651287 return ret;
12661288 }
12671289 // args will be released inside the callback
@@ -1304,16 +1326,18 @@ int publish_abort(reservation_t& res) {
13041326 // nothing to abort or already committed/aborted
13051327 continue ;
13061328 }
1307- const auto & queue_name = topic.cfg .dest .persistent_queue ;
1329+ uint64_t target_shard = topic.shard_id ;
1330+ const auto shard_name = get_shard_name (topic.cfg .dest .persistent_queue , target_shard);
1331+ ldpp_dout (res.dpp , 1 ) << " INFO: target_shard: " << shard_name << dendl;
13081332 librados::ObjectWriteOperation op;
13091333 cls_2pc_queue_abort (op, topic.res_id );
13101334 const auto ret = rgw_rados_operate (
13111335 res.dpp , res.store ->getRados ()->get_notif_pool_ctx (),
1312- queue_name , std::move (op), res.yield );
1336+ shard_name , std::move (op), res.yield );
13131337 if (ret < 0 ) {
13141338 ldpp_dout (res.dpp , 1 ) << " ERROR: failed to abort reservation: "
13151339 << topic.res_id <<
1316- " from queue: " << queue_name << " . error: " << ret << dendl;
1340+ " from queue: " << shard_name << " . error: " << ret << dendl;
13171341 return ret;
13181342 }
13191343 topic.res_id = cls_2pc_reservation::NO_ID;
@@ -1322,23 +1346,33 @@ int publish_abort(reservation_t& res) {
13221346}
13231347
13241348int get_persistent_queue_stats (const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
1325- const std::string &queue_name , rgw_topic_stats &stats, optional_yield y)
1349+ ShardNamesView shards , rgw_topic_stats &stats, optional_yield y)
13261350{
13271351 // TODO: use optional_yield instead calling rados_ioctx.operate() synchronously
13281352 cls_2pc_reservations reservations;
1329- auto ret = cls_2pc_queue_list_reservations (rados_ioctx, queue_name, reservations);
1330- if (ret < 0 ) {
1331- ldpp_dout (dpp, 1 ) << " ERROR: failed to read queue list reservation: " << ret << dendl;
1332- return ret;
1333- }
1334- stats.queue_reservations = reservations.size ();
1335-
1336- ret = cls_2pc_queue_get_topic_stats (rados_ioctx, queue_name, stats.queue_entries , stats.queue_size );
1337- if (ret < 0 ) {
1338- ldpp_dout (dpp, 1 ) << " ERROR: failed to get the queue size or the number of entries: " << ret << dendl;
1339- return ret;
1353+ uint32_t shard_entries;
1354+ uint64_t shard_size;
1355+
1356+ stats.queue_reservations = 0 ;
1357+ stats.queue_size = 0 ;
1358+ stats.queue_entries = 0 ;
1359+ for (const auto & shard_name: shards){
1360+ auto ret = cls_2pc_queue_list_reservations (rados_ioctx, shard_name, reservations);
1361+ if (ret < 0 ) {
1362+ ldpp_dout (dpp, 1 ) << " ERROR: failed to read shard: " << shard_name << " 's list reservation: " << ret << dendl;
1363+ return ret;
1364+ }
1365+ stats.queue_reservations += reservations.size ();
1366+ shard_entries = 0 ;
1367+ shard_size = 0 ;
1368+ ret = cls_2pc_queue_get_topic_stats (rados_ioctx, shard_name, shard_entries, shard_size);
1369+ stats.queue_size += shard_size;
1370+ stats.queue_entries += shard_entries;
1371+ if (ret < 0 ) {
1372+ ldpp_dout (dpp, 1 ) << " ERROR: failed to get the size or number of entries for queue shard: " << shard_name << ret << dendl;
1373+ return ret;
1374+ }
13401375 }
1341-
13421376 return 0 ;
13431377}
13441378
0 commit comments