Skip to content

Commit 744b88c

Browse files
authored
Merge pull request ceph#57249 from cbodley/wip-65668
rgw/notify: decouple add/remove_persistent_topic() from Manager Reviewed-by: Krunal Chheda <kchheda3@bloomberg.net> Reviewed-by: Yuval Lifshitz <ylifshit@redhat.com>
2 parents d11b25e + 8e53d68 commit 744b88c

File tree

13 files changed

+123
-60
lines changed

13 files changed

+123
-60
lines changed

src/rgw/driver/rados/rgw_notify.cc

Lines changed: 33 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ void publish_commit_completion(rados_completion_t completion, void *arg) {
124124

125125
class Manager : public DoutPrefixProvider {
126126
bool shutdown = false;
127-
const size_t max_queue_size;
128127
const uint32_t queues_update_period_ms;
129128
const uint32_t queues_update_retry_ms;
130129
const uint32_t queue_idle_sleep_us;
@@ -764,12 +763,11 @@ class Manager : public DoutPrefixProvider {
764763
}
765764

766765
// ctor: start all threads
767-
Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
766+
Manager(CephContext* _cct, uint32_t _queues_update_period_ms,
768767
uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
769768
uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
770769
uint32_t _worker_count, rgw::sal::RadosStore* store,
771770
const SiteConfig& site) :
772-
max_queue_size(_max_queue_size),
773771
queues_update_period_ms(_queues_update_period_ms),
774772
queues_update_retry_ms(_queues_update_retry_ms),
775773
queue_idle_sleep_us(_queue_idle_sleep_us),
@@ -783,39 +781,6 @@ class Manager : public DoutPrefixProvider {
783781
site(site),
784782
rados_store(*store)
785783
{}
786-
787-
int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
788-
if (topic_queue == Q_LIST_OBJECT_NAME) {
789-
ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
790-
return -EINVAL;
791-
}
792-
librados::ObjectWriteOperation op;
793-
op.create(true);
794-
cls_2pc_queue_init(op, topic_queue, max_queue_size);
795-
auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
796-
auto ret = rgw_rados_operate(this, rados_ioctx, topic_queue, &op, y);
797-
if (ret == -EEXIST) {
798-
// queue already exists - nothing to do
799-
ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_queue << " already exists. nothing to do" << dendl;
800-
return 0;
801-
}
802-
if (ret < 0) {
803-
// failed to create queue
804-
ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_queue << ". error: " << ret << dendl;
805-
return ret;
806-
}
807-
808-
bufferlist empty_bl;
809-
std::map<std::string, bufferlist> new_topic{{topic_queue, empty_bl}};
810-
op.omap_set(new_topic);
811-
ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
812-
if (ret < 0) {
813-
ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_queue << " to queue list. error: " << ret << dendl;
814-
return ret;
815-
}
816-
ldpp_dout(this, 20) << "INFO: queue: " << topic_queue << " added to queue list" << dendl;
817-
return 0;
818-
}
819784
};
820785

821786
std::unique_ptr<Manager> s_manager;
@@ -839,7 +804,7 @@ bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
839804
return false;
840805
}
841806
// TODO: take conf from CephContext
842-
s_manager = std::make_unique<Manager>(dpp->get_cct(), MAX_QUEUE_SIZE,
807+
s_manager = std::make_unique<Manager>(dpp->get_cct(),
843808
Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
844809
IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
845810
STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
@@ -856,11 +821,38 @@ void shutdown() {
856821
s_manager.reset();
857822
}
858823

859-
int add_persistent_topic(const std::string& topic_name, optional_yield y) {
860-
if (!s_manager) {
861-
return -EAGAIN;
824+
int add_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx,
825+
const std::string& topic_queue, optional_yield y)
826+
{
827+
if (topic_queue == Q_LIST_OBJECT_NAME) {
828+
ldpp_dout(dpp, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
829+
return -EINVAL;
862830
}
863-
return s_manager->add_persistent_topic(topic_name, y);
831+
librados::ObjectWriteOperation op;
832+
op.create(true);
833+
cls_2pc_queue_init(op, topic_queue, MAX_QUEUE_SIZE);
834+
auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_queue, &op, y);
835+
if (ret == -EEXIST) {
836+
// queue already exists - nothing to do
837+
ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_queue << " already exists. nothing to do" << dendl;
838+
return 0;
839+
}
840+
if (ret < 0) {
841+
// failed to create queue
842+
ldpp_dout(dpp, 1) << "ERROR: failed to create queue for topic: " << topic_queue << ". error: " << ret << dendl;
843+
return ret;
844+
}
845+
846+
bufferlist empty_bl;
847+
std::map<std::string, bufferlist> new_topic{{topic_queue, empty_bl}};
848+
op.omap_set(new_topic);
849+
ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
850+
if (ret < 0) {
851+
ldpp_dout(dpp, 1) << "ERROR: failed to add queue: " << topic_queue << " to queue list. error: " << ret << dendl;
852+
return ret;
853+
}
854+
ldpp_dout(dpp, 20) << "INFO: queue: " << topic_queue << " added to queue list" << dendl;
855+
return 0;
864856
}
865857

866858
int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y) {
@@ -889,13 +881,6 @@ int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rado
889881
return 0;
890882
}
891883

892-
int remove_persistent_topic(const std::string& topic_queue, optional_yield y) {
893-
if (!s_manager) {
894-
return -EAGAIN;
895-
}
896-
return remove_persistent_topic(s_manager.get(), s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
897-
}
898-
899884
rgw::sal::Object* get_object_with_attributes(
900885
const reservation_t& res, rgw::sal::Object* obj) {
901886
// in case of copy obj, the tags and metadata are taken from source

src/rgw/driver/rados/rgw_notify.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,10 @@ void shutdown();
3434

3535
// create persistent delivery queue for a topic (endpoint)
3636
// this operation also add a topic queue to the common (to all RGWs) list of all topics
37-
int add_persistent_topic(const std::string& topic_queue, optional_yield y);
37+
int add_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y);
3838

3939
// remove persistent delivery queue for a topic (endpoint)
4040
// this operation also remove the topic queue from the common (to all RGWs) list of all topics
41-
int remove_persistent_topic(const std::string& topic_queue, optional_yield y);
42-
43-
// same as the above, expect you need to provide the IoCtx, the above uses rgw::notify::Manager::rados_ioctx
4441
int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y);
4542

4643
// struct holding reservation information

src/rgw/driver/rados/rgw_sal_rados.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1726,6 +1726,22 @@ int RadosStore::list_account_topics(const DoutPrefixProvider* dpp,
17261726
listing.topics, listing.next_marker);
17271727
}
17281728

1729+
int RadosStore::add_persistent_topic(const DoutPrefixProvider* dpp,
1730+
optional_yield y,
1731+
const std::string& topic_queue)
1732+
{
1733+
return rgw::notify::add_persistent_topic(
1734+
dpp, getRados()->get_notif_pool_ctx(), topic_queue, y);
1735+
}
1736+
1737+
int RadosStore::remove_persistent_topic(const DoutPrefixProvider* dpp,
1738+
optional_yield y,
1739+
const std::string& topic_queue)
1740+
{
1741+
return rgw::notify::remove_persistent_topic(
1742+
dpp, getRados()->get_notif_pool_ctx(), topic_queue, y);
1743+
}
1744+
17291745
int RadosStore::remove_bucket_mapping_from_topics(
17301746
const rgw_pubsub_bucket_topics& bucket_topics,
17311747
const std::string& bucket_key,

src/rgw/driver/rados/rgw_sal_rados.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,12 @@ class RadosStore : public StoreDriver {
319319
std::string_view marker,
320320
uint32_t max_items,
321321
TopicList& listing) override;
322+
int add_persistent_topic(const DoutPrefixProvider* dpp,
323+
optional_yield y,
324+
const std::string& topic_queue) override;
325+
int remove_persistent_topic(const DoutPrefixProvider* dpp,
326+
optional_yield y,
327+
const std::string& topic_queue) override;
322328
int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
323329
const std::string& bucket_key,
324330
bool add_mapping,

src/rgw/driver/rados/topic.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,11 @@ class MetadataHandler : public RGWMetadataHandler {
354354
}
355355
if (!info.dest.push_endpoint.empty() && info.dest.persistent &&
356356
!info.dest.persistent_queue.empty()) {
357-
r = rgw::notify::add_persistent_topic(info.dest.persistent_queue, y);
357+
librados::IoCtx ioctx;
358+
r = rgw_init_ioctx(dpp, &rados, zone.notif_pool, ioctx, true, false);
359+
if (r >= 0) {
360+
r = rgw::notify::add_persistent_topic(dpp, ioctx, info.dest.persistent_queue, y);
361+
}
358362
if (r < 0) {
359363
ldpp_dout(dpp, 1) << "ERROR: failed to create queue for persistent topic "
360364
<< info.dest.persistent_queue << " with: " << cpp_strerror(r) << dendl;
@@ -388,7 +392,11 @@ class MetadataHandler : public RGWMetadataHandler {
388392
if (!dest.push_endpoint.empty() && dest.persistent &&
389393
!dest.persistent_queue.empty()) {
390394
// delete persistent topic queue
391-
r = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
395+
librados::IoCtx ioctx;
396+
r = rgw_init_ioctx(dpp, &rados, zone.notif_pool, ioctx, true, false);
397+
if (r >= 0) {
398+
r = rgw::notify::remove_persistent_topic(dpp, ioctx, dest.persistent_queue, y);
399+
}
392400
if (r < 0 && r != -ENOENT) {
393401
ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: "
394402
<< name << " with error: " << r << dendl;

src/rgw/rgw_pubsub.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,7 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
10911091
const rgw_pubsub_dest& dest = topic.dest;
10921092
if (!dest.push_endpoint.empty() && dest.persistent &&
10931093
!dest.persistent_queue.empty()) {
1094-
ret = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
1094+
ret = driver->remove_persistent_topic(dpp, y, dest.persistent_queue);
10951095
if (ret < 0 && ret != -ENOENT) {
10961096
ldpp_dout(dpp, 1) << "WARNING: failed to remove queue for "
10971097
"persistent topic: " << cpp_strerror(ret) << dendl;
@@ -1138,7 +1138,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na
11381138

11391139
if (!dest.push_endpoint.empty() && dest.persistent &&
11401140
!dest.persistent_queue.empty()) {
1141-
ret = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
1141+
ret = driver->remove_persistent_topic(dpp, y, dest.persistent_queue);
11421142
if (ret < 0 && ret != -ENOENT) {
11431143
ldpp_dout(dpp, 1) << "WARNING: failed to remove queue for "
11441144
"persistent topic: " << cpp_strerror(ret) << dendl;

src/rgw/rgw_rest_pubsub.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
415415
dest.persistent_queue = string_cat_reserve(
416416
get_account_or_tenant(s->owner.id), ":", topic_name);
417417

418-
op_ret = rgw::notify::add_persistent_topic(dest.persistent_queue, s->yield);
418+
op_ret = driver->add_persistent_topic(this, y, dest.persistent_queue);
419419
if (op_ret < 0) {
420420
ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for "
421421
"persistent topics. error:"
@@ -874,7 +874,7 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
874874
dest.persistent_queue = string_cat_reserve(
875875
get_account_or_tenant(s->owner.id), ":", topic_name);
876876

877-
op_ret = rgw::notify::add_persistent_topic(dest.persistent_queue, s->yield);
877+
op_ret = driver->add_persistent_topic(this, y, dest.persistent_queue);
878878
if (op_ret < 0) {
879879
ldpp_dout(this, 4)
880880
<< "SetTopicAttributes Action failed to create queue for "
@@ -884,7 +884,7 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
884884
}
885885
} else if (already_persistent) {
886886
// changing the persistent topic to non-persistent.
887-
op_ret = rgw::notify::remove_persistent_topic(result.dest.persistent_queue, s->yield);
887+
op_ret = driver->remove_persistent_topic(this, y, result.dest.persistent_queue);
888888
if (op_ret != -ENOENT && op_ret < 0) {
889889
ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue "
890890
"for persistent topics. error:"

src/rgw/rgw_sal.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,16 @@ class Driver {
542542
std::string_view marker,
543543
uint32_t max_items,
544544
TopicList& listing) = 0;
545+
546+
// TODO: backends should manage persistent topic queues internally on
547+
// write_topic_v2()/remove_topic_v2()
548+
virtual int add_persistent_topic(const DoutPrefixProvider* dpp,
549+
optional_yield y,
550+
const std::string& topic_queue) = 0;
551+
virtual int remove_persistent_topic(const DoutPrefixProvider* dpp,
552+
optional_yield y,
553+
const std::string& topic_queue) = 0;
554+
545555
/** Update the bucket-topic mapping in the store, if |add_mapping|=true then
546556
* adding the |bucket_key| |topic| mapping to store, else delete the
547557
* |bucket_key| |topic| mapping from the store. The |bucket_key| is

src/rgw/rgw_sal_dbstore.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1912,6 +1912,20 @@ namespace rgw::sal {
19121912
return -ENOTSUP;
19131913
}
19141914

1915+
int DBStore::add_persistent_topic(const DoutPrefixProvider* dpp,
1916+
optional_yield y,
1917+
const std::string& topic_queue)
1918+
{
1919+
return -ENOTSUP;
1920+
}
1921+
1922+
int DBStore::remove_persistent_topic(const DoutPrefixProvider* dpp,
1923+
optional_yield y,
1924+
const std::string& topic_queue)
1925+
{
1926+
return -ENOTSUP;
1927+
}
1928+
19151929
RGWLC* DBStore::get_rgwlc(void) {
19161930
return lc;
19171931
}

src/rgw/rgw_sal_dbstore.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -887,6 +887,13 @@ class DBNotification : public StoreNotification {
887887
uint32_t max_items,
888888
TopicList& listing) override;
889889

890+
int add_persistent_topic(const DoutPrefixProvider* dpp,
891+
optional_yield y,
892+
const std::string& topic_queue) override;
893+
int remove_persistent_topic(const DoutPrefixProvider* dpp,
894+
optional_yield y,
895+
const std::string& topic_queue) override;
896+
890897
virtual RGWLC* get_rgwlc(void) override;
891898
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
892899
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;

0 commit comments

Comments
 (0)