Skip to content

Commit 6557309

Browse files
authored
Merge pull request ceph#55657 from ceph/wip-rgw-meta-topic
rgw: replicate v2 topic/notification metadata Reviewed-by: Krunal Chheda <[email protected]> Reviewed-by: Yuval Lifshitz <[email protected]> Reviewed-by: Casey Bodley <[email protected]> Reviewed-by: Matt Benjamin <[email protected]>
2 parents 0a9aa2f + 666e79f commit 6557309

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2467
-395
lines changed

qa/tasks/notification_tests.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,9 @@ def task(ctx,config):
298298
{
299299
'port':endpoint.port,
300300
'host':endpoint.dns_name,
301+
'zonegroup':'default',
302+
'cluster':'noname',
303+
'version':'v2'
301304
},
302305
's3 main':{}
303306
}

src/rgw/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ set(librgw_common_srcs
193193
driver/rados/rgw_trim_mdlog.cc
194194
driver/rados/rgw_user.cc
195195
driver/rados/rgw_zone.cc
196-
driver/rados/sync_fairness.cc)
196+
driver/rados/sync_fairness.cc
197+
driver/rados/topic.cc)
197198

198199
list(APPEND librgw_common_srcs
199200
driver/immutable_config/store.cc

src/rgw/driver/rados/rgw_bucket.cc

Lines changed: 109 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,17 +1336,6 @@ static int bucket_stats(rgw::sal::Driver* driver,
13361336
}
13371337
}
13381338

1339-
// bucket notifications
1340-
RGWPubSub ps(driver, tenant_name);
1341-
rgw_pubsub_bucket_topics result;
1342-
const RGWPubSub::Bucket b(ps, bucket.get());
1343-
ret = b.get_topics(dpp, result, y);
1344-
if (ret < 0 && ret != -ENOENT) {
1345-
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
1346-
return -ret;
1347-
}
1348-
result.dump(formatter);
1349-
13501339
// TODO: bucket CORS
13511340
// TODO: bucket LC
13521341
formatter->close_section();
@@ -2121,6 +2110,92 @@ int RGWMetadataHandlerPut_Bucket::put_post(const DoutPrefixProvider *dpp)
21212110
return ret;
21222111
}
21232112

2113+
int update_bucket_topic_mappings(const DoutPrefixProvider* dpp,
2114+
RGWBucketCompleteInfo* orig_bci,
2115+
RGWBucketCompleteInfo* current_bci,
2116+
rgw::sal::Driver* driver) {
2117+
const auto decode_attrs = [](const rgw::sal::Attrs& attrs,
2118+
rgw_pubsub_bucket_topics& bucket_topics) -> int {
2119+
auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION);
2120+
if (iter == attrs.end()) {
2121+
return 0;
2122+
}
2123+
try {
2124+
const auto& bl = iter->second;
2125+
auto biter = bl.cbegin();
2126+
bucket_topics.decode(biter);
2127+
} catch (buffer::error& err) {
2128+
return -EIO;
2129+
}
2130+
return 0;
2131+
};
2132+
std::string bucket_name;
2133+
std::string bucket_tenant;
2134+
rgw_pubsub_bucket_topics old_bucket_topics;
2135+
if (orig_bci) {
2136+
auto ret = decode_attrs(orig_bci->attrs, old_bucket_topics);
2137+
if (ret < 0) {
2138+
ldpp_dout(dpp, 1)
2139+
<< "ERROR: failed to decode OLD bucket topics for bucket: "
2140+
<< orig_bci->info.bucket.name << dendl;
2141+
return ret;
2142+
}
2143+
bucket_name = orig_bci->info.bucket.name;
2144+
bucket_tenant = orig_bci->info.bucket.tenant;
2145+
}
2146+
rgw_pubsub_bucket_topics current_bucket_topics;
2147+
if (current_bci) {
2148+
auto ret = decode_attrs(current_bci->attrs, current_bucket_topics);
2149+
if (ret < 0) {
2150+
ldpp_dout(dpp, 1)
2151+
<< "ERROR: failed to decode current bucket topics for bucket: "
2152+
<< current_bci->info.bucket.name << dendl;
2153+
return ret;
2154+
}
2155+
bucket_name = current_bci->info.bucket.name;
2156+
bucket_tenant = current_bci->info.bucket.tenant;
2157+
}
2158+
// fetch the list of subscribed topics stored inside old_bucket attrs.
2159+
std::unordered_map<std::string, rgw_pubsub_topic> old_topics;
2160+
for (const auto& [_, topic_filter] : old_bucket_topics.topics) {
2161+
old_topics[topic_filter.topic.name] = topic_filter.topic;
2162+
}
2163+
// fetch the list of subscribed topics stored inside current_bucket attrs.
2164+
std::unordered_map<std::string, rgw_pubsub_topic> current_topics;
2165+
for (const auto& [_, topic_filter] : current_bucket_topics.topics) {
2166+
current_topics[topic_filter.topic.name] = topic_filter.topic;
2167+
}
2168+
// traverse thru old topics and check if they are not in current, then delete
2169+
// the mapping, if present in both current and old then delete from current
2170+
// set as we do not need to update those mapping.
2171+
int ret = 0;
2172+
for (const auto& [topic_name, topic] : old_topics) {
2173+
auto it = current_topics.find(topic_name);
2174+
if (it == current_topics.end()) {
2175+
const auto op_ret = driver->update_bucket_topic_mapping(
2176+
topic, rgw_make_bucket_entry_name(bucket_tenant, bucket_name),
2177+
/*add_mapping=*/false, null_yield, dpp);
2178+
if (op_ret < 0) {
2179+
ret = op_ret;
2180+
}
2181+
} else {
2182+
// already that attr is present, so do not update the mapping.
2183+
current_topics.erase(it);
2184+
}
2185+
}
2186+
// traverse thru current topics and check if they are any present, then add
2187+
// the mapping.
2188+
for (const auto& [topic_name, topic] : current_topics) {
2189+
const auto op_ret = driver->update_bucket_topic_mapping(
2190+
topic, rgw_make_bucket_entry_name(bucket_tenant, bucket_name),
2191+
/*add_mapping=*/true, null_yield, dpp);
2192+
if (op_ret < 0) {
2193+
ret = op_ret;
2194+
}
2195+
}
2196+
return ret;
2197+
}
2198+
21242199
static void get_md5_digest(const RGWBucketEntryPoint *be, string& md5_digest) {
21252200

21262201
char md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
@@ -2432,7 +2507,14 @@ class RGWBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandler
24322507
if (ret < 0 && ret != -ENOENT)
24332508
return ret;
24342509

2435-
return svc.bucket->remove_bucket_instance_info(ctx, entry, bci.info, &bci.info.objv_tracker, y, dpp);
2510+
ret = svc.bucket->remove_bucket_instance_info(
2511+
ctx, entry, bci.info, &bci.info.objv_tracker, y, dpp);
2512+
if (ret < 0)
2513+
return ret;
2514+
ret = update_bucket_topic_mappings(dpp, &bci, /*current_bci=*/nullptr,
2515+
driver);
2516+
// update_bucket_topic_mapping error is swallowed.
2517+
return 0;
24362518
}
24372519

24382520
int call(std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) {
@@ -2637,6 +2719,21 @@ int RGWMetadataHandlerPut_BucketInstance::put_post(const DoutPrefixProvider *dpp
26372719
}
26382720
} /* update lc */
26392721

2722+
/* update bucket topic mapping */
2723+
{
2724+
auto* orig_obj = static_cast<RGWBucketInstanceMetadataObject*>(old_obj);
2725+
auto* orig_bci = (orig_obj ? &orig_obj->get_bci() : nullptr);
2726+
ret = update_bucket_topic_mappings(dpp, orig_bci, &bci, bihandler->driver);
2727+
if (ret < 0) {
2728+
ldpp_dout(dpp, 0) << __func__
2729+
<< " failed to apply bucket topic mapping for "
2730+
<< bci.info.bucket.name << dendl;
2731+
return ret;
2732+
}
2733+
ldpp_dout(dpp, 20) << __func__
2734+
<< " successfully applied bucket topic mapping for "
2735+
<< bci.info.bucket.name << dendl;
2736+
}
26402737
return STATUS_APPLIED;
26412738
}
26422739

src/rgw/driver/rados/rgw_cr_rados.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,7 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
809809
req_id, null_yield);
810810

811811
auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
812-
int ret = rgw::notify::publish_reserve(dpp, rgw::notify::ObjectSyncedCreate, notify_res, &obj_tags);
812+
int ret = rgw::notify::publish_reserve(dpp, *store->svc()->site, rgw::notify::ObjectSyncedCreate, notify_res, &obj_tags);
813813
if (ret < 0) {
814814
ldpp_dout(dpp, 1) << "ERROR: reserving notification failed, with error: " << ret << dendl;
815815
// no need to return, the sync already happened
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab ft=cpp
3+
4+
/*
5+
* Ceph - scalable distributed file system
6+
*
7+
* Copyright contributors to the Ceph project
8+
*
9+
* This is free software; you can redistribute it and/or
10+
* modify it under the terms of the GNU Lesser General Public
11+
* License version 2.1, as published by the Free Software
12+
* Foundation. See file COPYING.
13+
*
14+
*/
15+
16+
#pragma once
17+
18+
#include <algorithm>
19+
#include <iterator>
20+
#include <list>
21+
#include <string>
22+
#include <vector>
23+
#include "services/svc_sys_obj.h"
24+
25+
class DoutPrefixProvider;
26+
27+
class RGWMetadataLister {
28+
RGWSI_SysObj::Pool pool;
29+
RGWSI_SysObj::Pool::Op listing;
30+
31+
virtual void filter_transform(std::vector<std::string>& oids,
32+
std::list<std::string>& keys) {
33+
// use all oids as keys
34+
std::move(oids.begin(), oids.end(), std::back_inserter(keys));
35+
}
36+
37+
public:
38+
explicit RGWMetadataLister(RGWSI_SysObj::Pool pool)
39+
: pool(pool), listing(this->pool) {}
40+
virtual ~RGWMetadataLister() {}
41+
42+
int init(const DoutPrefixProvider* dpp,
43+
const std::string& marker,
44+
const std::string& prefix)
45+
{
46+
return listing.init(dpp, marker, prefix);
47+
}
48+
49+
int get_next(const DoutPrefixProvider* dpp, int max,
50+
std::list<std::string>& keys, bool* truncated)
51+
{
52+
std::vector<std::string> oids;
53+
int r = listing.get_next(dpp, max, &oids, truncated);
54+
if (r == -ENOENT) {
55+
if (truncated) {
56+
*truncated = false;
57+
}
58+
return 0;
59+
}
60+
if (r < 0) {
61+
return r;
62+
}
63+
filter_transform(oids, keys);
64+
return 0;
65+
}
66+
67+
std::string get_marker()
68+
{
69+
std::string marker;
70+
listing.get_marker(&marker);
71+
return marker;
72+
}
73+
};

src/rgw/driver/rados/rgw_notify.cc

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
#include "rgw_sal_rados.h"
1515
#include "rgw_pubsub.h"
1616
#include "rgw_pubsub_push.h"
17+
#include "rgw_zone_features.h"
1718
#include "rgw_perf_counters.h"
19+
#include "services/svc_zone.h"
1820
#include "common/dout.h"
1921
#include <chrono>
2022

@@ -119,6 +121,7 @@ class Manager : public DoutPrefixProvider {
119121
const uint32_t stale_reservations_period_s;
120122
const uint32_t reservations_cleanup_period_s;
121123
queues_persistency_tracker topics_persistency_tracker;
124+
const SiteConfig& site;
122125
public:
123126
rgw::sal::RadosStore& rados_store;
124127

@@ -489,10 +492,11 @@ class Manager : public DoutPrefixProvider {
489492
std::string tenant_name;
490493
// TODO: extract tenant name from queue_name once it is fixed
491494
uint64_t size_to_migrate = 0;
492-
RGWPubSub ps(&rados_store, tenant_name);
495+
RGWPubSub ps(&rados_store, tenant_name, site);
493496

494497
rgw_pubsub_topic topic;
495-
auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, optional_yield(io_context, yield));
498+
auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
499+
optional_yield(io_context, yield), nullptr);
496500
if (ret_of_get_topic < 0) {
497501
// we can't migrate entries without topic info
498502
ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: "
@@ -666,7 +670,8 @@ class Manager : public DoutPrefixProvider {
666670
Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
667671
uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
668672
uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
669-
uint32_t _worker_count, rgw::sal::RadosStore* store) :
673+
uint32_t _worker_count, rgw::sal::RadosStore* store,
674+
const SiteConfig& site) :
670675
max_queue_size(_max_queue_size),
671676
queues_update_period_ms(_queues_update_period_ms),
672677
queues_update_retry_ms(_queues_update_retry_ms),
@@ -678,6 +683,7 @@ class Manager : public DoutPrefixProvider {
678683
worker_count(_worker_count),
679684
stale_reservations_period_s(_stale_reservations_period_s),
680685
reservations_cleanup_period_s(_reservations_cleanup_period_s),
686+
site(site),
681687
rados_store(*store)
682688
{
683689
spawn::spawn(io_context, [this](spawn::yield_context yield) {
@@ -750,7 +756,8 @@ constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
750756
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
751757
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
752758

753-
bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvider *dpp) {
759+
bool init(CephContext* cct, rgw::sal::RadosStore* store,
760+
const SiteConfig& site, const DoutPrefixProvider *dpp) {
754761
if (s_manager) {
755762
return false;
756763
}
@@ -760,7 +767,7 @@ bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvide
760767
IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
761768
STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
762769
WORKER_COUNT,
763-
store);
770+
store, site);
764771
return true;
765772
}
766773

@@ -975,17 +982,40 @@ static inline bool notification_match(reservation_t& res,
975982
}
976983

977984
int publish_reserve(const DoutPrefixProvider* dpp,
985+
const SiteConfig& site,
978986
EventType event_type,
979987
reservation_t& res,
980988
const RGWObjTags* req_tags)
981989
{
982-
const RGWPubSub ps(res.store, res.user_tenant);
983-
const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
984990
rgw_pubsub_bucket_topics bucket_topics;
985-
auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
986-
if (rc < 0) {
987-
// failed to fetch bucket topics
988-
return rc;
991+
if (all_zonegroups_support(site, zone_features::notification_v2) &&
992+
res.store->stat_topics_v1(res.user_tenant, res.yield, res.dpp) == -ENOENT) {
993+
auto ret = 0;
994+
if (!res.s) {
995+
// for non S3-request caller (e.g., lifecycle, ObjectSync), bucket attrs
996+
// are not loaded, so force to reload the bucket, that reloads the attr.
997+
// for non S3-request caller, res.s is nullptr
998+
ret = res.bucket->load_bucket(dpp, res.yield);
999+
if (ret < 0) {
1000+
ldpp_dout(dpp, 1)
1001+
<< "ERROR: failed to reload bucket: '" << res.bucket->get_name()
1002+
<< "' to get bucket notification attrs with error ret= " << ret
1003+
<< dendl;
1004+
return ret;
1005+
}
1006+
}
1007+
ret = get_bucket_notifications(dpp, res.bucket, bucket_topics);
1008+
if (ret < 0) {
1009+
return ret;
1010+
}
1011+
} else {
1012+
const RGWPubSub ps(res.store, res.user_tenant, site);
1013+
const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
1014+
auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
1015+
if (rc < 0) {
1016+
// failed to fetch bucket topics
1017+
return rc;
1018+
}
9891019
}
9901020
for (const auto& bucket_topic : bucket_topics.topics) {
9911021
const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
@@ -1026,7 +1056,29 @@ static inline bool notification_match(reservation_t& res,
10261056
return ret;
10271057
}
10281058
}
1029-
res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
1059+
// load the topic,if there is change in topic config while it's stored in
1060+
// notification.
1061+
rgw_pubsub_topic result;
1062+
const RGWPubSub ps(res.store, res.user_tenant, site);
1063+
auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr);
1064+
if (ret < 0) {
1065+
ldpp_dout(res.dpp, 1)
1066+
<< "INFO: failed to load topic: " << topic_cfg.name
1067+
<< ". error: " << ret
1068+
<< " while reserving persistent notification event" << dendl;
1069+
if (ret == -ENOENT) {
1070+
// either the topic is deleted but the corresponding notification still
1071+
// exist or in v2 mode the notification could have synced first but
1072+
// topic is not synced yet.
1073+
return 0;
1074+
}
1075+
ldpp_dout(res.dpp, 1)
1076+
<< "WARN: Using the stored topic from bucket notification struct."
1077+
<< dendl;
1078+
res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
1079+
} else {
1080+
res.topics.emplace_back(topic_filter.s3_id, result, res_id);
1081+
}
10301082
}
10311083
return 0;
10321084
}

0 commit comments

Comments
 (0)