Skip to content

Commit ef20848

Browse files
authored
Merge pull request ceph#57079 from kchheda3/wip-lc-notification
rgw/lifecycle-notification: Do not block lc processing for notification errors. Reviewed-by: Yuval Lifshitz <[email protected]>
2 parents 6088637 + 02295f9 commit ef20848

File tree

2 files changed

+55
-95
lines changed

2 files changed

+55
-95
lines changed

src/rgw/driver/rados/rgw_cr_rados.cc

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -717,13 +717,13 @@ int RGWRadosBILogTrimCR::request_complete()
717717
return r;
718718
}
719719

720-
int send_sync_notification(const DoutPrefixProvider* dpp,
721-
rgw::sal::RadosStore* store,
722-
rgw::sal::Bucket* bucket,
723-
rgw::sal::Object* obj,
724-
const rgw::sal::Attrs& attrs,
725-
uint64_t obj_size,
726-
const rgw::notify::EventTypeList& event_types) {
720+
void send_sync_notification(const DoutPrefixProvider* dpp,
721+
rgw::sal::RadosStore* store,
722+
rgw::sal::Bucket* bucket,
723+
rgw::sal::Object* obj,
724+
const rgw::sal::Attrs& attrs,
725+
uint64_t obj_size,
726+
const rgw::notify::EventTypeList& event_types) {
727727
// send notification that object was successfully synced
728728
std::string user_id = "rgw sync";
729729
std::string req_id = "0";
@@ -738,7 +738,6 @@ int send_sync_notification(const DoutPrefixProvider* dpp,
738738
ldpp_dout(dpp, 1) << "ERROR: " << __func__
739739
<< ": caught buffer::error couldn't decode TagSet "
740740
<< dendl;
741-
return -EIO;
742741
}
743742
}
744743
// bucket attrs are required for notification and since its not loaded,
@@ -748,7 +747,7 @@ int send_sync_notification(const DoutPrefixProvider* dpp,
748747
ldpp_dout(dpp, 1) << "ERROR: failed to load bucket attrs for bucket:"
749748
<< bucket->get_name() << " with error ret= " << r
750749
<< " . Not sending notification" << dendl;
751-
return r;
750+
return;
752751
}
753752
rgw::notify::reservation_t notify_res(dpp, store, obj, nullptr, bucket,
754753
user_id, bucket->get_tenant(), req_id,
@@ -772,7 +771,6 @@ int send_sync_notification(const DoutPrefixProvider* dpp,
772771
<< ret << dendl;
773772
}
774773
}
775-
return ret;
776774
}
777775

778776
int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)

src/rgw/rgw_lc.cc

Lines changed: 47 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,35 @@ static bool pass_size_limit_checks(const DoutPrefixProvider *dpp, lc_op_ctx& oc)
538538
static std::string lc_id = "rgw lifecycle";
539539
static std::string lc_req_id = "0";
540540

541+
static void send_notification(const DoutPrefixProvider* dpp,
542+
rgw::sal::Driver* driver,
543+
rgw::sal::Object* obj,
544+
rgw::sal::Bucket* bucket,
545+
const std::string& etag,
546+
uint64_t size,
547+
const std::string& version_id,
548+
const rgw::notify::EventTypeList& event_types) {
549+
// notification supported only for RADOS driver for now
550+
auto notify = driver->get_notification(
551+
dpp, obj, nullptr, event_types, bucket, lc_id,
552+
const_cast<std::string&>(bucket->get_tenant()), lc_req_id, null_yield);
553+
554+
int ret = notify->publish_reserve(dpp, nullptr);
555+
if (ret < 0) {
556+
ldpp_dout(dpp, 1) << "ERROR: notify publish_reserve failed, with error: "
557+
<< ret << " for lc object: " << obj->get_name()
558+
<< " for event_types: " << event_types << dendl;
559+
return;
560+
}
561+
ret = notify->publish_commit(dpp, size, ceph::real_clock::now(), etag,
562+
version_id);
563+
if (ret < 0) {
564+
ldpp_dout(dpp, 5) << "WARNING: notify publish_commit failed, with error: "
565+
<< ret << " for lc object: " << obj->get_name()
566+
<< " for event_types: " << event_types << dendl;
567+
}
568+
}
569+
541570
/* do all zones in the zone group process LC? */
542571
static bool zonegroup_lc_check(const DoutPrefixProvider *dpp, rgw::sal::Zone* zone)
543572
{
@@ -571,7 +600,6 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp,
571600
auto& meta = o.meta;
572601
int ret;
573602
auto version_id = obj_key.instance; // deep copy, so not cleared below
574-
std::unique_ptr<rgw::sal::Notification> notify;
575603

576604
/* per discussion w/Daniel, Casey,and Eric, we *do need*
577605
* a new sal object handle, based on the following decision
@@ -593,6 +621,7 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp,
593621
if (obj->get_attr(RGW_ATTR_ETAG, bl)) {
594622
etag = rgw_bl_str(bl);
595623
}
624+
auto size = obj->get_size();
596625

597626
std::unique_ptr<rgw::sal::Object::DeleteOp> del_op
598627
= obj->get_delete_op();
@@ -603,35 +632,15 @@ static int remove_expired_obj(const DoutPrefixProvider* dpp,
603632
del_op->params.bucket_owner = bucket_info.owner;
604633
del_op->params.unmod_since = meta.mtime;
605634

606-
// notification supported only for RADOS driver for now
607-
notify = driver->get_notification(
608-
dpp, obj.get(), nullptr, event_types, oc.bucket, lc_id,
609-
const_cast<std::string&>(oc.bucket->get_tenant()), lc_req_id, null_yield);
610-
611-
ret = notify->publish_reserve(dpp, nullptr);
612-
if ( ret < 0) {
613-
ldpp_dout(dpp, 1)
614-
<< "ERROR: notify reservation failed, deferring delete of object k="
615-
<< o.key
616-
<< dendl;
617-
return ret;
618-
}
619-
620635
uint32_t flags = (!remove_indeed || !zonegroup_lc_check(dpp, oc.driver->get_zone()))
621636
? rgw::sal::FLAG_LOG_OP : 0;
622637
ret = del_op->delete_obj(dpp, null_yield, flags);
623638
if (ret < 0) {
624639
ldpp_dout(dpp, 1) <<
625640
fmt::format("ERROR: {} failed, with error: {}", __func__, ret) << dendl;
626641
} else {
627-
// send request to notification manager
628-
int publish_ret = notify->publish_commit(dpp, obj->get_size(),
629-
ceph::real_clock::now(),
630-
etag,
631-
version_id);
632-
if (publish_ret < 0) {
633-
ldpp_dout(dpp, 5) << "WARNING: notify publish_commit failed, with error: " << publish_ret << dendl;
634-
}
642+
send_notification(dpp, driver, obj.get(), oc.bucket, etag, size, version_id,
643+
event_types);
635644
}
636645

637646
return ret;
@@ -880,8 +889,6 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
880889
params.ns = RGW_OBJ_NS_MULTIPART;
881890
params.access_list_filter = MultipartMetaFilter;
882891

883-
const auto event_type = rgw::notify::ObjectExpirationAbortMPU;
884-
885892
auto pf = [&](RGWLC::LCWorker *wk, WorkQ *wq, WorkItem &wi) {
886893
int ret{0};
887894
auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
@@ -901,36 +908,13 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
901908
if (sal_obj->get_attr(RGW_ATTR_ETAG, bl)) {
902909
etag = rgw_bl_str(bl);
903910
}
904-
905-
std::unique_ptr<rgw::sal::Notification> notify
906-
= driver->get_notification(
907-
this, sal_obj.get(), nullptr, {event_type}, target, lc_id,
908-
const_cast<std::string&>(target->get_tenant()), lc_req_id,
909-
null_yield);
910-
auto version_id = obj.key.instance;
911-
912-
ret = notify->publish_reserve(this, nullptr);
913-
if (ret < 0) {
914-
ldpp_dout(wk->get_lc(), 0)
915-
<< "ERROR: reserving persistent notification for "
916-
"abort_multipart_upload, ret="
917-
<< ret << ", thread:" << wq->thr_name()
918-
<< ", deferring mpu cleanup for meta:" << obj.key << dendl;
919-
return ret;
920-
}
911+
auto size = sal_obj->get_size();
921912

922913
ret = mpu->abort(this, cct, null_yield);
923914
if (ret == 0) {
924-
int publish_ret = notify->publish_commit(
925-
this, sal_obj->get_size(),
926-
ceph::real_clock::now(),
927-
etag,
928-
version_id);
929-
if (publish_ret < 0) {
930-
ldpp_dout(wk->get_lc(), 5)
931-
<< "WARNING: notify publish_commit failed, with error: " << ret
932-
<< dendl;
933-
}
915+
const auto event_type = rgw::notify::ObjectExpirationAbortMPU;
916+
send_notification(this, driver, sal_obj.get(), target, etag, size,
917+
obj.key.instance, {event_type});
934918
if (perfcounter) {
935919
perfcounter->inc(l_rgw_lc_abort_mpu, 1);
936920
}
@@ -1420,31 +1404,7 @@ class LCOpAction_Transition : public LCOpAction {
14201404
if (obj->get_attr(RGW_ATTR_ETAG, bl)) {
14211405
etag = rgw_bl_str(bl);
14221406
}
1423-
1424-
rgw::notify::EventTypeList event_types;
1425-
if (bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
1426-
event_types.insert(event_types.end(),
1427-
{rgw::notify::ObjectTransitionCurrent,
1428-
rgw::notify::LifecycleTransition});
1429-
} else {
1430-
event_types.push_back(rgw::notify::ObjectTransitionNonCurrent);
1431-
}
1432-
1433-
std::unique_ptr<rgw::sal::Notification> notify =
1434-
oc.driver->get_notification(
1435-
oc.dpp, obj.get(), nullptr, event_types, bucket, lc_id,
1436-
const_cast<std::string&>(oc.bucket->get_tenant()), lc_req_id,
1437-
null_yield);
1438-
auto version_id = oc.o.key.instance;
1439-
1440-
ret = notify->publish_reserve(oc.dpp, nullptr);
1441-
if (ret < 0) {
1442-
ldpp_dout(oc.dpp, 1)
1443-
<< "ERROR: notify reservation failed, deferring transition of object k="
1444-
<< oc.o.key
1445-
<< dendl;
1446-
return ret;
1447-
}
1407+
auto size = obj->get_size();
14481408

14491409
ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
14501410
oc.env.worker->get_cloud_targets(),
@@ -1453,15 +1413,17 @@ class LCOpAction_Transition : public LCOpAction {
14531413
if (ret < 0) {
14541414
return ret;
14551415
} else {
1456-
// send request to notification manager
1457-
int publish_ret = notify->publish_commit(oc.dpp, obj->get_size(),
1458-
ceph::real_clock::now(),
1459-
etag,
1460-
version_id);
1461-
if (publish_ret < 0) {
1462-
ldpp_dout(oc.dpp, 5) <<
1463-
"WARNING: notify publish_commit failed, with error: " << publish_ret << dendl;
1416+
rgw::notify::EventTypeList event_types;
1417+
if (bucket->versioned() && oc.o.is_current() &&
1418+
!oc.o.is_delete_marker()) {
1419+
event_types.insert(event_types.end(),
1420+
{rgw::notify::ObjectTransitionCurrent,
1421+
rgw::notify::LifecycleTransition});
1422+
} else {
1423+
event_types.push_back(rgw::notify::ObjectTransitionNonCurrent);
14641424
}
1425+
send_notification(oc.dpp, oc.driver, obj.get(), oc.bucket, etag, size,
1426+
oc.o.key.instance, event_types);
14651427
}
14661428

14671429
if (delete_object) {

0 commit comments

Comments
 (0)