Skip to content

Commit 4478c09

Browse files
authored
Merge pull request ceph#50256 from yuvalif/wip-yuval-zipper-notifications
rgw/notifications: add bucket notification configuration to zipper Reviewed-By: dang, cbodley
2 parents 6850e35 + 3f7a295 commit 4478c09

File tree

11 files changed

+269
-198
lines changed

11 files changed

+269
-198
lines changed

src/rgw/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ set(librgw_common_srcs
9494
rgw_notify_event_type.cc
9595
rgw_period_history.cc
9696
rgw_period_puller.cc
97+
rgw_pubsub.cc
9798
rgw_coroutine.cc
9899
rgw_cr_rest.cc
99100
rgw_op.cc
@@ -110,6 +111,7 @@ set(librgw_common_srcs
110111
rgw_rest_ratelimit.cc
111112
rgw_rest_role.cc
112113
rgw_rest_s3.cc
114+
rgw_rest_pubsub.cc
113115
rgw_s3select.cc
114116
rgw_role.cc
115117
rgw_sal.cc
@@ -165,14 +167,12 @@ set(librgw_common_srcs
165167
driver/rados/rgw_object_expirer_core.cc
166168
driver/rados/rgw_otp.cc
167169
driver/rados/rgw_period.cc
168-
driver/rados/rgw_pubsub.cc
169170
driver/rados/rgw_pubsub_push.cc
170171
driver/rados/rgw_putobj_processor.cc
171172
driver/rados/rgw_rados.cc
172173
driver/rados/rgw_reshard.cc
173174
driver/rados/rgw_rest_bucket.cc
174175
driver/rados/rgw_rest_log.cc
175-
driver/rados/rgw_rest_pubsub.cc
176176
driver/rados/rgw_rest_realm.cc
177177
driver/rados/rgw_rest_user.cc
178178
driver/rados/rgw_sal_rados.cc

src/rgw/driver/rados/rgw_notify.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -778,9 +778,9 @@ static inline bool notification_match(reservation_t& res,
778778
const RGWObjTags* req_tags)
779779
{
780780
const RGWPubSub ps(res.store, res.user_tenant);
781-
const RGWPubSub::Bucket ps_bucket(ps, res.bucket->get_key());
781+
const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
782782
rgw_pubsub_bucket_topics bucket_topics;
783-
auto rc = ps_bucket.get_topics(&bucket_topics);
783+
auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
784784
if (rc < 0) {
785785
// failed to fetch bucket topics
786786
return rc;

src/rgw/driver/rados/rgw_sal_rados.cc

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ namespace rgw::sal {
7474
// default number of entries to list with each bucket listing call
7575
// (use marker to bridge between calls)
7676
static constexpr size_t listing_max_entries = 1000;
77+
static std::string pubsub_oid_prefix = "pubsub.";
7778

7879
static int decode_policy(CephContext* cct,
7980
bufferlist& bl,
@@ -471,7 +472,7 @@ int RadosBucket::remove_bucket(const DoutPrefixProvider* dpp,
471472
// if bucket has notification definitions associated with it
472473
// they should be removed (note that any pending notifications on the bucket are still going to be sent)
473474
const RGWPubSub ps(store, info.owner.tenant);
474-
const RGWPubSub::Bucket ps_bucket(ps, info.bucket);
475+
const RGWPubSub::Bucket ps_bucket(ps, this);
475476
const auto ps_ret = ps_bucket.remove_notifications(dpp, y);
476477
if (ps_ret < 0 && ps_ret != -ENOENT) {
477478
ldpp_dout(dpp, -1) << "ERROR: unable to remove notifications from bucket. ret=" << ps_ret << dendl;
@@ -1024,6 +1025,55 @@ int RadosBucket::abort_multiparts(const DoutPrefixProvider* dpp,
10241025
return 0;
10251026
}
10261027

1028+
std::string RadosBucket::topics_oid() const {
1029+
return pubsub_oid_prefix + get_tenant() + ".bucket." + get_name() + "/" + get_marker();
1030+
}
1031+
1032+
int RadosBucket::read_topics(rgw_pubsub_bucket_topics& notifications,
1033+
RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp)
1034+
{
1035+
bufferlist bl;
1036+
const int ret = rgw_get_system_obj(store->svc()->sysobj,
1037+
store->svc()->zone->get_zone_params().log_pool,
1038+
topics_oid(),
1039+
bl,
1040+
objv_tracker,
1041+
nullptr, y, dpp, nullptr);
1042+
if (ret < 0) {
1043+
return ret;
1044+
}
1045+
1046+
auto iter = bl.cbegin();
1047+
try {
1048+
decode(notifications, iter);
1049+
} catch (buffer::error& err) {
1050+
ldpp_dout(dpp, 20) << " failed to decode bucket notifications from oid: " << topics_oid() << ". for bucket: "
1051+
<< get_name() << ". error: " << err.what() << dendl;
1052+
return -EIO;
1053+
}
1054+
1055+
return 0;
1056+
}
1057+
1058+
int RadosBucket::write_topics(const rgw_pubsub_bucket_topics& notifications,
1059+
RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) {
1060+
bufferlist bl;
1061+
encode(notifications, bl);
1062+
1063+
return rgw_put_system_obj(dpp, store->svc()->sysobj,
1064+
store->svc()->zone->get_zone_params().log_pool,
1065+
topics_oid(),
1066+
bl, false, objv_tracker, real_time(), y);
1067+
}
1068+
1069+
int RadosBucket::remove_topics(RGWObjVersionTracker* objv_tracker,
1070+
optional_yield y, const DoutPrefixProvider *dpp) {
1071+
return rgw_delete_system_obj(dpp, store->svc()->sysobj,
1072+
store->svc()->zone->get_zone_params().log_pool,
1073+
topics_oid(),
1074+
objv_tracker, y);
1075+
}
1076+
10271077
std::unique_ptr<User> RadosStore::get_user(const rgw_user &u)
10281078
{
10291079
return std::make_unique<RadosUser>(this, u);
@@ -1283,6 +1333,54 @@ std::unique_ptr<Notification> RadosStore::get_notification(const DoutPrefixProvi
12831333
return std::make_unique<RadosNotification>(dpp, this, obj, src_obj, event_type, _bucket, _user_id, _user_tenant, _req_id, y);
12841334
}
12851335

1336+
std::string RadosStore::topics_oid(const std::string& tenant) const {
1337+
return pubsub_oid_prefix + tenant;
1338+
}
1339+
1340+
int RadosStore::read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
1341+
optional_yield y, const DoutPrefixProvider *dpp) {
1342+
bufferlist bl;
1343+
const int ret = rgw_get_system_obj(svc()->sysobj,
1344+
svc()->zone->get_zone_params().log_pool,
1345+
topics_oid(tenant),
1346+
bl,
1347+
objv_tracker,
1348+
nullptr, y, dpp, nullptr);
1349+
if (ret < 0) {
1350+
return ret;
1351+
}
1352+
1353+
auto iter = bl.cbegin();
1354+
try {
1355+
decode(topics, iter);
1356+
} catch (buffer::error& err) {
1357+
ldpp_dout(dpp, 20) << " failed to decode topics from oid: " << topics_oid(tenant) <<
1358+
". error: " << err.what() << dendl;
1359+
return -EIO;
1360+
}
1361+
1362+
return 0;
1363+
}
1364+
1365+
int RadosStore::write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
1366+
optional_yield y, const DoutPrefixProvider *dpp) {
1367+
bufferlist bl;
1368+
encode(topics, bl);
1369+
1370+
return rgw_put_system_obj(dpp, svc()->sysobj,
1371+
svc()->zone->get_zone_params().log_pool,
1372+
topics_oid(tenant),
1373+
bl, false, objv_tracker, real_time(), y);
1374+
}
1375+
1376+
int RadosStore::remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,
1377+
optional_yield y, const DoutPrefixProvider *dpp) {
1378+
return rgw_delete_system_obj(dpp, svc()->sysobj,
1379+
svc()->zone->get_zone_params().log_pool,
1380+
topics_oid(tenant),
1381+
objv_tracker, y);
1382+
}
1383+
12861384
int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj)
12871385
{
12881386
return rados->delete_raw_obj(dpp, obj);

src/rgw/driver/rados/rgw_sal_rados.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ class RadosStore : public StoreDriver {
125125
RGWRados* rados;
126126
RGWUserCtl* user_ctl;
127127
std::unique_ptr<RadosZone> zone;
128+
std::string topics_oid(const std::string& tenant) const;
128129

129130
public:
130131
RadosStore()
@@ -168,6 +169,12 @@ class RadosStore : public StoreDriver {
168169
const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj,
169170
rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant,
170171
std::string& _req_id, optional_yield y) override;
172+
int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
173+
optional_yield y, const DoutPrefixProvider *dpp) override;
174+
int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
175+
optional_yield y, const DoutPrefixProvider *dpp) override;
176+
int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,
177+
optional_yield y, const DoutPrefixProvider *dpp) override;
171178
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
172179
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
173180

@@ -503,6 +510,7 @@ class RadosBucket : public StoreBucket {
503510
private:
504511
RadosStore* store;
505512
RGWAccessControlPolicy acls;
513+
std::string topics_oid() const;
506514

507515
public:
508516
RadosBucket(RadosStore *_st)
@@ -608,6 +616,12 @@ class RadosBucket : public StoreBucket {
608616
bool *is_truncated) override;
609617
virtual int abort_multiparts(const DoutPrefixProvider* dpp,
610618
CephContext* cct) override;
619+
int read_topics(rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker,
620+
optional_yield y, const DoutPrefixProvider *dpp) override;
621+
int write_topics(const rgw_pubsub_bucket_topics& notifications, RGWObjVersionTracker* objv_tracker,
622+
optional_yield y, const DoutPrefixProvider *dpp) override;
623+
int remove_topics(RGWObjVersionTracker* objv_tracker,
624+
optional_yield y, const DoutPrefixProvider *dpp) override;
611625

612626
private:
613627
int link(const DoutPrefixProvider* dpp, User* new_user, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr);

src/rgw/rgw_admin.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10317,7 +10317,7 @@ int main(int argc, const char **argv)
1031710317

1031810318
if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) {
1031910319

10320-
RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), tenant);
10320+
RGWPubSub ps(driver, tenant);
1032110321

1032210322
if (!bucket_name.empty()) {
1032310323
rgw_pubsub_bucket_topics result;
@@ -10327,16 +10327,16 @@ int main(int argc, const char **argv)
1032710327
return -ret;
1032810328
}
1032910329

10330-
const RGWPubSub::Bucket b(ps, bucket->get_key());
10331-
ret = b.get_topics(&result);
10330+
const RGWPubSub::Bucket b(ps, bucket.get());
10331+
ret = b.get_topics(dpp(), result, null_yield);
1033210332
if (ret < 0 && ret != -ENOENT) {
1033310333
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
1033410334
return -ret;
1033510335
}
1033610336
encode_json("result", result, formatter.get());
1033710337
} else {
1033810338
rgw_pubsub_topics result;
10339-
int ret = ps.get_topics(&result);
10339+
int ret = ps.get_topics(dpp(), result, null_yield);
1034010340
if (ret < 0 && ret != -ENOENT) {
1034110341
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
1034210342
return -ret;
@@ -10352,10 +10352,10 @@ int main(int argc, const char **argv)
1035210352
return EINVAL;
1035310353
}
1035410354

10355-
RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), tenant);
10355+
RGWPubSub ps(driver, tenant);
1035610356

1035710357
rgw_pubsub_topic topic;
10358-
ret = ps.get_topic(topic_name, &topic);
10358+
ret = ps.get_topic(dpp(), topic_name, topic, null_yield);
1035910359
if (ret < 0) {
1036010360
cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
1036110361
return -ret;
@@ -10370,7 +10370,7 @@ int main(int argc, const char **argv)
1037010370
return EINVAL;
1037110371
}
1037210372

10373-
RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), tenant);
10373+
RGWPubSub ps(driver, tenant);
1037410374

1037510375
ret = ps.remove_topic(dpp(), topic_name, null_yield);
1037610376
if (ret < 0) {

0 commit comments

Comments
 (0)