Skip to content

Commit 482036c

Browse files
committed
crimson/osd/replicated_recovery_backend: take excl lock during final replica push commit
We need to block reads on any recovery object between when we mark the object not missing and when the commit completes. Create obc from first push_op and use that obc during the final commit to create an ObjectContextLoader::Manager to lock. Signed-off-by: Samuel Just <[email protected]>
1 parent e9d03bc commit 482036c

File tree

1 file changed

+22
-8
lines changed

1 file changed

+22
-8
lines changed

src/crimson/osd/replicated_recovery_backend.cc

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,14 @@ ReplicatedRecoveryBackend::handle_push(
993993
bool clear_omap = !push_op.before_progress.omap_complete;
994994
response.soid = push_op.recovery_info.soid;
995995

996+
if (first) {
997+
auto [oi, ssc] = get_md_from_push_op(push_op);
998+
replica_push_targets[push_op.recovery_info.soid] =
999+
pg.obc_loader.create_cached_obc_from_push_data(
1000+
oi,
1001+
ssc);
1002+
}
1003+
9961004
co_await submit_push_data(
9971005
push_op.recovery_info, first, complete, clear_omap,
9981006
std::move(data_zeros),
@@ -1002,22 +1010,28 @@ ReplicatedRecoveryBackend::handle_push(
10021010
push_op.attrset,
10031011
std::move(push_op.omap_entries), &t);
10041012

1013+
epoch_t epoch_frozen = pg.get_osdmap_epoch();
1014+
DEBUGDPP("submitting transaction", pg);
1015+
10051016
if (complete) {
1017+
auto ptiter = replica_push_targets.find(push_op.recovery_info.soid);
1018+
ceph_assert(ptiter != replica_push_targets.end());
1019+
auto manager = pg.obc_loader.get_obc_manager(ptiter->second);
1020+
manager.lock_excl_sync(); /* cannot already be locked */
1021+
10061022
co_await pg.get_recovery_handler()->on_local_recover(
10071023
push_op.recovery_info.soid, push_op.recovery_info,
10081024
false, t);
1009-
}
10101025

1011-
epoch_t epoch_frozen = pg.get_osdmap_epoch();
1012-
DEBUGDPP("submitting transaction", pg);
1013-
1014-
co_await interruptor::make_interruptible(
1015-
shard_services.get_store().do_transaction(coll, std::move(t)));
1026+
co_await interruptor::make_interruptible(
1027+
shard_services.get_store().do_transaction(coll, std::move(t)));
1028+
replica_push_targets.erase(ptiter);
10161029

1017-
if (complete) {
1018-
//TODO: this should be grouped with pg.on_local_recover somehow.
10191030
pg.get_recovery_handler()->_committed_pushed_object(
10201031
epoch_frozen, pg.get_info().last_complete);
1032+
} else {
1033+
co_await interruptor::make_interruptible(
1034+
shard_services.get_store().do_transaction(coll, std::move(t)));
10211035
}
10221036

10231037
auto reply = crimson::make_message<MOSDPGPushReply>();

0 commit comments

Comments
 (0)