1212#include < boost/algorithm/string/predicate.hpp>
1313#include < boost/optional.hpp>
1414#include < boost/utility/in_place_factory.hpp>
15+ #include < fmt/format.h>
1516
1617#include " include/scope_guard.h"
1718#include " common/Clock.h"
1819#include " common/armor.h"
20+ #include " common/async/spawn_throttle.h"
1921#include " common/errno.h"
2022#include " common/mime.h"
2123#include " common/utf8.h"
@@ -6671,38 +6673,34 @@ void RGWDeleteMultiObj::write_ops_log_entry(rgw_log_entry& entry) const {
66716673 entry.delete_multi_obj_meta .objects = std::move (ops_log_entries);
66726674}
66736675
6674- void RGWDeleteMultiObj::wait_flush (optional_yield y,
6675- boost::asio::deadline_timer *formatter_flush_cond,
6676- std::function<bool ()> predicate)
6676+ void RGWDeleteMultiObj::handle_individual_object (const rgw_obj_key& o, optional_yield y)
66776677{
6678- if (y && formatter_flush_cond) {
6679- auto yc = y.get_yield_context ();
6680- while (!predicate ()) {
6681- boost::system::error_code error;
6682- formatter_flush_cond->async_wait (yc[error]);
6683- rgw_flush_formatter (s, s->formatter );
6678+ // add the object key to the dout prefix so we can trace concurrent calls
6679+ struct ObjectPrefix : public DoutPrefixPipe {
6680+ const rgw_obj_key& o;
6681+ ObjectPrefix (const DoutPrefixProvider& dpp, const rgw_obj_key& o)
6682+ : DoutPrefixPipe(dpp), o(o) {}
6683+ void add_prefix (std::ostream& out) const override {
6684+ out << o << ' ' ;
66846685 }
6685- }
6686- }
6686+ } prefix{* this , o};
6687+ const DoutPrefixProvider* dpp = &prefix;
66876688
6688- void RGWDeleteMultiObj::handle_individual_object (const rgw_obj_key& o, optional_yield y,
6689- boost::asio::deadline_timer *formatter_flush_cond)
6690- {
66916689 std::unique_ptr<rgw::sal::Object> obj = bucket->get_object (o);
66926690 if (o.empty ()) {
6693- send_partial_response (o, false , " " , -EINVAL, formatter_flush_cond );
6691+ send_partial_response (o, false , " " , -EINVAL);
66946692 return ;
66956693 }
66966694
66976695 // verify object delete permission
66986696 const auto action = o.instance .empty () ?
66996697 rgw::IAM::s3DeleteObject :
67006698 rgw::IAM::s3DeleteObjectVersion;
6701- if (!verify_bucket_permission (this , s, ARN (obj->get_obj ()), s->user_acl ,
6699+ if (!verify_bucket_permission (dpp , s, ARN (obj->get_obj ()), s->user_acl ,
67026700 s->bucket_acl , s->iam_policy ,
67036701 s->iam_identity_policies ,
67046702 s->session_policies , action)) {
6705- send_partial_response (o, false , " " , -EACCES, formatter_flush_cond );
6703+ send_partial_response (o, false , " " , -EACCES);
67066704 return ;
67076705 }
67086706
@@ -6712,15 +6710,15 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_
67126710 if (!rgw::sal::Object::empty (obj.get ())) {
67136711 int state_loaded = -1 ;
67146712 bool check_obj_lock = obj->have_instance () && bucket->get_info ().obj_lock_enabled ();
6715- const auto ret = state_loaded = obj->load_obj_state (this , y, true );
6713+ const auto ret = state_loaded = obj->load_obj_state (dpp , y, true );
67166714
67176715 if (ret < 0 ) {
67186716 if (ret == -ENOENT) {
67196717 // object maybe delete_marker, skip check_obj_lock
67206718 check_obj_lock = false ;
67216719 } else {
67226720 // Something went wrong.
6723- send_partial_response (o, false , " " , ret, formatter_flush_cond );
6721+ send_partial_response (o, false , " " , ret);
67246722 return ;
67256723 }
67266724 } else {
@@ -6730,9 +6728,9 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_
67306728
67316729 if (check_obj_lock) {
67326730 ceph_assert (state_loaded == 0 );
6733- int object_lock_response = verify_object_lock (this , obj->get_attrs (), bypass_perm, bypass_governance_mode);
6731+ int object_lock_response = verify_object_lock (dpp , obj->get_attrs (), bypass_perm, bypass_governance_mode);
67346732 if (object_lock_response != 0 ) {
6735- send_partial_response (o, false , " " , object_lock_response, formatter_flush_cond );
6733+ send_partial_response (o, false , " " , object_lock_response);
67366734 return ;
67376735 }
67386736 }
@@ -6745,9 +6743,9 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_
67456743 rgw::notify::ObjectRemovedDelete;
67466744 std::unique_ptr<rgw::sal::Notification> res
67476745 = driver->get_notification (obj.get (), s->src_object .get (), s, event_type, y);
6748- op_ret = res->publish_reserve (this );
6746+ op_ret = res->publish_reserve (dpp );
67496747 if (op_ret < 0 ) {
6750- send_partial_response (o, false , " " , op_ret, formatter_flush_cond );
6748+ send_partial_response (o, false , " " , op_ret);
67516749 return ;
67526750 }
67536751
@@ -6760,67 +6758,59 @@ void RGWDeleteMultiObj::handle_individual_object(const rgw_obj_key& o, optional_
67606758 del_op->params .bucket_owner = s->bucket_owner .id ;
67616759 del_op->params .marker_version_id = version_id;
67626760
6763- op_ret = del_op->delete_obj (this , y, rgw::sal::FLAG_LOG_OP);
6761+ op_ret = del_op->delete_obj (dpp , y, rgw::sal::FLAG_LOG_OP);
67646762 if (op_ret == -ENOENT) {
67656763 op_ret = 0 ;
67666764 }
67676765 if (op_ret == 0 ) {
67686766 // send request to notification manager
6769- int ret = res->publish_commit (this , obj_size, ceph::real_clock::now (), etag, version_id);
6767+ int ret = res->publish_commit (dpp , obj_size, ceph::real_clock::now (), etag, version_id);
67706768 if (ret < 0 ) {
6771- ldpp_dout (this , 1 ) << " ERROR: publishing notification failed, with error: " << ret << dendl;
6769+ ldpp_dout (dpp , 1 ) << " ERROR: publishing notification failed, with error: " << ret << dendl;
67726770 // too late to rollback operation, hence op_ret is not set here
67736771 }
67746772 }
67756773
6776- send_partial_response (o, del_op->result .delete_marker , del_op->result .version_id , op_ret, formatter_flush_cond );
6774+ send_partial_response (o, del_op->result .delete_marker , del_op->result .version_id , op_ret);
67776775}
67786776
67796777void RGWDeleteMultiObj::execute (optional_yield y)
67806778{
6781- RGWMultiDelDelete *multi_delete;
6782- vector<rgw_obj_key>::iterator iter;
6783- RGWMultiDelXMLParser parser;
6784- uint32_t aio_count = 0 ;
6785- const uint32_t max_aio = std::max<uint32_t >(1 , s->cct ->_conf ->rgw_multi_obj_del_max_aio );
6786- char * buf;
6787- std::optional<boost::asio::deadline_timer> formatter_flush_cond;
6788- if (y) {
6789- auto ex = y.get_yield_context ().get_executor ();
6790- formatter_flush_cond = std::make_optional<boost::asio::deadline_timer>(ex);
6791- }
6792-
6793- buf = data.c_str ();
6779+ const char * buf = data.c_str ();
67946780 if (!buf) {
67956781 op_ret = -EINVAL;
6796- goto error ;
6782+ return ;
67976783 }
67986784
6785+ RGWMultiDelXMLParser parser;
67996786 if (!parser.init ()) {
68006787 op_ret = -EINVAL;
6801- goto error ;
6788+ return ;
68026789 }
68036790
68046791 if (!parser.parse (buf, data.length (), 1 )) {
6805- op_ret = -EINVAL;
6806- goto error;
6792+ s->err .message = " Failed to parse xml input" ;
6793+ op_ret = -ERR_MALFORMED_XML;
6794+ return ;
68076795 }
68086796
6809- multi_delete = static_cast <RGWMultiDelDelete *>(parser.find_first (" Delete" ));
6797+ auto multi_delete = static_cast <RGWMultiDelDelete *>(parser.find_first (" Delete" ));
68106798 if (!multi_delete) {
6811- op_ret = -EINVAL;
6812- goto error;
6813- } else {
6814- #define DELETE_MULTI_OBJ_MAX_NUM 1000
6815- int max_num = s->cct ->_conf ->rgw_delete_multi_obj_max_num ;
6816- if (max_num < 0 ) {
6817- max_num = DELETE_MULTI_OBJ_MAX_NUM;
6818- }
6819- int multi_delete_object_num = multi_delete->objects .size ();
6820- if (multi_delete_object_num > max_num) {
6821- op_ret = -ERR_MALFORMED_XML;
6822- goto error;
6823- }
6799+ s->err .message = " Missing require element Delete" ;
6800+ op_ret = -ERR_MALFORMED_XML;
6801+ return ;
6802+ }
6803+
6804+ constexpr int DEFAULT_MAX_NUM = 1000 ;
6805+ int max_num = s->cct ->_conf ->rgw_delete_multi_obj_max_num ;
6806+ if (max_num < 0 ) {
6807+ max_num = DEFAULT_MAX_NUM;
6808+ }
6809+ const int multi_delete_object_num = multi_delete->objects .size ();
6810+ if (multi_delete_object_num > max_num) {
6811+ s->err .message = fmt::format (" Object count limit {} exceeded" , max_num);
6812+ op_ret = -ERR_MALFORMED_XML;
6813+ return ;
68246814 }
68256815
68266816 if (multi_delete->is_quiet ())
@@ -6837,53 +6827,38 @@ void RGWDeleteMultiObj::execute(optional_yield y)
68376827 if (has_versioned && !s->mfa_verified ) {
68386828 ldpp_dout (this , 5 ) << " NOTICE: multi-object delete request with a versioned object, mfa auth not provided" << dendl;
68396829 op_ret = -ERR_MFA_REQUIRED;
6840- goto error ;
6830+ return ;
68416831 }
68426832 }
68436833
68446834 begin_response ();
6845- if (multi_delete->objects .empty ()) {
6846- goto done;
6847- }
68486835
6849- for (iter = multi_delete->objects .begin ();
6850- iter != multi_delete->objects .end ();
6851- ++iter) {
6852- rgw_obj_key obj_key = *iter;
6853- if (y) {
6854- wait_flush (y, &*formatter_flush_cond, [&aio_count, max_aio] {
6855- return aio_count < max_aio;
6856- });
6857- aio_count++;
6858- boost::asio::spawn (y.get_yield_context (), [this , &aio_count, obj_key, &formatter_flush_cond] (boost::asio::yield_context yield) {
6859- handle_individual_object (obj_key, yield, &*formatter_flush_cond);
6860- aio_count--;
6861- }, [] (std::exception_ptr eptr) {
6862- if (eptr) std::rethrow_exception (eptr);
6863- });
6864- } else {
6865- handle_individual_object (obj_key, y, nullptr );
6866- }
6867- }
6868- if (formatter_flush_cond) {
6869- wait_flush (y, &*formatter_flush_cond, [this , n=multi_delete->objects .size ()] {
6870- return n == ops_log_entries.size ();
6871- });
6836+ // process up to max_aio object deletes in parallel
6837+ const uint32_t max_aio = std::max<uint32_t >(1 , s->cct ->_conf ->rgw_multi_obj_del_max_aio );
6838+ auto group = ceph::async::spawn_throttle{y, max_aio};
6839+
6840+ for (const auto & key : multi_delete->objects ) {
6841+ boost::asio::spawn (group.get_executor (),
6842+ [this , &key] (boost::asio::yield_context yield) {
6843+ handle_individual_object (key, yield);
6844+ }, group);
6845+
6846+ rgw_flush_formatter (s, s->formatter );
68726847 }
6848+ group.wait ();
68736849
68746850 /* set the return code to zero, errors at this point will be
68756851 dumped to the response */
68766852 op_ret = 0 ;
68776853
6878- done:
68796854 // will likely segfault if begin_response() has not been called
68806855 end_response ();
6881- return ;
6856+ }
68826857
6883- error:
6858+ void RGWDeleteMultiObj::send_response ()
6859+ {
6860+ // if we haven't already written a response, send the error response
68846861 send_status ();
6885- return ;
6886-
68876862}
68886863
68896864bool RGWBulkDelete::Deleter::verify_permission (RGWBucketInfo& binfo,
0 commit comments