Skip to content

Commit 9cb8feb

Browse files
authored
Merge pull request ceph#62080 from xxhdx1985126/wip-70180
crimson/osd/pg_recovery: use OperationThrottler to throttle object pushes/pulls Reviewed-by: Matan Breizman <[email protected]>
2 parents 25e7e77 + 791772f commit 9cb8feb

File tree

9 files changed

+83
-58
lines changed

9 files changed

+83
-58
lines changed

src/crimson/osd/backfill_state.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,6 @@ BackfillState::Enqueuing::Enqueuing(my_context ctx)
342342

343343
do {
344344
if (!backfill_listener().budget_available()) {
345-
DEBUGDPP("throttle failed, turning to Waiting", pg());
346345
post_event(RequestWaiting{});
347346
return;
348347
} else if (should_rescan_replicas(backfill_state().peer_backfill_info,

src/crimson/osd/backfill_state.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ struct BackfillState {
6262
struct SuspendBackfill : sc::event<SuspendBackfill> {
6363
};
6464

65-
struct ThrottleAcquired : sc::event<ThrottleAcquired> {
66-
};
6765
private:
6866
// internal events
6967
struct RequestPrimaryScanning : sc::event<RequestPrimaryScanning> {
@@ -262,7 +260,6 @@ struct BackfillState {
262260
sc::transition<RequestDone, Done>,
263261
sc::custom_reaction<SuspendBackfill>,
264262
sc::custom_reaction<Triggered>,
265-
sc::transition<ThrottleAcquired, Enqueuing>,
266263
sc::transition<sc::event_base, Crashed>>;
267264
explicit Waiting(my_context);
268265
sc::result react(ObjectPushed);

src/crimson/osd/osd_operation.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,7 @@ OperationThrottler::OperationThrottler(ConfigProxy &conf)
158158

159159
void OperationThrottler::wake()
160160
{
161-
while ((!max_in_progress || in_progress < max_in_progress) &&
162-
!scheduler->empty()) {
161+
while (available() && !scheduler->empty()) {
163162
auto item = scheduler->dequeue();
164163
item.wake.set_value();
165164
++in_progress;

src/crimson/osd/osd_operation.h

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,22 @@ class OperationThrottler : public BlockerT<OperationThrottler>,
334334
const std::set<std::string> &changed) final;
335335
void update_from_config(const ConfigProxy &conf);
336336

337+
bool available() const {
338+
return !max_in_progress || in_progress < max_in_progress;
339+
}
340+
341+
template <typename F>
342+
auto with_throttle(
343+
crimson::osd::scheduler::params_t params,
344+
F &&f) {
345+
if (!max_in_progress) return f();
346+
return acquire_throttle(params)
347+
.then(std::forward<F>(f))
348+
.finally([this] {
349+
release_throttle();
350+
});
351+
}
352+
337353
template <class OpT, class... Args>
338354
seastar::future<> with_throttle_while(
339355
BlockingEvent::Trigger<OpT>&& trigger,
@@ -342,18 +358,6 @@ class OperationThrottler : public BlockerT<OperationThrottler>,
342358
with_throttle_while(std::forward<Args>(args)...), *this);
343359
}
344360

345-
// Returns std::nullopt if the throttle is acquired immediately,
346-
// returns the future for the acquiring otherwise
347-
std::optional<seastar::future<>>
348-
try_acquire_throttle_now(crimson::osd::scheduler::params_t params) {
349-
if (!max_in_progress || in_progress < max_in_progress) {
350-
++in_progress;
351-
--pending;
352-
return std::nullopt;
353-
}
354-
return acquire_throttle(params);
355-
}
356-
357361
private:
358362
void dump_detail(Formatter *f) const final;
359363

src/crimson/osd/osd_operations/background_recovery.cc

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,18 +77,19 @@ seastar::future<> BackgroundRecoveryT<T>::start()
7777
std::chrono::milliseconds(std::lround(delay * 1000)));
7878
}
7979
return maybe_delay.then([ref, this] {
80-
return this->template with_blocking_event<OperationThrottler::BlockingEvent>(
81-
[ref, this] (auto&& trigger) {
82-
return ss.with_throttle_while(
83-
std::move(trigger),
84-
this, get_scheduler_params(), [this] {
85-
return interruptor::with_interruption([this] {
86-
return do_recovery();
87-
}, [](std::exception_ptr) {
88-
return seastar::make_ready_future<bool>(false);
89-
}, pg, epoch_started);
90-
});
80+
return seastar::repeat([ref, this] {
81+
return interruptor::with_interruption([this] {
82+
return do_recovery();
83+
}, [](std::exception_ptr) {
84+
return seastar::make_ready_future<bool>(false);
85+
}, pg, epoch_started).then([](bool recovery_done) {
86+
if (recovery_done) {
87+
return seastar::stop_iteration::yes;
88+
} else {
89+
return seastar::stop_iteration::no;
90+
}
9191
});
92+
});
9293
});
9394
}
9495

@@ -117,7 +118,8 @@ UrgentRecovery::do_recovery()
117118
).then_interruptible([this] {
118119
return with_blocking_event<RecoveryBackend::RecoveryBlockingEvent,
119120
interruptor>([this] (auto&& trigger) {
120-
return pg->get_recovery_handler()->recover_missing(trigger, soid, need);
121+
return pg->get_recovery_handler()->recover_missing(
122+
trigger, soid, need, false);
121123
}).then_interruptible([] {
122124
return seastar::make_ready_future<bool>(false);
123125
});

src/crimson/osd/osd_operations/background_recovery.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ class UrgentRecovery final : public BackgroundRecoveryT<UrgentRecovery> {
6666
void print(std::ostream&) const final;
6767

6868
std::tuple<
69-
OperationThrottler::BlockingEvent,
7069
RecoveryBackend::RecoveryBlockingEvent
7170
> tracking_events;
7271

@@ -86,7 +85,6 @@ class PglogBasedRecovery final : public BackgroundRecoveryT<PglogBasedRecovery>
8685
float delay = 0);
8786

8887
std::tuple<
89-
OperationThrottler::BlockingEvent,
9088
RecoveryBackend::RecoveryBlockingEvent
9189
> tracking_events;
9290

src/crimson/osd/pg_recovery.cc

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ PGRecovery::start_recovery_ops(
6767
if (max_to_start > 0) {
6868
max_to_start -= start_replica_recovery_ops(trigger, max_to_start, &started);
6969
}
70+
using interruptor =
71+
crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
7072
return interruptor::parallel_for_each(started,
7173
[] (auto&& ifut) {
7274
return std::move(ifut);
@@ -108,7 +110,7 @@ PGRecovery::start_recovery_ops(
108110
}
109111
pg->reset_pglog_based_recovery_op();
110112
}
111-
return seastar::make_ready_future<bool>(!done);
113+
return seastar::make_ready_future<bool>(done);
112114
});
113115
}
114116

@@ -194,10 +196,10 @@ size_t PGRecovery::start_primary_recovery_ops(
194196
auto it = missing.get_items().find(head);
195197
assert(it != missing.get_items().end());
196198
auto head_need = it->second.need;
197-
out->emplace_back(recover_missing(trigger, head, head_need));
199+
out->emplace_back(recover_missing(trigger, head, head_need, true));
198200
++skipped;
199201
} else {
200-
out->emplace_back(recover_missing(trigger, soid, item.need));
202+
out->emplace_back(recover_missing(trigger, soid, item.need, true));
201203
}
202204
++started;
203205
}
@@ -304,7 +306,9 @@ size_t PGRecovery::start_replica_recovery_ops(
304306
PGRecovery::interruptible_future<>
305307
PGRecovery::recover_missing(
306308
RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
307-
const hobject_t &soid, eversion_t need)
309+
const hobject_t &soid,
310+
eversion_t need,
311+
bool with_throttle)
308312
{
309313
logger().info("{} {} v {}", __func__, soid, need);
310314
auto [recovering, added] = pg->get_recovery_backend()->add_recovering(soid);
@@ -317,7 +321,9 @@ PGRecovery::recover_missing(
317321
} else {
318322
return recovering.wait_track_blocking(
319323
trigger,
320-
pg->get_recovery_backend()->recover_object(soid, need)
324+
with_throttle
325+
? recover_object_with_throttle(soid, need)
326+
: recover_object(soid, need)
321327
.handle_exception_interruptible(
322328
[=, this, soid = std::move(soid)] (auto e) {
323329
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
@@ -365,7 +371,7 @@ RecoveryBackend::interruptible_future<> PGRecovery::prep_object_replica_pushes(
365371
logger().info("{} {} v {}, new recovery", __func__, soid, need);
366372
return recovering.wait_track_blocking(
367373
trigger,
368-
pg->get_recovery_backend()->recover_object(soid, need)
374+
recover_object_with_throttle(soid, need)
369375
.handle_exception_interruptible(
370376
[=, this, soid = std::move(soid)] (auto e) {
371377
on_failed_recover({ pg->get_pg_whoami() }, soid, need);
@@ -514,6 +520,25 @@ void PGRecovery::request_primary_scan(
514520
});
515521
}
516522

523+
PGRecovery::interruptible_future<>
524+
PGRecovery::recover_object_with_throttle(
525+
const hobject_t &soid,
526+
eversion_t need)
527+
{
528+
crimson::osd::scheduler::params_t params =
529+
{1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
530+
auto &ss = pg->get_shard_services();
531+
logger().debug("{} {}", soid, need);
532+
return ss.with_throttle(
533+
std::move(params),
534+
[this, soid, need] {
535+
logger().debug("got throttle: {} {}", soid, need);
536+
auto backend = pg->get_recovery_backend();
537+
assert(backend);
538+
return backend->recover_object(soid, need);
539+
});
540+
}
541+
517542
void PGRecovery::enqueue_push(
518543
const hobject_t& obj,
519544
const eversion_t& v,
@@ -525,7 +550,7 @@ void PGRecovery::enqueue_push(
525550
if (!added)
526551
return;
527552
peering_state.prepare_backfill_for_missing(obj, v, peers);
528-
std::ignore = pg->get_recovery_backend()->recover_object(obj, v).\
553+
std::ignore = recover_object_with_throttle(obj, v).\
529554
handle_exception_interruptible([] (auto) {
530555
ceph_abort_msg("got exception on backfill's push");
531556
return seastar::make_ready_future<>();
@@ -603,21 +628,8 @@ void PGRecovery::update_peers_last_backfill(
603628

604629
bool PGRecovery::budget_available() const
605630
{
606-
crimson::osd::scheduler::params_t params =
607-
{1, 0, crimson::osd::scheduler::scheduler_class_t::background_best_effort};
608631
auto &ss = pg->get_shard_services();
609-
auto futopt = ss.try_acquire_throttle_now(std::move(params));
610-
if (!futopt) {
611-
return true;
612-
}
613-
std::ignore = interruptor::make_interruptible(std::move(*futopt)
614-
).then_interruptible([this] {
615-
assert(!backfill_state->is_triggered());
616-
using BackfillState = crimson::osd::BackfillState;
617-
backfill_state->process_event(
618-
BackfillState::ThrottleAcquired{}.intrusive_from_this());
619-
});
620-
return false;
632+
return ss.throttle_available();
621633
}
622634

623635
void PGRecovery::on_pg_clean()

src/crimson/osd/pg_recovery.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ class PGBackend;
2525

2626
class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
2727
public:
28-
using interruptor =
29-
crimson::interruptible::interruptor<crimson::osd::IOInterruptCondition>;
3028
template <typename T = void>
3129
using interruptible_future = RecoveryBackend::interruptible_future<T>;
3230
PGRecovery(PGRecoveryListener* pg) : pg(pg) {}
@@ -67,7 +65,9 @@ class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
6765
}
6866
RecoveryBackend::interruptible_future<> recover_missing(
6967
RecoveryBackend::RecoveryBlockingEvent::TriggerI&,
70-
const hobject_t &soid, eversion_t need);
68+
const hobject_t &soid,
69+
eversion_t need,
70+
bool with_throttle);
7171
RecoveryBackend::interruptible_future<> prep_object_replica_deletes(
7272
RecoveryBackend::RecoveryBlockingEvent::TriggerI& trigger,
7373
const hobject_t& soid,
@@ -99,6 +99,18 @@ class PGRecovery : public crimson::osd::BackfillState::BackfillListener {
9999
friend class ReplicatedRecoveryBackend;
100100
friend class crimson::osd::UrgentRecovery;
101101

102+
interruptible_future<> recover_object_with_throttle(
103+
const hobject_t &soid,
104+
eversion_t need);
105+
106+
interruptible_future<> recover_object(
107+
const hobject_t &soid,
108+
eversion_t need) {
109+
auto backend = pg->get_recovery_backend();
110+
assert(backend);
111+
return backend->recover_object(soid, need);
112+
}
113+
102114
// backfill begin
103115
std::unique_ptr<crimson::osd::BackfillState> backfill_state;
104116
std::map<pg_shard_t,

src/crimson/osd/shard_services.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,8 +593,7 @@ class ShardServices : public OSDMapService {
593593
}
594594

595595
FORWARD_TO_OSD_SINGLETON(get_pool_info)
596-
FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
597-
FORWARD(try_acquire_throttle_now, try_acquire_throttle_now, local_state.throttler)
596+
FORWARD(with_throttle, with_throttle, local_state.throttler)
598597

599598
FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg)
600599
FORWARD_TO_OSD_SINGLETON(send_incremental_map)
@@ -618,6 +617,9 @@ class ShardServices : public OSDMapService {
618617
snap_dump_reservations,
619618
snap_reserver.dump)
620619

620+
bool throttle_available() const {
621+
return local_state.throttler.available();
622+
}
621623

622624
auto local_update_priority(
623625
singleton_orderer_t &orderer,

0 commit comments

Comments
 (0)