Skip to content

Commit 791772f

Browse files
committed
crimson/osd/pg_recovery: use OperationThrottler to throttle object pushes/pulls

Use OperationThrottler to throttle object pushes/pulls, instead of throttling recovery/backfill operations.

Fixes: https://tracker.ceph.com/issues/70180
Signed-off-by: Xuehan Xu <[email protected]>
1 parent: 895d52c; commit: 791772f

File tree

7 files changed: 82 additions (+82) and 27 deletions (-27).

src/crimson/osd/osd_operation.cc

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -158,8 +158,7 @@ OperationThrottler::OperationThrottler(ConfigProxy &conf)
158158

159159
void OperationThrottler::wake()
160160
{
161-
while ((!max_in_progress || in_progress < max_in_progress) &&
162-
!scheduler->empty()) {
161+
while (available() && !scheduler->empty()) {
163162
auto item = scheduler->dequeue();
164163
item.wake.set_value();
165164
++in_progress;

src/crimson/osd/osd_operation.h

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -334,6 +334,22 @@ class OperationThrottler : public BlockerT<OperationThrottler>,
334334
const std::set<std::string> &changed) final;
335335
void update_from_config(const ConfigProxy &conf);
336336

337+
bool available() const {
338+
return !max_in_progress || in_progress < max_in_progress;
339+
}
340+
341+
template <typename F>
342+
auto with_throttle(
343+
crimson::osd::scheduler::params_t params,
344+
F &&f) {
345+
if (!max_in_progress) return f();
346+
return acquire_throttle(params)
347+
.then(std::forward<F>(f))
348+
.finally([this] {
349+
release_throttle();
350+
});
351+
}
352+
337353
template <class OpT, class... Args>
338354
seastar::future<> with_throttle_while(
339355
BlockingEvent::Trigger<OpT>&& trigger,

src/crimson/osd/osd_operations/background_recovery.cc

Lines changed: 14 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -77,18 +77,19 @@ seastar::future<> BackgroundRecoveryT<T>::start()
7777
std::chrono::milliseconds(std::lround(delay * 1000)));
7878
}
7979
return maybe_delay.then([ref, this] {
80-
return this->template with_blocking_event<OperationThrottler::BlockingEvent>(
81-
[ref, this] (auto&& trigger) {
82-
return ss.with_throttle_while(
83-
std::move(trigger),
84-
this, get_scheduler_params(), [this] {
85-
return interruptor::with_interruption([this] {
86-
return do_recovery();
87-
}, [](std::exception_ptr) {
88-
return seastar::make_ready_future<bool>(false);
89-
}, pg, epoch_started);
90-
});
80+
return seastar::repeat([ref, this] {
81+
return interruptor::with_interruption([this] {
82+
return do_recovery();
83+
}, [](std::exception_ptr) {
84+
return seastar::make_ready_future<bool>(false);
85+
}, pg, epoch_started).then([](bool recovery_done) {
86+
if (recovery_done) {
87+
return seastar::stop_iteration::yes;
88+
} else {
89+
return seastar::stop_iteration::no;
90+
}
9191
});
92+
});
9293
});
9394
}
9495

@@ -117,7 +118,8 @@ UrgentRecovery::do_recovery()
117118
).then_interruptible([this] {
118119
return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
119120
interruptor>([this] (auto&& trigger) {
120-
return pg->get_recovery_handler()->recover_missing(trigger, soid, need);
121+
return pg->get_recovery_handler()->recover_missing(
122+
trigger, soid, need, false);
121123
}).then_interruptible([] {
122124
return seastar::make_ready_future<bool>(false);
123125
});

src/crimson/osd/osd_operations/background_recovery.h

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,6 @@ class UrgentRecovery final : public BackgroundRecoveryT<UrgentRecovery> {
6666
void print(std::ostream&) const final;
6767

6868
std::tuple<
69-
OperationThrottler::BlockingEvent,
7069
RecoveryBackend::RecoveryBlockingEvent
7170
> tracking_events;
7271

@@ -86,7 +85,6 @@ class PglogBasedRecovery final : public BackgroundRecoveryT<PglogBasedRecovery>
8685
float delay = 0);
8786

8887
std::tuple<
89-
OperationThrottler::BlockingEvent,
9088
RecoveryBackend::RecoveryBlockingEvent
9189
> tracking_events;
9290

src/crimson/osd/pg_recovery.cc

Lines changed: 32 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -110,7 +110,7 @@ PGRecovery::start_recovery_ops(
110110
}
111111
pg->reset_pglog_based_recovery_op();
112112
}
113-
return seastar::make_ready_future<bool>(!done);
113+
return seastar::make_ready_future<bool>(done);
114114
});
115115
}
116116

@@ -196,10 +196,10 @@ size_t PGRecovery::start_primary_recovery_ops(
196196
auto it = missing.get_items().find(head);
197197
assert(it != missing.get_items().end());
198198
auto head_need = it->second.need;
199-
out->emplace_back(recover_missing(trigger, head, head_need));
199+
out->emplace_back(recover_missing(trigger, head, head_need, true));
200200
++skipped;
201201
} else {
202-
out->emplace_back(recover_missing(trigger, soid, item.need));
202+
out->emplace_back(recover_missing(trigger, soid, item.need, true));
203203
}
204204
++started;
205205
}
@@ -306,7 +306,9 @@ size_t PGRecovery::start_replica_recovery_ops(
306306
PGRecovery::interruptible_future<>
307307
PGRecovery::recover_missing(
308308
RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
309-
const hobject_t &soid, eversion_t need)
309+
const hobject_t &soid,
310+
eversion_t need,
311+
bool with_throttle)
310312
{
311313
logger().info("{} {} v {}", __func__, soid, need);
312314
auto [recovering, added] = pg->get_recovery_backend()->add_recovering(soid);
@@ -319,7 +321,9 @@ PGRecovery::recover_missing(
319321
} else {
320322
return recovering.wait_track_blocking(
321323
trigger,
322-
pg->get_recovery_backend()->recover_object(soid, need)
324+
with_throttle
325+
? recover_object_with_throttle(soid, need)
326+
: recover_object(soid, need)
323327
.handle_exception_interruptible(
324328
[=, this, soid = std::move(soid)] (auto e) {
325329
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
@@ -367,7 +371,7 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes(
367371
logger().info("{} {} v {}, new recovery", __func__, soid, need);
368372
return recovering.wait_track_blocking(
369373
trigger,
370-
pg->get_recovery_backend()->recover_object(soid, need)
374+
recover_object_with_throttle(soid, need)
371375
.handle_exception_interruptible(
372376
[=, this, soid = std::move(soid)] (auto e) {
373377
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
@@ -516,6 +520,25 @@ void PGRecovery::request_primary_scan(
516520
});
517521
}
518522

523+
PGRecovery::interruptible_future<>
524+
PGRecovery::recover_object_with_throttle(
525+
const hobject_t &soid,
526+
eversion_t need)
527+
{
528+
crimson::osd::scheduler::params_t params =
529+
{1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
530+
auto &ss = pg->get_shard_services();
531+
logger().debug("{} {}", soid, need);
532+
return ss.with_throttle(
533+
std::move(params),
534+
[this, soid, need] {
535+
logger().debug("got throttle: {} {}", soid, need);
536+
auto backend = pg->get_recovery_backend();
537+
assert(backend);
538+
return backend->recover_object(soid, need);
539+
});
540+
}
541+
519542
void PGRecovery::enqueue_push(
520543
const hobject_t& obj,
521544
const eversion_t& v,
@@ -527,7 +550,7 @@ void PGRecovery::enqueue_push(
527550
if (!added)
528551
return;
529552
peering_state.prepare_backfill_for_missing(obj, v, peers);
530-
std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\
553+
std::ignore = recover_object_with_throttle(obj, v).\
531554
handle_exception_interruptible([] (auto) {
532555
ceph_abort_msg("got exception on backfill's push");
533556
return seastar::make_ready_future<>();
@@ -605,8 +628,8 @@ void PGRecovery::update_peers_last_backfill(
605628

606629
bool PGRecovery::budget_available() const
607630
{
608-
// TODO: the limits!
609-
return true;
631+
auto &ss = pg->get_shard_services();
632+
return ss.throttle_available();
610633
}
611634

612635
void PGRecovery::on_pg_clean()

src/crimson/osd/pg_recovery.h

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -65,7 +65,9 @@ class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
6565
}
6666
RecoveryBackend::interruptible_future<> recover_missing(
6767
RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
68-
const hobject_t &soid, eversion_t need);
68+
const hobject_t &soid,
69+
eversion_t need,
70+
bool with_throttle);
6971
RecoveryBackend::interruptible_future<> prep_object_replica_deletes(
7072
RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
7173
const hobject_t& soid,
@@ -97,6 +99,18 @@ class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
9799
friend class ReplicatedRecoveryBackend;
98100
friend class crimson::osd::UrgentRecovery;
99101

102+
interruptible_future<> recover_object_with_throttle(
103+
const hobject_t &soid,
104+
eversion_t need);
105+
106+
interruptible_future<> recover_object(
107+
const hobject_t &soid,
108+
eversion_t need) {
109+
auto backend = pg->get_recovery_backend();
110+
assert(backend);
111+
return backend->recover_object(soid, need);
112+
}
113+
100114
// backfill begin
101115
std::unique_ptr<crimson::osd::BackfillState> backfill_state;
102116
std::map<pg_shard_t,

src/crimson/osd/shard_services.h

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -593,7 +593,7 @@ class ShardServices : public OSDMapService {
593593
}
594594

595595
FORWARD_TO_OSD_SINGLETON(get_pool_info)
596-
FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
596+
FORWARD(with_throttle, with_throttle, local_state.throttler)
597597

598598
FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg)
599599
FORWARD_TO_OSD_SINGLETON(send_incremental_map)
@@ -617,6 +617,9 @@ class ShardServices : public OSDMapService {
617617
snap_dump_reservations,
618618
snap_reserver.dump)
619619

620+
bool throttle_available() const {
621+
return local_state.throttler.available();
622+
}
620623

621624
auto local_update_priority(
622625
singleton_orderer_t &orderer,

0 commit comments

Comments
 (0)