Skip to content

Commit 3efe524

Browse files
committed
crimson/osd/pg_recovery: rewrite start_recovery_ops
We had few confusions around the return value from start_recovery_ops. This commit is a groundwork for the return type change. * Move to coroutines * Update logging macro Signed-off-by: Matan Breizman <[email protected]> (cherry picked from commit ce4e9aa)
1 parent 5816f0d commit 3efe524

File tree

3 files changed

+75
-46
lines changed

3 files changed

+75
-46
lines changed

src/crimson/common/interruptible_future.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1486,6 +1486,17 @@ struct interruptor
14861486
futurize_invoke_if_func(std::forward<FutOrFuncs>(fut_or_funcs))...);
14871487
}
14881488

1489+
// This is a simpler implemation than seastar::when_all_succeed.
1490+
// We are not using ::seastar::internal::complete_when_all
1491+
template <typename T>
1492+
static inline auto when_all_succeed(std::vector<interruptible_future<InterruptCond, T>>&& futures) noexcept {
1493+
return interruptor::parallel_for_each(futures,
1494+
[] (auto&& ifut) -> interruptible_future<InterruptCond, T> {
1495+
return std::move(ifut);
1496+
});
1497+
}
1498+
1499+
14891500
template <typename Func,
14901501
typename Result = futurize_t<std::invoke_result_t<Func>>>
14911502
static inline Result async(Func&& func) {

src/crimson/osd/pg_recovery.cc

Lines changed: 59 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,14 @@ PGRecovery::start_recovery_ops(
4646
PglogBasedRecovery &recover_op,
4747
size_t max_to_start)
4848
{
49+
LOG_PREFIX(PGRecovery::start_recovery_ops);
4950
assert(pg->is_primary());
5051
assert(pg->is_peered());
5152

5253
if (pg->has_reset_since(recover_op.get_epoch_started()) ||
5354
recover_op.is_cancelled()) {
54-
logger().debug("recovery {} cancelled.", recover_op);
55-
return seastar::make_ready_future<bool>(false);
55+
DEBUGDPP("recovery {} cancelled.", pg->get_pgid(), recover_op);
56+
co_return false;
5657
}
5758
ceph_assert(pg->is_recovering());
5859

@@ -71,51 +72,32 @@ PGRecovery::start_recovery_ops(
7172
if (max_to_start > 0) {
7273
max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
7374
}
74-
using interruptor =
75-
crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
76-
return interruptor::parallel_for_each(started,
77-
[] (auto&& ifut) {
78-
return std::move(ifut);
79-
}).then_interruptible([this, &recover_op] {
80-
//TODO: maybe we should implement a recovery race interruptor in the future
81-
if (pg->has_reset_since(recover_op.get_epoch_started()) ||
82-
recover_op.is_cancelled()) {
83-
logger().debug("recovery {} cancelled.", recover_op);
84-
return seastar::make_ready_future<bool>(false);
85-
}
86-
ceph_assert(pg->is_recovering());
87-
ceph_assert(!pg->is_backfilling());
88-
89-
bool do_recovery = pg->get_peering_state().needs_recovery();
90-
if (!do_recovery) {
91-
logger().debug("start_recovery_ops: AllReplicasRecovered for pg: {}",
92-
pg->get_pgid());
93-
using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
94-
if (!pg->get_peering_state().needs_backfill()) {
95-
logger().debug("start_recovery_ops: AllReplicasRecovered for pg: {}",
96-
pg->get_pgid());
97-
(void) pg->get_shard_services().start_operation<LocalPeeringEvent>(
98-
static_cast<crimson::osd::PG*>(pg),
99-
pg->get_pg_whoami(),
100-
pg->get_pgid(),
101-
pg->get_osdmap_epoch(),
102-
pg->get_osdmap_epoch(),
103-
PeeringState::AllReplicasRecovered{});
104-
} else {
105-
logger().debug("start_recovery_ops: RequestBackfill for pg: {}",
106-
pg->get_pgid());
107-
(void) pg->get_shard_services().start_operation<LocalPeeringEvent>(
108-
static_cast<crimson::osd::PG*>(pg),
109-
pg->get_pg_whoami(),
110-
pg->get_pgid(),
111-
pg->get_osdmap_epoch(),
112-
pg->get_osdmap_epoch(),
113-
PeeringState::RequestBackfill{});
114-
}
115-
pg->reset_pglog_based_recovery_op();
116-
}
117-
return seastar::make_ready_future<bool>(do_recovery);
75+
76+
co_await interruptor::when_all_succeed(std::move(started));
77+
78+
//TODO: maybe we should implement a recovery race interruptor in the future
79+
if (pg->has_reset_since(recover_op.get_epoch_started()) ||
80+
recover_op.is_cancelled()) {
81+
DEBUGDPP("recovery {} cancelled.", pg->get_pgid(), recover_op);
82+
co_return false;
83+
}
84+
ceph_assert(pg->is_recovering());
85+
ceph_assert(!pg->is_backfilling());
86+
87+
// move to unnamed placeholder when C++ 26 is available
88+
auto reset_pglog_based_recovery_op = seastar::defer([this] {
89+
pg->reset_pglog_based_recovery_op();
11890
});
91+
92+
if (!pg->get_peering_state().needs_recovery()) {
93+
if (pg->get_peering_state().needs_backfill()) {
94+
request_backfill();
95+
} else {
96+
all_replicas_recovered();
97+
}
98+
co_return false;
99+
}
100+
co_return true;
119101
}
120102

121103
size_t PGRecovery::start_primary_recovery_ops(
@@ -642,6 +624,8 @@ void PGRecovery::on_pg_clean()
642624

643625
void PGRecovery::backfilled()
644626
{
627+
LOG_PREFIX(PGRecovery::backfilled);
628+
DEBUGDPP("", pg->get_pgid());
645629
using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
646630
std::ignore = pg->get_shard_services().start_operation<LocalPeeringEvent>(
647631
static_cast<crimson::osd::PG*>(pg),
@@ -652,6 +636,35 @@ void PGRecovery::backfilled()
652636
PeeringState::Backfilled{});
653637
}
654638

639+
void PGRecovery::request_backfill()
640+
{
641+
LOG_PREFIX(PGRecovery::request_backfill);
642+
DEBUGDPP("", pg->get_pgid());
643+
using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
644+
std::ignore = pg->get_shard_services().start_operation<LocalPeeringEvent>(
645+
static_cast<crimson::osd::PG*>(pg),
646+
pg->get_pg_whoami(),
647+
pg->get_pgid(),
648+
pg->get_osdmap_epoch(),
649+
pg->get_osdmap_epoch(),
650+
PeeringState::RequestBackfill{});
651+
}
652+
653+
654+
void PGRecovery::all_replicas_recovered()
655+
{
656+
LOG_PREFIX(PGRecovery::all_replicas_recovered);
657+
DEBUGDPP("", pg->get_pgid());
658+
using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
659+
std::ignore = pg->get_shard_services().start_operation<LocalPeeringEvent>(
660+
static_cast<crimson::osd::PG*>(pg),
661+
pg->get_pg_whoami(),
662+
pg->get_pgid(),
663+
pg->get_osdmap_epoch(),
664+
pg->get_osdmap_epoch(),
665+
PeeringState::AllReplicasRecovered{});
666+
}
667+
655668
void PGRecovery::backfill_suspended()
656669
{
657670
using BackfillState = crimson::osd::BackfillState;

src/crimson/osd/pg_recovery.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,12 @@ class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
137137
void update_peers_last_backfill(
138138
const hobject_t& new_last_backfill) final;
139139
bool budget_available() const final;
140+
141+
// TODO: move to start_peering_event_operation
140142
void backfilled() final;
143+
void request_backfill();
144+
void all_replicas_recovered();
145+
141146
friend crimson::osd::BackfillState::PGFacade;
142147
friend crimson::osd::PG;
143148
// backfill end

0 commit comments

Comments
 (0)