@@ -964,6 +964,15 @@ PG::BackgroundProcessLock::lock() noexcept
964964 return interruptor::make_interruptible (mutex.lock ());
965965}
966966
967+ // We may need to roll back the ObjectContext when op execution fails.
968+ // Take a copy of the current obc before it is mutated so that the pre-op state can be restored on failure.
969+ ObjectContextRef duplicate_obc (const ObjectContextRef &obc) {
970+ ObjectContextRef object_context = new ObjectContext (obc->obs .oi .soid ); // new context constructed from the same soid as the source obc
971+ object_context->obs = obc->obs ; // copy the object state by assignment
972+ object_context->ssc = new SnapSetContext (*obc->ssc ); // separate SnapSetContext copy-constructed from *obc->ssc; NOTE(review): dereferences obc->ssc unconditionally - confirm callers guarantee it is non-null
973+ return object_context; // the caller receives the duplicate; the source obc is not modified here
974+ }
975+
967976template <class Ret , class SuccessFunc , class FailureFunc >
968977PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t <Ret>>
969978PG::do_osd_ops_execute (
@@ -976,9 +985,9 @@ PG::do_osd_ops_execute(
976985 FailureFunc&& failure_func)
977986{
978987 assert (ox);
979- auto rollbacker = ox->create_rollbacker ([ this ] ( auto & obc) {
980- return obc_loader. reload_obc (obc). handle_error_interruptible (
981- load_obc_ertr::assert_all{ " can't live with object state messed up " } );
988+ auto rollbacker = ox->create_rollbacker (
989+ [object_context= duplicate_obc (obc)] ( auto & obc) mutable {
990+ obc-> update_from (*object_context );
982991 });
983992 auto failure_func_ptr = seastar::make_lw_shared (std::move (failure_func));
984993 return interruptor::do_for_each (ops, [ox](OSDOp& osd_op) {
@@ -1040,23 +1049,21 @@ PG::do_osd_ops_execute(
10401049 std::move (log_entries));
10411050 });
10421051 }).safe_then_unpack_interruptible (
1043- [success_func=std::move (success_func), rollbacker, this , failure_func_ptr]
1052+ [success_func=std::move (success_func), rollbacker, this , failure_func_ptr, obc ]
10441053 (auto submitted_fut, auto _all_completed_fut) mutable {
10451054
10461055 auto all_completed_fut = _all_completed_fut.safe_then_interruptible_tuple (
10471056 std::move (success_func),
10481057 crimson::ct_error::object_corrupted::handle (
1049- [rollbacker, this ] (const std::error_code& e) mutable {
1058+ [rollbacker, this , obc ] (const std::error_code& e) mutable {
10501059 // this is a path for EIO. it's special because we want to fix the object
10511060 // and try again. that is, the layer above `PG::do_osd_ops` is supposed to
10521061 // restart the execution.
1053- return rollbacker.rollback_obc_if_modified (e).then_interruptible (
1054- [obc=rollbacker.get_obc (), this ] {
1055- return repair_object (obc->obs .oi .soid ,
1056- obc->obs .oi .version
1057- ).then_interruptible ([] {
1058- return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make ()};
1059- });
1062+ rollbacker.rollback_obc_if_modified (e);
1063+ return repair_object (obc->obs .oi .soid ,
1064+ obc->obs .oi .version
1065+ ).then_interruptible ([] {
1066+ return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make ()};
10601067 });
10611068 }), OpsExecuter::osd_op_errorator::all_same_way (
10621069 [rollbacker, failure_func_ptr]
@@ -1065,11 +1072,8 @@ PG::do_osd_ops_execute(
10651072 ceph_assert (e.value () == EDQUOT ||
10661073 e.value () == ENOSPC ||
10671074 e.value () == EAGAIN);
1068- return rollbacker.rollback_obc_if_modified (e).then_interruptible (
1069- [e, failure_func_ptr] {
1070- // no need to record error log
1071- return (*failure_func_ptr)(e);
1072- });
1075+ rollbacker.rollback_obc_if_modified (e);
1076+ return (*failure_func_ptr)(e);
10731077 }));
10741078
10751079 return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t <Ret>>(
@@ -1081,45 +1085,42 @@ PG::do_osd_ops_execute(
10811085 rollbacker, failure_func_ptr]
10821086 (const std::error_code& e) mutable {
10831087 ceph_tid_t rep_tid = shard_services.get_tid ();
1084- return rollbacker.rollback_obc_if_modified (e).then_interruptible (
1085- [&, op_info, m, obc,
1086- this , e, rep_tid, failure_func_ptr] {
1087- // record error log
1088- auto maybe_submit_error_log =
1089- seastar::make_ready_future<std::optional<eversion_t >>(std::nullopt );
1090- // call submit_error_log only for non-internal clients
1091- if constexpr (!std::is_same_v<Ret, void >) {
1092- if (op_info.may_write ()) {
1093- maybe_submit_error_log =
1094- submit_error_log (m, op_info, obc, e, rep_tid);
1095- }
1088+ rollbacker.rollback_obc_if_modified (e);
1089+ // record error log
1090+ auto maybe_submit_error_log =
1091+ interruptor::make_ready_future<std::optional<eversion_t >>(std::nullopt );
1092+ // call submit_error_log only for non-internal clients
1093+ if constexpr (!std::is_same_v<Ret, void >) {
1094+ if (op_info.may_write ()) {
1095+ maybe_submit_error_log =
1096+ submit_error_log (m, op_info, obc, e, rep_tid);
10961097 }
1097- return maybe_submit_error_log.then (
1098- [this , failure_func_ptr, e, rep_tid] (auto version) {
1099- auto all_completed =
1100- [this , failure_func_ptr, e, rep_tid, version] {
1101- if (version.has_value ()) {
1102- return complete_error_log (rep_tid, version.value ()).then (
1103- [failure_func_ptr, e] {
1104- return (*failure_func_ptr)(e);
1105- });
1106- } else {
1098+ }
1099+ return maybe_submit_error_log.then_interruptible (
1100+ [this , failure_func_ptr, e, rep_tid] (auto version) {
1101+ auto all_completed =
1102+ [this , failure_func_ptr, e, rep_tid, version] {
1103+ if (version.has_value ()) {
1104+ return complete_error_log (rep_tid, version.value ()
1105+ ).then_interruptible ([failure_func_ptr, e] {
11071106 return (*failure_func_ptr)(e);
1108- }
1109- };
1110- return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t <Ret>>(
1111- std::move (seastar::now ()),
1112- std::move (all_completed ())
1113- );
1114- });
1107+ });
1108+ } else {
1109+ return (*failure_func_ptr)(e);
1110+ }
1111+ };
1112+ return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t <Ret>>(
1113+ std::move (seastar::now ()),
1114+ std::move (all_completed ())
1115+ );
11151116 });
11161117 }));
11171118}
11181119
1119- seastar::future <> PG::complete_error_log (const ceph_tid_t & rep_tid,
1120+ PG::interruptible_future <> PG::complete_error_log (const ceph_tid_t & rep_tid,
11201121 const eversion_t & version)
11211122{
1122- auto result = seastar ::now ();
1123+ auto result = interruptor ::now ();
11231124 auto last_complete = peering_state.get_info ().last_complete ;
11241125 ceph_assert (log_entry_update_waiting_on.contains (rep_tid));
11251126 auto & log_update = log_entry_update_waiting_on[rep_tid];
@@ -1133,8 +1134,9 @@ seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
11331134 } else {
11341135 logger ().debug (" complete_error_log: rep_tid {} awaiting update from {}" ,
11351136 rep_tid, log_update.waiting_on );
1136- result = log_update.all_committed .get_shared_future ().then (
1137- [this , last_complete, rep_tid, version] {
1137+ result = interruptor::make_interruptible (
1138+ log_update.all_committed .get_shared_future ()
1139+ ).then_interruptible ([this , last_complete, rep_tid, version] {
11381140 logger ().debug (" complete_error_log: rep_tid {} awaited " , rep_tid);
11391141 peering_state.complete_write (version, last_complete);
11401142 ceph_assert (!log_entry_update_waiting_on.contains (rep_tid));
@@ -1144,7 +1146,7 @@ seastar::future<> PG::complete_error_log(const ceph_tid_t& rep_tid,
11441146 return result;
11451147}
11461148
1147- seastar::future <std::optional<eversion_t >> PG::submit_error_log (
1149+ PG::interruptible_future <std::optional<eversion_t >> PG::submit_error_log (
11481150 Ref<MOSDOp> m,
11491151 const OpInfo &op_info,
11501152 ObjectContextRef obc,
@@ -1175,7 +1177,7 @@ seastar::future<std::optional<eversion_t>> PG::submit_error_log(
11751177
11761178 return seastar::do_with (log_entries, set<pg_shard_t >{},
11771179 [this , t=std::move (t), rep_tid](auto & log_entries, auto & waiting_on) mutable {
1178- return seastar ::do_for_each (get_acting_recovery_backfill (),
1180+ return interruptor ::do_for_each (get_acting_recovery_backfill (),
11791181 [this , log_entries, waiting_on, rep_tid]
11801182 (auto & i) mutable {
11811183 pg_shard_t peer (i);
@@ -1200,7 +1202,7 @@ seastar::future<std::optional<eversion_t>> PG::submit_error_log(
12001202 return shard_services.send_to_osd (peer.osd ,
12011203 std::move (log_m),
12021204 get_osdmap_epoch ());
1203- }).then ([this , waiting_on, t=std::move (t), rep_tid] () mutable {
1205+ }).then_interruptible ([this , waiting_on, t=std::move (t), rep_tid] () mutable {
12041206 waiting_on.insert (pg_whoami);
12051207 logger ().debug (" submit_error_log: inserting rep_tid {}" , rep_tid);
12061208 log_entry_update_waiting_on.insert (
0 commit comments