
Commit 8c25005

9401adarsh authored and yuvalif committed
rgw/notifications: sharded bucket notifications
* sharding is based on hash of bucket_name:object_key
* compatible with v1 topics
* include tests to cover backward compatibility and mixed cluster

Signed-off-by: Adarsh <[email protected]>
1 parent d425653 commit 8c25005

File tree

12 files changed: +390 −111 lines changed


PendingReleaseNotes

Lines changed: 8 additions & 0 deletions
@@ -762,3 +762,11 @@ Relevant tracker: https://tracker.ceph.com/issues/64777
   when decoding, resulting in Linux codes on the wire, and host codes on the receiver.
   All CEPHFS_E* defines have been removed across Ceph (including the Python binding).
   Relevant tracker: https://tracker.ceph.com/issues/64611
+
+* RGW: Persistent bucket notifications will use queues with multiple shards instead of one queue. The number of shards
+  can be configured using the `rgw` option `rgw_bucket_persistent_notif_num_shards`. Note that pre-existing topics will continue to function as is, i.e., they are mapped to only one RADOS object.
+
+  For more details, see:
+  https://docs.ceph.com/en/latest/radosgw/notifications/
+
+  Relevant tracker: https://tracker.ceph.com/issues/71677

doc/radosgw/notifications.rst

Lines changed: 12 additions & 0 deletions
@@ -85,6 +85,18 @@ which tells the client that it may retry later.
 .. tip:: To minimize the latency added by asynchronous notification, we
    recommended placing the "log" pool on fast media.

+Persistent bucket notifications are managed by the following central configuration option:
+
+.. confval:: rgw_bucket_persistent_notif_num_shards
+
+.. note:: When a topic is created during a Ceph upgrade, per-key reordering of notifications may
+   happen on any bucket mapped to that topic.
+
+.. note:: Persistent topics that were created on a radosgw that does not support sharding will be treated as single-shard topics.
+
+.. tip:: It is also recommended that you avoid modifying or deleting topics created during
+   upgrades, as this might result in orphan RADOS objects that will not be deleted when the topic is deleted.
+

 Topic Management via CLI
 ------------------------

src/common/options/rgw.yaml.in

Lines changed: 9 additions & 0 deletions
@@ -4511,3 +4511,12 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_bucket_persistent_notif_num_shards
+  type: uint
+  level: advanced
+  desc: Number of shards for a persistent topic.
+  long_desc: Number of shards of persistent topics. Notifications are sharded by a combination of
+    the bucket and key name. Changing the number affects only new topics and does not change existing ones.
+  default: 11
+  services:
+  - rgw
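
Note: the hunks on this page do not show where this option is consumed; presumably it is read once when a topic is created and recorded in the topic configuration (`topic_cfg.dest.num_shards`, used in `publish_reserve()` below), which is why changing it affects only new topics. A hedged sketch of how an option declared here is typically read in Ceph code; `read_notif_num_shards` is a hypothetical helper, not part of this commit:

#include "common/ceph_context.h"  // CephContext; part of the Ceph tree, not shown in this diff

// Hypothetical helper (not in this commit): read the configured shard count
// so it can be frozen into the topic's configuration at creation time.
uint64_t read_notif_num_shards(CephContext* cct) {
  return cct->_conf.get_val<uint64_t>("rgw_bucket_persistent_notif_num_shards");
}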

src/rgw/driver/rados/rgw_notify.cc

Lines changed: 63 additions & 29 deletions
@@ -1084,6 +1084,19 @@ static inline bool notification_match(reservation_t& res,
   return true;
 }

+
+static inline uint64_t get_target_shard(const DoutPrefixProvider* dpp, const std::string& bucket_name, const std::string& object_key, const uint64_t num_shards) {
+  std::hash<std::string> hash_fn;
+  std::string hash_key = fmt::format("{}:{}", bucket_name, object_key);
+  size_t hash = hash_fn(hash_key);
+  ldpp_dout(dpp, 20) << "INFO: Hash Value (hash) is: " << hash << ". Hash Key: " << bucket_name << ":" << object_key << dendl;
+  return hash % num_shards;
+}
+
+static inline std::string get_shard_name(const std::string& topic_name, const uint64_t& shard_id) {
+  return (shard_id == 0) ? topic_name : fmt::format("{}.{}", topic_name, shard_id);
+}
+
 int publish_reserve(const DoutPrefixProvider* dpp,
                     const SiteConfig& site,
                     const EventTypeList& event_types,
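
A minimal standalone sketch of the shard-selection scheme these two helpers implement (illustration only, outside the RGW tree; `mybucket` and `mytopic` are made-up names, and `std::hash` is implementation-defined, so the shard a given key lands on varies across platforms):

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>

// Same scheme as get_target_shard()/get_shard_name() above, minus the RGW
// plumbing: hash "bucket:key" and take it modulo the shard count.
static uint64_t target_shard(const std::string& bucket, const std::string& key,
                             uint64_t num_shards) {
  return std::hash<std::string>{}(bucket + ":" + key) % num_shards;
}

// Shard 0 keeps the bare queue name, which is how pre-sharding (v1) topics
// keep resolving to their original single RADOS object.
static std::string shard_name(const std::string& topic, uint64_t shard) {
  return shard == 0 ? topic : topic + "." + std::to_string(shard);
}

int main() {
  const uint64_t num_shards = 11;  // default of rgw_bucket_persistent_notif_num_shards
  for (const std::string key : {"a.txt", "b.txt", "c.txt"}) {
    std::cout << key << " -> "
              << shard_name("mytopic", target_shard("mybucket", key, num_shards))
              << "\n";
  }
}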
@@ -1145,22 +1158,29 @@ int publish_reserve(const DoutPrefixProvider* dpp,
     }

     cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+    uint64_t target_shard = 0;
     if (topic_cfg.dest.persistent) {
       // TODO: take default reservation size from conf
       constexpr auto DEFAULT_RESERVATION = 4 * 1024U; // 4K
       res.size = DEFAULT_RESERVATION;
       librados::ObjectWriteOperation op;
       bufferlist obl;
       int rval;
-      const auto& queue_name = topic_cfg.dest.persistent_queue;
+      const std::string bucket_name = res.bucket->get_name();
+      const std::string object_key = res.object_name ? *res.object_name : res.object->get_name();
+      const uint64_t num_shards = topic_cfg.dest.num_shards;
+      target_shard = get_target_shard(
+          dpp, bucket_name, object_key, num_shards);
+      const auto shard_name = get_shard_name(topic_cfg.dest.persistent_queue, target_shard);
+      ldpp_dout(res.dpp, 1) << "INFO: target_shard: " << shard_name << dendl;
       cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
       auto ret = rgw_rados_operate(
-          res.dpp, res.store->getRados()->get_notif_pool_ctx(), queue_name,
+          res.dpp, res.store->getRados()->get_notif_pool_ctx(), shard_name,
           std::move(op), res.yield, librados::OPERATION_RETURNVEC);
       if (ret < 0) {
         ldpp_dout(res.dpp, 1)
             << "ERROR: failed to reserve notification on queue: "
-            << queue_name << ". error: " << ret << dendl;
+            << shard_name << ". error: " << ret << dendl;
         // if no space is left in queue we ask client to slow down
         return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
       }
@@ -1173,7 +1193,7 @@ int publish_reserve(const DoutPrefixProvider* dpp,
       }
     }

-    res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, event_type);
+    res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, event_type, target_shard);
   }
 }
 return 0;
@@ -1209,25 +1229,27 @@ int publish_commit(rgw::sal::Object* obj,
     event_entry.retry_sleep_duration = topic.cfg.dest.retry_sleep_duration;
     bufferlist bl;
     encode(event_entry, bl);
-    const auto& queue_name = topic.cfg.dest.persistent_queue;
+    uint64_t target_shard = topic.shard_id;
+    const auto shard_name = get_shard_name(topic.cfg.dest.persistent_queue, target_shard);
+    ldpp_dout(res.dpp, 1) << "INFO: target_shard: " << shard_name << dendl;
     if (bl.length() > res.size) {
       // try to make a larger reservation, fail only if this is not possible
       ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
                         << " exceeded reserved size: " << res.size
                         <<
-          " . trying to make a larger reservation on queue:" << queue_name
+          " . trying to make a larger reservation on queue:" << shard_name
                         << dendl;
       // first cancel the existing reservation
       librados::ObjectWriteOperation op;
       cls_2pc_queue_abort(op, topic.res_id);
       auto ret = rgw_rados_operate(
           dpp, res.store->getRados()->get_notif_pool_ctx(),
-          queue_name, std::move(op),
+          shard_name, std::move(op),
           res.yield);
       if (ret < 0) {
         ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
                           << topic.res_id <<
-            " when trying to make a larger reservation on queue: " << queue_name
+            " when trying to make a larger reservation on queue: " << shard_name
                           << ". error: " << ret << dendl;
         return ret;
       }
@@ -1238,10 +1260,10 @@ int publish_commit(rgw::sal::Object* obj,
       cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
       ret = rgw_rados_operate(
           dpp, res.store->getRados()->get_notif_pool_ctx(),
-          queue_name, std::move(op), res.yield, librados::OPERATION_RETURNVEC);
+          shard_name, std::move(op), res.yield, librados::OPERATION_RETURNVEC);
       if (ret < 0) {
         ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
-                          << queue_name
+                          << shard_name
                           << ". error: " << ret << dendl;
         return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
       }
@@ -1256,12 +1278,12 @@ int publish_commit(rgw::sal::Object* obj,
     librados::ObjectWriteOperation op;
     cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
     topic.res_id = cls_2pc_reservation::NO_ID;
-    auto pcc_arg = make_unique<PublishCommitCompleteArg>(queue_name, dpp->get_cct());
+    auto pcc_arg = make_unique<PublishCommitCompleteArg>(shard_name, dpp->get_cct());
     aio_completion_ptr completion{librados::Rados::aio_create_completion(pcc_arg.get(), publish_commit_completion)};
     auto& io_ctx = res.store->getRados()->get_notif_pool_ctx();
-    if (const int ret = io_ctx.aio_operate(queue_name, completion.get(), &op); ret < 0) {
+    if (const int ret = io_ctx.aio_operate(shard_name, completion.get(), &op); ret < 0) {
       ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
-                        << queue_name << ". error: " << ret << dendl;
+                        << shard_name << ". error: " << ret << dendl;
       return ret;
     }
     // args will be released inside the callback
@@ -1304,16 +1326,18 @@ int publish_abort(reservation_t& res) {
       // nothing to abort or already committed/aborted
       continue;
     }
-    const auto& queue_name = topic.cfg.dest.persistent_queue;
+    uint64_t target_shard = topic.shard_id;
+    const auto shard_name = get_shard_name(topic.cfg.dest.persistent_queue, target_shard);
+    ldpp_dout(res.dpp, 1) << "INFO: target_shard: " << shard_name << dendl;
     librados::ObjectWriteOperation op;
     cls_2pc_queue_abort(op, topic.res_id);
     const auto ret = rgw_rados_operate(
         res.dpp, res.store->getRados()->get_notif_pool_ctx(),
-        queue_name, std::move(op), res.yield);
+        shard_name, std::move(op), res.yield);
     if (ret < 0) {
       ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
                             << topic.res_id <<
-          " from queue: " << queue_name << ". error: " << ret << dendl;
+          " from queue: " << shard_name << ". error: " << ret << dendl;
       return ret;
     }
     topic.res_id = cls_2pc_reservation::NO_ID;
@@ -1322,23 +1346,33 @@ int publish_abort(reservation_t& res) {
   }
 }

 int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
-                               const std::string &queue_name, rgw_topic_stats &stats, optional_yield y)
+                               ShardNamesView shards, rgw_topic_stats &stats, optional_yield y)
 {
   // TODO: use optional_yield instead calling rados_ioctx.operate() synchronously
   cls_2pc_reservations reservations;
-  auto ret = cls_2pc_queue_list_reservations(rados_ioctx, queue_name, reservations);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to read queue list reservation: " << ret << dendl;
-    return ret;
-  }
-  stats.queue_reservations = reservations.size();
-
-  ret = cls_2pc_queue_get_topic_stats(rados_ioctx, queue_name, stats.queue_entries, stats.queue_size);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl;
-    return ret;
+  uint32_t shard_entries;
+  uint64_t shard_size;
+
+  stats.queue_reservations = 0;
+  stats.queue_size = 0;
+  stats.queue_entries = 0;
+  for (const auto& shard_name : shards) {
+    auto ret = cls_2pc_queue_list_reservations(rados_ioctx, shard_name, reservations);
+    if (ret < 0) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to read shard: " << shard_name << "'s list reservation: " << ret << dendl;
+      return ret;
+    }
+    stats.queue_reservations += reservations.size();
+    shard_entries = 0;
+    shard_size = 0;
+    ret = cls_2pc_queue_get_topic_stats(rados_ioctx, shard_name, shard_entries, shard_size);
+    if (ret < 0) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to get the size or number of entries for queue shard: " << shard_name << ". error: " << ret << dendl;
+      return ret;
+    }
+    stats.queue_size += shard_size;
+    stats.queue_entries += shard_entries;
   }
   return 0;
 }
src/rgw/driver/rados/rgw_notify.h

Lines changed: 5 additions & 3 deletions
@@ -47,17 +47,19 @@ struct reservation_t {
   struct topic_t {
     topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg,
             cls_2pc_reservation::id_t _res_id,
-            rgw::notify::EventType _event_type)
+            rgw::notify::EventType _event_type, uint64_t shard_id)
       : configurationId(_configurationId),
         cfg(_cfg),
         res_id(_res_id),
-        event_type(_event_type) {}
+        event_type(_event_type),
+        shard_id(shard_id) {}

     const std::string configurationId;
     const rgw_pubsub_topic cfg;
     // res_id is reset after topic is committed/aborted
     cls_2pc_reservation::id_t res_id;
     rgw::notify::EventType event_type;
+    uint64_t shard_id;
   };

   const DoutPrefixProvider* const dpp;
@@ -132,7 +134,7 @@ int publish_commit(rgw::sal::Object* obj,
 int publish_abort(reservation_t& reservation);

 int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
-                               const std::string &queue_name, rgw_topic_stats &stats, optional_yield y);
+                               ShardNamesView shards, rgw_topic_stats &stats, optional_yield y);

 }
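
The new `shard_id` member is what keeps the three 2PC phases consistent: `publish_reserve()` computes the shard once and stores it here, and `publish_commit()`/`publish_abort()` reuse it so all three operations address the same RADOS queue object. A toy illustration of that flow (simplified stand-ins, not the RGW types):

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>

// Simplified stand-in for reservation_t::topic_t: shard_id is carried from
// reserve to commit/abort.
struct toy_topic_t {
  std::string queue;      // like topic.cfg.dest.persistent_queue
  uint64_t shard_id = 0;  // chosen once at reserve time
};

static std::string shard_name(const toy_topic_t& t) {
  return t.shard_id == 0 ? t.queue : t.queue + "." + std::to_string(t.shard_id);
}

int main() {
  toy_topic_t t{"mytopic"};
  // reserve: pick the shard from bucket:key and remember it
  t.shard_id = std::hash<std::string>{}("mybucket:obj.txt") % 11;
  // commit (or abort): address the same shard object chosen at reserve
  std::cout << "commit goes to: " << shard_name(t) << "\n";
}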

src/rgw/radosgw-admin/radosgw-admin.cc

Lines changed: 36 additions & 32 deletions
@@ -11914,9 +11914,9 @@ int main(int argc, const char **argv)
     rgw::notify::rgw_topic_stats stats;
     ret = rgw::notify::get_persistent_queue_stats(
         dpp(), ioctx,
-        topic.dest.persistent_queue, stats, null_yield);
+        topic.dest.get_shard_names(), stats, null_yield);
     if (ret < 0) {
-      cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
+      cerr << "ERROR: could not get persistent queues: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
     encode_json("", stats, formatter.get());
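
Neither `ShardNamesView` nor `topic.dest.get_shard_names()` appears in the hunks shown on this page; presumably the helper expands the topic's persistent queue name into one RADOS object name per shard, mirroring `get_shard_name()` in rgw_notify.cc. A hypothetical sketch of that expansion (`make_shard_names` is a made-up name):

#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for get_shard_names() (not shown in this diff):
// expands the persistent queue name into one RADOS object name per shard.
// Shard 0 keeps the bare queue name, so pre-sharding (v1) topics resolve
// to their original single object.
std::vector<std::string> make_shard_names(const std::string& queue,
                                          uint64_t num_shards) {
  std::vector<std::string> names;
  names.reserve(num_shards);
  for (uint64_t i = 0; i < num_shards; ++i) {
    names.push_back(i == 0 ? queue : queue + "." + std::to_string(i));
  }
  return names;
}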
@@ -11948,37 +11948,41 @@ int main(int argc, const char **argv)
     std::string end_marker;
     librados::ObjectReadOperation rop;
     std::vector<cls_queue_entry> queue_entries;
-    bool truncated = true;
+    bool truncated;
     formatter->open_array_section("eventEntries");
-    while (truncated) {
-      bufferlist bl;
-      int rc;
-      cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
-      ioctx.operate(topic.dest.persistent_queue, &rop, nullptr);
-      if (rc < 0 ) {
-        cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl;
-        return -rc;
-      }
-      rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
-      if (rc < 0) {
-        cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl;
-        return -rc;
-      }
-
-      std::for_each(queue_entries.cbegin(),
-                    queue_entries.cend(),
-                    [&formatter](const auto& queue_entry) {
-                      rgw::notify::event_entry_t event_entry;
-                      bufferlist::const_iterator iter{&queue_entry.data};
-                      try {
-                        event_entry.decode(iter);
-                        encode_json("", event_entry, formatter.get());
-                      } catch (const buffer::error& e) {
-                        cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
-                      }
-                    });
-      formatter->flush(cout);
-      marker = end_marker;
+
+    for (const auto& shard_name : topic.dest.get_shard_names()) {
+      truncated = true;
+      marker.clear();
+      while (truncated) {
+        bufferlist bl;
+        int rc;
+        cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
+        ioctx.operate(shard_name, &rop, nullptr);
+        if (rc < 0) {
+          cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-rc) << std::endl;
+          return -rc;
+        }
+        rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
+        if (rc < 0) {
+          cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-rc) << std::endl;
+          return -rc;
+        }
+        std::for_each(queue_entries.cbegin(),
+                      queue_entries.cend(),
+                      [&formatter](const auto& queue_entry) {
+                        rgw::notify::event_entry_t event_entry;
+                        bufferlist::const_iterator iter{&queue_entry.data};
+                        try {
+                          event_entry.decode(iter);
+                          encode_json("", event_entry, formatter.get());
+                        } catch (const buffer::error& e) {
+                          cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
+                        }
+                      });
+        formatter->flush(cout);
+        marker = end_marker;
+      }
     }
     formatter->close_section();
     formatter->flush(cout);
