Skip to content

Commit d91864f

Browse files
authored
Merge pull request ceph#58148 from xxhdx1985126/wip-65696
crimson/osd/osd_operations: hang requests if the objects are unfound Reviewed-by: Matan Breizman <[email protected]>
2 parents a5cf395 + d8e1567 commit d91864f

File tree

8 files changed

+96
-41
lines changed

8 files changed

+96
-41
lines changed

src/crimson/osd/osd_operations/background_recovery.cc

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,6 @@ seastar::future<> BackgroundRecoveryT<T>::start()
8787
}, [](std::exception_ptr) {
8888
return seastar::make_ready_future<bool>(false);
8989
}, pg, epoch_started);
90-
}).handle_exception_type([ref, this](const std::system_error& err) {
91-
LOG_PREFIX(BackgroundRecoveryT<T>::start);
92-
if (err.code() == std::make_error_code(std::errc::interrupted)) {
93-
DEBUGDPPI("recovery interruped: {}", *pg, err.what());
94-
return seastar::now();
95-
}
96-
return seastar::make_exception_future<>(err);
9790
});
9891
});
9992
});

src/crimson/osd/osd_operations/client_request.cc

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ ClientRequest::recover_missing_snaps(
286286
ObjectContextRef head,
287287
std::set<snapid_t> &snaps)
288288
{
289+
LOG_PREFIX(ClientRequest::process_op);
289290
co_await ihref.enter_stage<interruptor>(
290291
client_pp(*pg).recover_missing_snaps, *this);
291292
for (auto &snap : snaps) {
@@ -299,7 +300,12 @@ ClientRequest::recover_missing_snaps(
299300
* we skip the oid as there is no corresponding clone to recover.
300301
* See https://tracker.ceph.com/issues/63821 */
301302
if (oid) {
302-
co_await do_recover_missing(pg, *oid, m->get_reqid());
303+
auto unfound = co_await do_recover_missing(pg, *oid, m->get_reqid());
304+
if (unfound) {
305+
DEBUGDPP("{} unfound, hang it for now", *pg, m->get_hobj().get_head());
306+
co_await interruptor::make_interruptible(
307+
pg->get_recovery_backend()->add_unfound(m->get_hobj().get_head()));
308+
}
303309
}
304310
}
305311
}
@@ -317,7 +323,14 @@ ClientRequest::process_op(
317323
"Skipping recover_missings on non primary pg for soid {}",
318324
*pg, m->get_hobj());
319325
} else {
320-
co_await do_recover_missing(pg, m->get_hobj().get_head(), m->get_reqid());
326+
auto unfound = co_await do_recover_missing(
327+
pg, m->get_hobj().get_head(), m->get_reqid());
328+
if (unfound) {
329+
DEBUGDPP("{} unfound, hang it for now", *pg, m->get_hobj().get_head());
330+
co_await interruptor::make_interruptible(
331+
pg->get_recovery_backend()->add_unfound(m->get_hobj().get_head()));
332+
}
333+
321334
std::set<snapid_t> snaps = snaps_need_to_recover();
322335
if (!snaps.empty()) {
323336
// call with_obc() in order, but wait concurrently for loading.

src/crimson/osd/osd_operations/client_request_common.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace {
1313

1414
namespace crimson::osd {
1515

16-
typename InterruptibleOperation::template interruptible_future<>
16+
typename InterruptibleOperation::template interruptible_future<bool>
1717
CommonClientRequest::do_recover_missing(
1818
Ref<PG> pg,
1919
const hobject_t& soid,
@@ -45,22 +45,29 @@ CommonClientRequest::do_recover_missing(
4545
if (!needs_recovery_or_backfill) {
4646
logger().debug("{} reqid {} nothing to recover {}",
4747
__func__, reqid, soid);
48-
return seastar::now();
48+
return seastar::make_ready_future<bool>(false);
4949
}
5050

51+
if (pg->get_peering_state().get_missing_loc().is_unfound(soid)) {
52+
return seastar::make_ready_future<bool>(true);
53+
}
5154
logger().debug("{} reqid {} need to wait for recovery, {} version {}",
5255
__func__, reqid, soid, ver);
5356
if (pg->get_recovery_backend()->is_recovering(soid)) {
5457
logger().debug("{} reqid {} object {} version {}, already recovering",
5558
__func__, reqid, soid, ver);
56-
return pg->get_recovery_backend()->get_recovering(soid).wait_for_recovered();
59+
return pg->get_recovery_backend()->get_recovering(
60+
soid).wait_for_recovered(
61+
).then([] {
62+
return seastar::make_ready_future<bool>(false);
63+
});
5764
} else {
5865
logger().debug("{} reqid {} object {} version {}, starting recovery",
5966
__func__, reqid, soid, ver);
6067
auto [op, fut] =
6168
pg->get_shard_services().start_operation<UrgentRecovery>(
6269
soid, ver, pg, pg->get_shard_services(), pg->get_osdmap_epoch());
63-
return std::move(fut);
70+
return fut.then([] { return seastar::make_ready_future<bool>(false); });
6471
}
6572
}
6673

src/crimson/osd/osd_operations/client_request_common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace crimson::osd {
1111

1212
struct CommonClientRequest {
1313

14-
static InterruptibleOperation::template interruptible_future<>
14+
static InterruptibleOperation::template interruptible_future<bool>
1515
do_recover_missing(
1616
Ref<PG> pg,
1717
const hobject_t& soid,

src/crimson/osd/osd_operations/internal_client_request.cc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,12 @@ seastar::future<> InternalClientRequest::start()
7070
client_pp().recover_missing);
7171
}).then_interruptible([this] {
7272
return do_recover_missing(pg, get_target_oid(), osd_reqid_t());
73-
}).then_interruptible([this] {
73+
}).then_interruptible([this](bool unfound) {
74+
if (unfound) {
75+
throw std::system_error(
76+
std::make_error_code(std::errc::operation_canceled),
77+
fmt::format("{} is unfound, drop it!", get_target_oid()));
78+
}
7479
return enter_stage<interruptor>(
7580
client_pp().get_obc);
7681
}).then_interruptible([this] () -> PG::load_obc_iertr::future<> {
@@ -128,6 +133,9 @@ seastar::future<> InternalClientRequest::start()
128133
}, pg, start_epoch);
129134
}).then([this] {
130135
track_event<CompletionEvent>();
136+
}).handle_exception_type([](std::system_error &error) {
137+
logger().debug("error {}, message: {}", error.code(), error.what());
138+
return seastar::now();
131139
}).finally([this] {
132140
logger().debug("{}: exit", *this);
133141
handle.exit();

src/crimson/osd/pg_recovery.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ void PGRecovery::on_global_recover (
433433
auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid);
434434
recovery_waiter.set_recovered();
435435
pg->get_recovery_backend()->remove_recovering(soid);
436+
pg->get_recovery_backend()->found_and_remove(soid);
436437
}
437438

438439
void PGRecovery::on_failed_recover(

src/crimson/osd/recovery_backend.cc

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ void RecoveryBackend::clear_temp_obj(const hobject_t &oid)
4343
}
4444

4545
void RecoveryBackend::clean_up(ceph::os::Transaction& t,
46-
std::string_view why)
46+
interrupt_cause_t why)
4747
{
4848
for_each_temp_obj([&](auto &soid) {
4949
t.remove(pg.get_collection_ref()->get_cid(),
@@ -65,6 +65,36 @@ void RecoveryBackend::clean_up(ceph::os::Transaction& t,
6565
recovering.clear();
6666
}
6767

68+
void RecoveryBackend::WaitForObjectRecovery::interrupt(interrupt_cause_t why) {
69+
switch(why) {
70+
case interrupt_cause_t::INTERVAL_CHANGE:
71+
if (readable) {
72+
readable->set_exception(
73+
crimson::common::actingset_changed(pg.is_primary()));
74+
readable.reset();
75+
}
76+
if (recovered) {
77+
recovered->set_exception(
78+
crimson::common::actingset_changed(pg.is_primary()));
79+
recovered.reset();
80+
}
81+
if (pulled) {
82+
pulled->set_exception(
83+
crimson::common::actingset_changed(pg.is_primary()));
84+
pulled.reset();
85+
}
86+
for (auto& [pg_shard, pr] : pushes) {
87+
pr.set_exception(
88+
crimson::common::actingset_changed(pg.is_primary()));
89+
}
90+
pushes.clear();
91+
break;
92+
default:
93+
ceph_abort("impossible");
94+
break;
95+
}
96+
}
97+
6898
void RecoveryBackend::WaitForObjectRecovery::stop() {
6999
if (readable) {
70100
readable->set_exception(

src/crimson/osd/recovery_backend.h

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,22 @@ class RecoveryBackend {
4545
backend{backend} {}
4646
virtual ~RecoveryBackend() {}
4747
std::pair<WaitForObjectRecovery&, bool> add_recovering(const hobject_t& soid) {
48-
auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery{});
48+
auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery(pg));
4949
assert(it->second);
5050
return {*(it->second), added};
5151
}
52+
seastar::future<> add_unfound(const hobject_t &soid) {
53+
auto [it, added] = unfound.emplace(soid, seastar::shared_promise());
54+
return it->second.get_shared_future();
55+
}
56+
void found_and_remove(const hobject_t &soid) {
57+
auto it = unfound.find(soid);
58+
if (it != unfound.end()) {
59+
auto &found_promise = it->second;
60+
found_promise.set_value();
61+
unfound.erase(it);
62+
}
63+
}
5264
WaitForObjectRecovery& get_recovering(const hobject_t& soid) {
5365
assert(is_recovering(soid));
5466
return *(recovering.at(soid));
@@ -82,14 +94,22 @@ class RecoveryBackend {
8294
std::int64_t min,
8395
std::int64_t max);
8496

97+
enum interrupt_cause_t : uint8_t {
98+
INTERVAL_CHANGE,
99+
MAX
100+
};
85101
void on_peering_interval_change(ceph::os::Transaction& t) {
86-
clean_up(t, "new peering interval");
102+
clean_up(t, interrupt_cause_t::INTERVAL_CHANGE);
87103
}
88104

89105
seastar::future<> stop() {
90106
for (auto& [soid, recovery_waiter] : recovering) {
91107
recovery_waiter->stop();
92108
}
109+
for (auto& [soid, promise] : unfound) {
110+
promise.set_exception(
111+
crimson::common::system_shutdown_exception());
112+
}
93113
return on_stop();
94114
}
95115
protected:
@@ -124,11 +144,14 @@ class RecoveryBackend {
124144
public boost::intrusive_ref_counter<
125145
WaitForObjectRecovery, boost::thread_unsafe_counter>,
126146
public crimson::BlockerT<WaitForObjectRecovery> {
147+
crimson::osd::PG &pg;
127148
std::optional<seastar::shared_promise<>> readable, recovered, pulled;
128149
std::map<pg_shard_t, seastar::shared_promise<>> pushes;
129150
public:
130151
static constexpr const char* type_name = "WaitForObjectRecovery";
131152

153+
WaitForObjectRecovery(crimson::osd::PG &pg) : pg(pg) {}
154+
132155
crimson::osd::ObjectContextRef obc;
133156
std::optional<pull_info_t> pull_info;
134157
std::map<pg_shard_t, push_info_t> pushing;
@@ -204,28 +227,7 @@ class RecoveryBackend {
204227
pushes.erase(it);
205228
}
206229
}
207-
void interrupt(std::string_view why) {
208-
if (readable) {
209-
readable->set_exception(std::system_error(
210-
std::make_error_code(std::errc::interrupted), why.data()));
211-
readable.reset();
212-
}
213-
if (recovered) {
214-
recovered->set_exception(std::system_error(
215-
std::make_error_code(std::errc::interrupted), why.data()));
216-
recovered.reset();
217-
}
218-
if (pulled) {
219-
pulled->set_exception(std::system_error(
220-
std::make_error_code(std::errc::interrupted), why.data()));
221-
pulled.reset();
222-
}
223-
for (auto& [pg_shard, pr] : pushes) {
224-
pr.set_exception(std::system_error(
225-
std::make_error_code(std::errc::interrupted), why.data()));
226-
}
227-
pushes.clear();
228-
}
230+
void interrupt(interrupt_cause_t why);
229231
void stop();
230232
void dump_detail(Formatter* f) const {
231233
}
@@ -235,6 +237,7 @@ class RecoveryBackend {
235237
using WaitForObjectRecoveryRef = boost::intrusive_ptr<WaitForObjectRecovery>;
236238
protected:
237239
std::map<hobject_t, WaitForObjectRecoveryRef> recovering;
240+
std::map<hobject_t, seastar::shared_promise<>> unfound;
238241
hobject_t get_temp_recovery_object(
239242
const hobject_t& target,
240243
eversion_t version) const;
@@ -249,7 +252,7 @@ class RecoveryBackend {
249252
backend->clear_temp_objs();
250253
}
251254

252-
void clean_up(ceph::os::Transaction& t, std::string_view why);
255+
void clean_up(ceph::os::Transaction& t, interrupt_cause_t why);
253256
virtual seastar::future<> on_stop() = 0;
254257
private:
255258
void handle_backfill_finish(

0 commit comments

Comments
 (0)