Skip to content

Commit bf48674

Browse files
authored
Merge pull request ceph#57004 from kchheda3/wip-fix-retry
rgw/multisite-notification: retry storing bucket notification attrs for ECANCELED(ConcurrentModification) errors. Reviewed-by: Casey Bodley <[email protected]> Reviewed-by: Yuval Lifshitz <[email protected]>
2 parents 0aeeebe + 9ca7677 commit bf48674

File tree

3 files changed

+69
-68
lines changed

3 files changed

+69
-68
lines changed

src/rgw/rgw_op.cc

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -951,36 +951,6 @@ void rgw_bucket_object_pre_exec(req_state *s)
951951
dump_bucket_from_state(s);
952952
}
953953

954-
// So! Now and then when we try to update bucket information, the
955-
// bucket has changed during the course of the operation. (Or we have
956-
// a cache consistency problem that Watch/Notify isn't ruling out
957-
// completely.)
958-
//
959-
// When this happens, we need to update the bucket info and try
960-
// again. We have, however, to try the right *part* again. We can't
961-
// simply re-send, since that will obliterate the previous update.
962-
//
963-
// Thus, callers of this function should include everything that
964-
// merges information to be changed into the bucket information as
965-
// well as the call to set it.
966-
//
967-
// The called function must return an integer, negative on error. In
968-
// general, they should just return op_ret.
969-
namespace {
970-
template<typename F>
971-
int retry_raced_bucket_write(const DoutPrefixProvider *dpp, rgw::sal::Bucket* b, const F& f, optional_yield y) {
972-
auto r = f();
973-
for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) {
974-
r = b->try_refresh_info(dpp, nullptr, y);
975-
if (r >= 0) {
976-
r = f();
977-
}
978-
}
979-
return r;
980-
}
981-
}
982-
983-
984954
int RGWGetObj::verify_permission(optional_yield y)
985955
{
986956
s->object->set_atomic();

src/rgw/rgw_op.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,36 @@ int rgw_rest_get_json_input(CephContext *cct, req_state *s, T& out,
165165
return 0;
166166
}
167167

168+
// So! Now and then when we try to update bucket information, the
169+
// bucket has changed during the course of the operation. (Or we have
170+
// a cache consistency problem that Watch/Notify isn't ruling out
171+
// completely.)
172+
//
173+
// When this happens, we need to update the bucket info and try
174+
// again. We have, however, to try the right *part* again. We can't
175+
// simply re-send, since that will obliterate the previous update.
176+
//
177+
// Thus, callers of this function should include everything that
178+
// merges information to be changed into the bucket information as
179+
// well as the call to set it.
180+
//
181+
// The called function must return an integer, negative on error. In
182+
// general, they should just return op_ret.
183+
template<typename F>
184+
int retry_raced_bucket_write(const DoutPrefixProvider *dpp,
185+
rgw::sal::Bucket *b,
186+
const F &f,
187+
optional_yield y) {
188+
auto r = f();
189+
for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) {
190+
r = b->try_refresh_info(dpp, nullptr, y);
191+
if (r >= 0) {
192+
r = f();
193+
}
194+
}
195+
return r;
196+
}
197+
168198
/**
169199
* Provide the base class for all ops.
170200
*/

src/rgw/rgw_rest_pubsub.cc

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1318,48 +1318,49 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
13181318
op_ret = -ERR_SERVICE_UNAVAILABLE;
13191319
return;
13201320
}
1321-
1322-
if (configurations.list.empty()) {
1323-
op_ret = remove_notification_v2(this, driver, s->bucket.get(),
1324-
/*delete all notif=true*/ "", y);
1325-
return;
1326-
}
1327-
rgw_pubsub_bucket_topics bucket_topics;
1328-
op_ret = get_bucket_notifications(this, s->bucket.get(), bucket_topics);
1329-
if (op_ret < 0) {
1330-
ldpp_dout(this, 1)
1331-
<< "failed to load existing bucket notification on bucket: "
1332-
<< s->bucket << ", ret = " << op_ret << dendl;
1333-
return;
1334-
}
1335-
for (const auto& c : configurations.list) {
1336-
const auto& notif_name = c.id;
1337-
1338-
const auto arn = rgw::ARN::parse(c.topic_arn);
1339-
if (!arn) { // already validated above
1340-
continue;
1321+
op_ret = retry_raced_bucket_write(this, s->bucket.get(), [this, y] {
1322+
if (configurations.list.empty()) {
1323+
return remove_notification_v2(this, driver, s->bucket.get(),
1324+
/*delete all notif=true*/"", y);
13411325
}
1342-
const auto& topic_name = arn->resource;
1343-
1344-
auto t = topics.find(*arn);
1345-
if (t == topics.end()) {
1346-
continue;
1326+
rgw_pubsub_bucket_topics bucket_topics;
1327+
int ret = get_bucket_notifications(this, s->bucket.get(), bucket_topics);
1328+
if (ret < 0) {
1329+
ldpp_dout(this, 1)
1330+
<< "failed to load existing bucket notification on bucket: "
1331+
<< s->bucket << ", ret = " << ret << dendl;
1332+
return ret;
13471333
}
1348-
auto& topic_info = t->second;
1334+
for (const auto &c : configurations.list) {
1335+
const auto &notif_name = c.id;
13491336

1350-
auto& topic_filter =
1337+
const auto arn = rgw::ARN::parse(c.topic_arn);
1338+
if (!arn) { // already validated above
1339+
continue;
1340+
}
1341+
const auto &topic_name = arn->resource;
1342+
1343+
auto t = topics.find(*arn);
1344+
if (t == topics.end()) {
1345+
continue;
1346+
}
1347+
auto &topic_info = t->second;
1348+
1349+
auto &topic_filter =
13511350
bucket_topics.topics[topic_to_unique(topic_name, notif_name)];
1352-
topic_filter.topic = topic_info;
1353-
topic_filter.events = c.events;
1354-
topic_filter.s3_id = notif_name;
1355-
topic_filter.s3_filter = c.filter;
1356-
}
1357-
// finally store all the bucket notifications as attr.
1358-
bufferlist bl;
1359-
bucket_topics.encode(bl);
1360-
rgw::sal::Attrs& attrs = s->bucket->get_attrs();
1361-
attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
1362-
op_ret = s->bucket->merge_and_store_attrs(this, attrs, y);
1351+
topic_filter.topic = topic_info;
1352+
topic_filter.events = c.events;
1353+
topic_filter.s3_id = notif_name;
1354+
topic_filter.s3_filter = c.filter;
1355+
}
1356+
// finally store all the bucket notifications as attr.
1357+
bufferlist bl;
1358+
bucket_topics.encode(bl);
1359+
rgw::sal::Attrs &attrs = s->bucket->get_attrs();
1360+
attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
1361+
return s->bucket->merge_and_store_attrs(this, attrs, y);
1362+
}, y);
1363+
13631364
if (op_ret < 0) {
13641365
ldpp_dout(this, 4)
13651366
<< "Failed to store RGW_ATTR_BUCKET_NOTIFICATION on bucket="

0 commit comments

Comments
 (0)