Skip to content

Commit 4b9def8

Browse files
committed
crimson/osd/recovery_backend: restart object pulling for recoveries that
are blocked pulling from down osds

Fixes: https://tracker.ceph.com/issues/67508
Signed-off-by: Xuehan Xu <[email protected]>
1 parent 1df9dd9 commit 4b9def8

File tree

3 files changed

+48
-23
lines changed

3 files changed

+48
-23
lines changed

src/crimson/osd/pg.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,13 @@ class PG : public boost::intrusive_ref_counter<
357357
shard_services.remove_want_pg_temp(orderer, pgid.pgid);
358358
}
359359
void check_recovery_sources(const OSDMapRef& newmap) final {
360-
// Not needed yet
360+
recovery_backend->for_each_recovery_waiter(
361+
[newmap, FNAME](auto &, auto &waiter) {
362+
if (waiter->is_pulling() &&
363+
newmap->is_down(waiter->pull_info->from.osd)) {
364+
waiter->repeat_pull();
365+
}
366+
});
361367
}
362368
void check_blocklisted_watchers() final;
363369
void clear_primary_state() final {

src/crimson/osd/recovery_backend.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ class RecoveryBackend {
112112
}
113113
return on_stop();
114114
}
115+
116+
template <typename Func>
117+
void for_each_recovery_waiter(Func &&f) {
118+
for (auto &[soid, recovery_waiter] : recovering) {
119+
std::forward<Func>(f)(soid, recovery_waiter);
120+
}
121+
}
115122
protected:
116123
crimson::osd::PG& pg;
117124
crimson::osd::ShardServices& shard_services;
@@ -219,6 +226,13 @@ class RecoveryBackend {
219226
pulled.reset();
220227
}
221228
}
229+
/// Fail the outstanding pull with an `eagain` error.
///
/// Asserts that `pulled` is set, then sets
/// crimson::ct_error::eagain's exception on it.
/// NOTE(review): presumably whoever is waiting on the pull sees eagain
/// and re-issues the pull request -- confirm against wait_for_pull() and
/// its caller.
void repeat_pull() {
  // There must be a pull in flight; otherwise there is nothing to fail.
  ceph_assert(pulled);
  auto &pending_pull = *pulled;
  pending_pull.set_exception(crimson::ct_error::eagain::exception_ptr());
}
233+
bool is_pulling() const {
234+
return (bool)pulled;
235+
}
222236
void set_push_failed(pg_shard_t shard, std::exception_ptr e) {
223237
auto it = pushes.find(shard);
224238
if (it != pushes.end()) {

src/crimson/osd/replicated_recovery_backend.cc

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -113,28 +113,33 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj(
113113
// object is not missing, don't pull
114114
return seastar::make_ready_future<>();
115115
}
116-
return pg.obc_loader.with_obc<RWState::RWREAD>(soid.get_head(),
117-
[this, soid, need](auto head, auto) {
118-
PullOp pull_op;
119-
auto& recovery_waiter = get_recovering(soid);
120-
recovery_waiter.pull_info =
121-
std::make_optional<RecoveryBackend::pull_info_t>();
122-
auto& pull_info = *recovery_waiter.pull_info;
123-
prepare_pull(head, pull_op, pull_info, soid, need);
124-
auto msg = crimson::make_message<MOSDPGPull>();
125-
msg->from = pg.get_pg_whoami();
126-
msg->set_priority(pg.get_recovery_op_priority());
127-
msg->pgid = pg.get_pgid();
128-
msg->map_epoch = pg.get_osdmap_epoch();
129-
msg->min_epoch = pg.get_last_peering_reset();
130-
msg->set_pulls({std::move(pull_op)});
131-
return shard_services.send_to_osd(
132-
pull_info.from.osd,
133-
std::move(msg),
134-
pg.get_osdmap_epoch());
135-
}).si_then([this, soid] {
136-
auto& recovery_waiter = get_recovering(soid);
137-
return recovery_waiter.wait_for_pull();
116+
return interruptor::repeat_eagain([this, soid, need] {
117+
using prepare_pull_iertr =
118+
crimson::osd::ObjectContextLoader::load_obc_iertr::extend<
119+
crimson::ct_error::eagain>;
120+
return pg.obc_loader.with_obc<RWState::RWREAD>(soid.get_head(),
121+
[this, soid, need](auto head, auto) {
122+
PullOp pull_op;
123+
auto& recovery_waiter = get_recovering(soid);
124+
recovery_waiter.pull_info =
125+
std::make_optional<RecoveryBackend::pull_info_t>();
126+
auto& pull_info = *recovery_waiter.pull_info;
127+
prepare_pull(head, pull_op, pull_info, soid, need);
128+
auto msg = crimson::make_message<MOSDPGPull>();
129+
msg->from = pg.get_pg_whoami();
130+
msg->set_priority(pg.get_recovery_op_priority());
131+
msg->pgid = pg.get_pgid();
132+
msg->map_epoch = pg.get_osdmap_epoch();
133+
msg->min_epoch = pg.get_last_peering_reset();
134+
msg->set_pulls({std::move(pull_op)});
135+
return shard_services.send_to_osd(
136+
pull_info.from.osd,
137+
std::move(msg),
138+
pg.get_osdmap_epoch());
139+
}).si_then([this, soid]() -> prepare_pull_iertr::future<> {
140+
auto& recovery_waiter = get_recovering(soid);
141+
return recovery_waiter.wait_for_pull();
142+
});
138143
}).handle_error_interruptible(
139144
crimson::ct_error::assert_all("unexpected error")
140145
);

0 commit comments

Comments
 (0)