Skip to content

Commit 657d5b3

Browse files
authored
Merge pull request ceph#59189 from xxhdx1985126/wip-67508
crimson/osd/recovery_backend: restart object pulls that are blocked by down osds Reviewed-by: Matan Breizman <[email protected]>
2 parents f404c63 + bf694f2 commit 657d5b3

File tree

5 files changed

+99
-23
lines changed

5 files changed

+99
-23
lines changed

src/crimson/common/interruptible_future.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,6 +1330,27 @@ struct interruptor
13301330
);
13311331
}
13321332
}
1333+
1334+
template <InvokeReturnsInterruptibleFuture AsyncAction>
1335+
[[gnu::always_inline]]
1336+
static auto repeat_eagain(AsyncAction&& action) {
1337+
return seastar::do_with(
1338+
std::forward<AsyncAction>(action),
1339+
[] (auto &f) {
1340+
return repeat([&f] {
1341+
return std::invoke(f
1342+
).si_then([] {
1343+
return seastar::stop_iteration::yes;
1344+
}).handle_error_interruptible(
1345+
[](const crimson::ct_error::eagain &e) {
1346+
return seastar::stop_iteration::no;
1347+
},
1348+
crimson::ct_error::pass_further_all{}
1349+
);
1350+
});
1351+
});
1352+
}
1353+
13331354
template <typename AsyncAction>
13341355
requires (!InvokeReturnsInterruptibleFuture<AsyncAction>)
13351356
[[gnu::always_inline]]

src/crimson/osd/pg.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,20 @@ class PG : public boost::intrusive_ref_counter<
357357
shard_services.remove_want_pg_temp(orderer, pgid.pgid);
358358
}
359359
void check_recovery_sources(const OSDMapRef& newmap) final {
360-
// Not needed yet
360+
LOG_PREFIX(PG::check_recovery_sources);
361+
recovery_backend->for_each_recovery_waiter(
362+
[newmap, FNAME, this](auto &, auto &waiter) {
363+
if (waiter->is_pulling() &&
364+
newmap->is_down(waiter->pull_info->from.osd)) {
365+
SUBDEBUGDPP(
366+
osd,
367+
" repeating pulling for {}, due to osd {} down",
368+
*this,
369+
waiter->pull_info->soid,
370+
waiter->pull_info->from.osd);
371+
waiter->repeat_pull();
372+
}
373+
});
361374
}
362375
void check_blocklisted_watchers() final;
363376
void clear_primary_state() final {

src/crimson/osd/recovery_backend.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ class RecoveryBackend {
112112
}
113113
return on_stop();
114114
}
115+
116+
template <typename Func>
117+
void for_each_recovery_waiter(Func &&f) {
118+
for (auto &[soid, recovery_waiter] : recovering) {
119+
std::forward<Func>(f)(soid, recovery_waiter);
120+
}
121+
}
115122
protected:
116123
crimson::osd::PG& pg;
117124
crimson::osd::ShardServices& shard_services;
@@ -219,6 +226,13 @@ class RecoveryBackend {
219226
pulled.reset();
220227
}
221228
}
229+
void repeat_pull() {
230+
ceph_assert(pulled);
231+
pulled->set_exception(crimson::ct_error::eagain::exception_ptr());
232+
}
233+
bool is_pulling() const {
234+
return (bool)pulled;
235+
}
222236
void set_push_failed(pg_shard_t shard, std::exception_ptr e) {
223237
auto it = pushes.find(shard);
224238
if (it != pushes.end()) {

src/crimson/osd/replicated_recovery_backend.cc

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -113,28 +113,33 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj(
113113
// object is not missing, don't pull
114114
return seastar::make_ready_future<>();
115115
}
116-
return pg.obc_loader.with_obc<RWState::RWREAD>(soid.get_head(),
117-
[this, soid, need](auto head, auto) {
118-
PullOp pull_op;
119-
auto& recovery_waiter = get_recovering(soid);
120-
recovery_waiter.pull_info =
121-
std::make_optional<RecoveryBackend::pull_info_t>();
122-
auto& pull_info = *recovery_waiter.pull_info;
123-
prepare_pull(head, pull_op, pull_info, soid, need);
124-
auto msg = crimson::make_message<MOSDPGPull>();
125-
msg->from = pg.get_pg_whoami();
126-
msg->set_priority(pg.get_recovery_op_priority());
127-
msg->pgid = pg.get_pgid();
128-
msg->map_epoch = pg.get_osdmap_epoch();
129-
msg->min_epoch = pg.get_last_peering_reset();
130-
msg->set_pulls({std::move(pull_op)});
131-
return shard_services.send_to_osd(
132-
pull_info.from.osd,
133-
std::move(msg),
134-
pg.get_osdmap_epoch());
135-
}).si_then([this, soid] {
136-
auto& recovery_waiter = get_recovering(soid);
137-
return recovery_waiter.wait_for_pull();
116+
return interruptor::repeat_eagain([this, soid, need] {
117+
using prepare_pull_iertr =
118+
crimson::osd::ObjectContextLoader::load_obc_iertr::extend<
119+
crimson::ct_error::eagain>;
120+
return pg.obc_loader.with_obc<RWState::RWREAD>(soid.get_head(),
121+
[this, soid, need](auto head, auto) {
122+
PullOp pull_op;
123+
auto& recovery_waiter = get_recovering(soid);
124+
recovery_waiter.pull_info =
125+
std::make_optional<RecoveryBackend::pull_info_t>();
126+
auto& pull_info = *recovery_waiter.pull_info;
127+
prepare_pull(head, pull_op, pull_info, soid, need);
128+
auto msg = crimson::make_message<MOSDPGPull>();
129+
msg->from = pg.get_pg_whoami();
130+
msg->set_priority(pg.get_recovery_op_priority());
131+
msg->pgid = pg.get_pgid();
132+
msg->map_epoch = pg.get_osdmap_epoch();
133+
msg->min_epoch = pg.get_last_peering_reset();
134+
msg->set_pulls({std::move(pull_op)});
135+
return shard_services.send_to_osd(
136+
pull_info.from.osd,
137+
std::move(msg),
138+
pg.get_osdmap_epoch());
139+
}).si_then([this, soid]() -> prepare_pull_iertr::future<> {
140+
auto& recovery_waiter = get_recovering(soid);
141+
return recovery_waiter.wait_for_pull();
142+
});
138143
}).handle_error_interruptible(
139144
crimson::ct_error::assert_all("unexpected error")
140145
);

src/test/crimson/test_interruptible_future.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,29 @@ TEST_F(seastar_test_suite_t, DISABLED_nested_interruptors)
282282
});
283283
}
284284

285+
TEST_F(seastar_test_suite_t, interruptible_repeat_eagain)
286+
{
287+
using interruptor =
288+
interruptible::interruptor<TestInterruptCondition>;
289+
run_async([] {
290+
interruptor::with_interruption([] {
291+
return seastar::do_with(
292+
0,
293+
[](auto &i) {
294+
return interruptor::repeat_eagain([&i]() -> base_iertr::future<> {
295+
if (++i < 5) {
296+
return crimson::ct_error::eagain::make();
297+
}
298+
return base_iertr::now();
299+
}).si_then([&i] {
300+
std::cout << i << std::endl;
301+
ceph_assert(i == 5);
302+
});
303+
});
304+
}, [](std::exception_ptr) {}, false).unsafe_get();
305+
});
306+
}
307+
285308
#if 0
286309
// This seems to cause a hang in the gcc-9 linker on bionic
287310
TEST_F(seastar_test_suite_t, handle_error)

0 commit comments

Comments
 (0)