@@ -539,11 +539,12 @@ int RGWPubSub::get_topics(const DoutPrefixProvider* dpp,
539539 rgw_pubsub_topics& result, std::string& next_marker,
540540 optional_yield y) const
541541{
542- if (!use_notification_v2) {
542+ if (!use_notification_v2 || driver->stat_topics_v1 (tenant, y, dpp) != -ENOENT) {
543+ // in case of v1 or during migration we use v1 topics
543544 // v1 returns all topics, ignoring marker/max_items
544545 return read_topics_v1 (dpp, result, nullptr , y);
545546 }
546-
547+
547548 // TODO: prefix filter on 'tenant:'
548549 void * handle = NULL ;
549550 int ret = driver->meta_list_keys_init (dpp, " topic" , start_marker, &handle);
@@ -623,6 +624,13 @@ int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pub
623624 RGWObjVersionTracker *objv_tracker,
624625 optional_yield y) const
625626{
627+ if (ps.use_notification_v2 ) {
628+ if (const auto ret = ps.driver ->stat_topics_v1 (bucket->get_tenant (), y, dpp); ret != -ENOENT) {
629+ ldpp_dout (dpp, 1 ) << " WARNING: " << (ret == 0 ? " topic migration in process" : " cannot determine topic migration status. ret = " + std::to_string (ret))
630+ << " . please try again later" << dendl;
631+ return -ERR_SERVICE_UNAVAILABLE;
632+ }
633+ }
626634 const int ret = bucket->write_topics (topics, objv_tracker, y, dpp);
627635 if (ret < 0 ) {
628636 ldpp_dout (dpp, 1 ) << " ERROR: failed to write bucket topics info: ret=" << ret << dendl;
@@ -637,7 +645,8 @@ int RGWPubSub::get_topic(const DoutPrefixProvider* dpp,
637645 rgw_pubsub_topic& result,
638646 optional_yield y,
639647 std::set<std::string>* subscribed_buckets) const {
640- if (use_notification_v2) {
648+ if (use_notification_v2 && driver->stat_topics_v1 (tenant, y, dpp) == -ENOENT) {
649+ // in case of v1 or during migration we use v1 topics
641650 int ret = driver->read_topic_v2 (name, tenant, result, nullptr , y, dpp);
642651 if (ret < 0 ) {
643652 ldpp_dout (dpp, 1 ) << " failed to read topic info for name: " << name
@@ -962,6 +971,11 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
962971 const std::string& policy_text,
963972 optional_yield y) const {
964973 if (use_notification_v2) {
974+ if (const auto ret = driver->stat_topics_v1 (tenant, y, dpp); ret != -ENOENT) {
975+ ldpp_dout (dpp, 1 ) << " WARNING: " << (ret == 0 ? " topic migration in process" : " cannot determine topic migration status. ret = " + std::to_string (ret))
976+ << " . please try again later" << dendl;
977+ return -ERR_SERVICE_UNAVAILABLE;
978+ }
965979 rgw_pubsub_topic new_topic;
966980 new_topic.user = user;
967981 new_topic.name = name;
@@ -994,6 +1008,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
9941008 ldpp_dout (dpp, 1 ) << " ERROR: failed to write topics info: ret=" << ret << dendl;
9951009 return ret;
9961010 }
1011+ ldpp_dout (dpp, 1 ) << " INFO: successfully created v1 topic" << dendl;
9971012
9981013 return 0 ;
9991014}
@@ -1025,6 +1040,11 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
10251040int RGWPubSub::remove_topic (const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const
10261041{
10271042 if (use_notification_v2) {
1043+ if (const auto ret = driver->stat_topics_v1 (tenant, y, dpp); ret != -ENOENT) {
1044+ ldpp_dout (dpp, 1 ) << " WARNING: " << (ret == 0 ? " topic migration in process" : " cannot determine topic migration status. ret = " + std::to_string (ret))
1045+ << " . please try again later" << dendl;
1046+ return -ERR_SERVICE_UNAVAILABLE;
1047+ }
10281048 return remove_topic_v2 (dpp, name, y);
10291049 }
10301050 RGWObjVersionTracker objv_tracker;
0 commit comments