Skip to content

Commit db3db9c

Browse files
authored
Merge pull request ceph#60598 from xxhdx1985126/wip-68808
crimson/osd/replicate_backend: add the skipped newly created clone object to the push queue after the clone request completes. Reviewed-by: Matan Breizman <[email protected]>. Reviewed-by: Samuel Just <[email protected]>.
2 parents c5eacfc + 184c186 commit db3db9c

File tree

13 files changed

+160
-51
lines changed

13 files changed

+160
-51
lines changed

src/crimson/osd/backfill_state.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,4 +611,12 @@ void BackfillState::ProgressTracker::complete_to(
611611
}
612612
}
613613

614+
void BackfillState::enqueue_standalone_push(
615+
const hobject_t &obj,
616+
const eversion_t &v,
617+
const std::vector<pg_shard_t> &peers) {
618+
progress_tracker->enqueue_push(obj);
619+
backfill_machine.backfill_listener.enqueue_push(obj, v, peers);
620+
}
621+
614622
} // namespace crimson::osd

src/crimson/osd/backfill_state.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,15 @@ struct BackfillState {
304304
backfill_machine.process_event(*std::move(evt));
305305
}
306306

307+
void enqueue_standalone_push(
308+
const hobject_t &obj,
309+
const eversion_t &v,
310+
const std::vector<pg_shard_t> &peers);
311+
312+
bool is_triggered() const {
313+
return backfill_machine.triggering_event() != nullptr;
314+
}
315+
307316
hobject_t get_last_backfill_started() const {
308317
return last_backfill_started;
309318
}

src/crimson/osd/ec_backend.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ ECBackend::_read(const hobject_t& hoid,
2626
ECBackend::rep_op_fut_t
2727
ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
2828
const hobject_t& hoid,
29+
crimson::osd::ObjectContextRef&& new_clone,
2930
ceph::os::Transaction&& txn,
3031
osd_op_params_t&& osd_op_p,
3132
epoch_t min_epoch, epoch_t max_epoch,

src/crimson/osd/ec_backend.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class ECBackend : public PGBackend
2828
rep_op_fut_t
2929
submit_transaction(const std::set<pg_shard_t> &pg_shards,
3030
const hobject_t& hoid,
31+
crimson::osd::ObjectContextRef&& new_clone,
3132
ceph::os::Transaction&& txn,
3233
osd_op_params_t&& req,
3334
epoch_t min_epoch, epoch_t max_epoch,

src/crimson/osd/ops_executer.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,7 @@ std::unique_ptr<OpsExecuter::CloningContext> OpsExecuter::execute_clone(
940940
};
941941
encode(cloned_snaps, cloning_ctx->log_entry.snaps);
942942
cloning_ctx->log_entry.clean_regions.mark_data_region_dirty(0, initial_obs.oi.size);
943+
cloning_ctx->clone_obc = clone_obc;
943944

944945
return cloning_ctx;
945946
}
@@ -966,7 +967,7 @@ void OpsExecuter::update_clone_overlap() {
966967

967968
void OpsExecuter::CloningContext::apply_to(
968969
std::vector<pg_log_entry_t>& log_entries,
969-
ObjectContext& processed_obc) &&
970+
ObjectContext& processed_obc)
970971
{
971972
log_entry.mtime = processed_obc.obs.oi.mtime;
972973
log_entries.insert(log_entries.begin(), std::move(log_entry));
@@ -983,7 +984,7 @@ OpsExecuter::flush_clone_metadata(
983984
assert(!txn.empty());
984985
update_clone_overlap();
985986
if (cloning_ctx) {
986-
std::move(*cloning_ctx).apply_to(log_entries, *obc);
987+
cloning_ctx->apply_to(log_entries, *obc);
987988
}
988989
if (snapc.seq > obc->ssc->snapset.seq) {
989990
// update snapset with latest snap context

src/crimson/osd/ops_executer.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,11 @@ class OpsExecuter {
197197
struct CloningContext {
198198
SnapSet new_snapset;
199199
pg_log_entry_t log_entry;
200+
ObjectContextRef clone_obc;
200201

201202
void apply_to(
202203
std::vector<pg_log_entry_t>& log_entries,
203-
ObjectContext& processed_obc) &&;
204+
ObjectContext& processed_obc);
204205
};
205206
std::unique_ptr<CloningContext> cloning_ctx;
206207

@@ -504,6 +505,7 @@ OpsExecuter::flush_changes_n_do_ops_effects(
504505
ceph_assert(want_mutate);
505506
}
506507

508+
apply_stats();
507509
if (want_mutate) {
508510
auto log_entries = flush_clone_metadata(
509511
prepare_transaction(ops),
@@ -519,14 +521,15 @@ OpsExecuter::flush_changes_n_do_ops_effects(
519521
std::move(txn),
520522
std::move(obc),
521523
std::move(*osd_op_params),
522-
std::move(log_entries));
524+
std::move(log_entries),
525+
cloning_ctx
526+
? std::move(cloning_ctx->clone_obc)
527+
: nullptr);
523528

524529
submitted = std::move(_submitted);
525530
all_completed = std::move(_all_completed);
526531
}
527532

528-
apply_stats();
529-
530533
if (op_effects.size()) [[unlikely]] {
531534
// need extra ref pg due to apply_stats() which can be executed after
532535
// informing snap mapper

src/crimson/osd/osd_operations/snaptrim_event.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ SnapTrimObjSubEvent::process_and_submit(ObjectContextRef head_obc,
435435

436436
auto [submitted, all_completed] = co_await pg->submit_transaction(
437437
std::move(clone_obc),
438+
nullptr,
438439
std::move(txn),
439440
std::move(osd_op_p),
440441
std::move(log_entries)

src/crimson/osd/pg.cc

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -907,11 +907,23 @@ void PG::mutate_object(
907907
}
908908
}
909909

910+
void PG::enqueue_push_for_backfill(
911+
const hobject_t &obj,
912+
const eversion_t &v,
913+
const std::vector<pg_shard_t> &peers)
914+
{
915+
assert(recovery_handler);
916+
assert(recovery_handler->backfill_state);
917+
auto backfill_state = recovery_handler->backfill_state.get();
918+
backfill_state->enqueue_standalone_push(obj, v, peers);
919+
}
920+
910921
PG::interruptible_future<
911922
std::tuple<PG::interruptible_future<>,
912923
PG::interruptible_future<>>>
913924
PG::submit_transaction(
914925
ObjectContextRef&& obc,
926+
ObjectContextRef&& new_clone,
915927
ceph::os::Transaction&& txn,
916928
osd_op_params_t&& osd_op_p,
917929
std::vector<pg_log_entry_t>&& log_entries)
@@ -924,8 +936,9 @@ PG::submit_transaction(
924936
}
925937

926938
epoch_t map_epoch = get_osdmap_epoch();
939+
auto at_version = osd_op_p.at_version;
927940

928-
peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
941+
peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, at_version);
929942
peering_state.update_trim_to();
930943

931944
ceph_assert(!log_entries.empty());
@@ -939,6 +952,7 @@ PG::submit_transaction(
939952
auto [submitted, all_completed] = co_await backend->submit_transaction(
940953
peering_state.get_acting_recovery_backfill(),
941954
obc->obs.oi.soid,
955+
std::move(new_clone),
942956
std::move(txn),
943957
std::move(osd_op_p),
944958
peering_state.get_last_peering_reset(),
@@ -947,8 +961,8 @@ PG::submit_transaction(
947961
co_return std::make_tuple(
948962
std::move(submitted),
949963
all_completed.then_interruptible(
950-
[this, last_complete=peering_state.get_info().last_complete,
951-
at_version=osd_op_p.at_version](auto acked) {
964+
[this, last_complete=peering_state.get_info().last_complete, at_version]
965+
(auto acked) {
952966
for (const auto& peer : acked) {
953967
peering_state.update_peer_last_complete_ondisk(
954968
peer.shard, peer.last_complete_ondisk);
@@ -1153,11 +1167,13 @@ PG::submit_executer_fut PG::submit_executer(
11531167
[FNAME, this](auto&& txn,
11541168
auto&& obc,
11551169
auto&& osd_op_p,
1156-
auto&& log_entries) {
1170+
auto&& log_entries,
1171+
auto&& new_clone) {
11571172
DEBUGDPP("object {} submitting txn", *this, obc->get_oid());
11581173
mutate_object(obc, txn, osd_op_p);
11591174
return submit_transaction(
11601175
std::move(obc),
1176+
std::move(new_clone),
11611177
std::move(txn),
11621178
std::move(osd_op_p),
11631179
std::move(log_entries));
@@ -1604,7 +1620,7 @@ bool PG::should_send_op(
16041620
// missing set
16051621
hoid <= peering_state.get_peer_info(peer).last_backfill ||
16061622
(has_backfill_state() && hoid <= get_last_backfill_started() &&
1607-
!peering_state.get_peer_missing(peer).is_missing(hoid)));
1623+
!is_missing_on_peer(peer, hoid)));
16081624
if (!should_send) {
16091625
ceph_assert(is_backfill_target(peer));
16101626
logger().debug("{} issue_repop shipping empty opt to osd."

src/crimson/osd/pg.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
class MQuery;
4646
class OSDMap;
4747
class PGBackend;
48+
class ReplicatedBackend;
4849
class PGPeeringEvent;
4950
class osd_op_params_t;
5051

@@ -678,6 +679,7 @@ class PG : public boost::intrusive_ref_counter<
678679
std::tuple<interruptible_future<>, interruptible_future<>>>
679680
submit_transaction(
680681
ObjectContextRef&& obc,
682+
ObjectContextRef&& new_clone,
681683
ceph::os::Transaction&& txn,
682684
osd_op_params_t&& oop,
683685
std::vector<pg_log_entry_t>&& log_entries);
@@ -885,6 +887,10 @@ class PG : public boost::intrusive_ref_counter<
885887
friend class SnapTrimObjSubEvent;
886888
private:
887889

890+
void enqueue_push_for_backfill(
891+
const hobject_t &obj,
892+
const eversion_t &v,
893+
const std::vector<pg_shard_t> &peers);
888894
void mutate_object(
889895
ObjectContextRef& obc,
890896
ceph::os::Transaction& txn,
@@ -893,21 +899,27 @@ class PG : public boost::intrusive_ref_counter<
893899
bool can_discard_op(const MOSDOp& m) const;
894900
void context_registry_on_change();
895901
bool is_missing_object(const hobject_t& soid) const {
896-
return peering_state.get_pg_log().get_missing().get_items().count(soid);
902+
return get_local_missing().is_missing(soid);
897903
}
898904
bool is_unreadable_object(const hobject_t &oid,
899905
eversion_t* v = 0) const final {
900906
return is_missing_object(oid) ||
901907
!peering_state.get_missing_loc().readable_with_acting(
902908
oid, get_actingset(), v);
903909
}
910+
bool is_missing_on_peer(
911+
const pg_shard_t &peer,
912+
const hobject_t &soid) const {
913+
return peering_state.get_peer_missing(peer).is_missing(soid);
914+
}
904915
bool is_degraded_or_backfilling_object(const hobject_t& soid) const;
905916
const std::set<pg_shard_t> &get_actingset() const {
906917
return peering_state.get_actingset();
907918
}
908919

909920
private:
910921
friend class IOInterruptCondition;
922+
friend class ::ReplicatedBackend;
911923
struct log_update_t {
912924
std::set<pg_shard_t> waiting_on;
913925
seastar::shared_promise<> all_committed;

src/crimson/osd/pg_backend.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,7 @@ class PGBackend
414414
virtual rep_op_fut_t
415415
submit_transaction(const std::set<pg_shard_t> &pg_shards,
416416
const hobject_t& hoid,
417+
crimson::osd::ObjectContextRef&& new_clone,
417418
ceph::os::Transaction&& txn,
418419
osd_op_params_t&& osd_op_p,
419420
epoch_t min_epoch, epoch_t max_epoch,

0 commit comments

Comments
 (0)