Skip to content

Commit eb10f3d

Browse files
committed
Merge PR ceph#57469 into main
* refs/pull/57469/head: mds: set dispatcher order mds: use regular dispatch for processing beacons msg: add priority to dispatcher invocation order mds: note when dispatcher is called Reviewed-by: Leonid Usov <[email protected]>
2 parents 6b2942f + 3291f39 commit eb10f3d

File tree

8 files changed

+70
-52
lines changed

8 files changed

+70
-52
lines changed

src/mds/Beacon.cc

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,19 +102,9 @@ void Beacon::init(const MDSMap &mdsmap)
102102
});
103103
}
104104

105-
bool Beacon::ms_can_fast_dispatch2(const cref_t<Message>& m) const
106-
{
107-
return m->get_type() == MSG_MDS_BEACON;
108-
}
109-
110-
void Beacon::ms_fast_dispatch2(const ref_t<Message>& m)
111-
{
112-
bool handled = ms_dispatch2(m);
113-
ceph_assert(handled);
114-
}
115-
116105
bool Beacon::ms_dispatch2(const ref_t<Message>& m)
117106
{
107+
dout(25) << __func__ << ": processing " << m << dendl;
118108
if (m->get_type() == MSG_MDS_BEACON) {
119109
if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
120110
handle_mds_beacon(ref_cast<MMDSBeacon>(m));

src/mds/Beacon.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,6 @@ class Beacon : public Dispatcher
5353
void init(const MDSMap &mdsmap);
5454
void shutdown();
5555

56-
bool ms_can_fast_dispatch_any() const override { return true; }
57-
bool ms_can_fast_dispatch2(const cref_t<Message>& m) const override;
58-
void ms_fast_dispatch2(const ref_t<Message>& m) override;
5956
bool ms_dispatch2(const ref_t<Message> &m) override;
6057
void ms_handle_connect(Connection *c) override {}
6158
bool ms_handle_reset(Connection *c) override {return false;}

src/mds/MDSDaemon.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -573,8 +573,10 @@ int MDSDaemon::init()
573573
dout(10) << sizeof(Capability) << "\tCapability" << dendl;
574574
dout(10) << sizeof(xlist<void*>::item) << "\txlist<>::item" << dendl;
575575

576-
messenger->add_dispatcher_tail(&beacon);
577-
messenger->add_dispatcher_tail(this);
576+
// Ensure beacons are processed ahead of most other dispatchers.
577+
messenger->add_dispatcher_head(&beacon, Dispatcher::PRIORITY_HIGH);
578+
// order last as MDSDaemon::ms_dispatch2 first acquires the mds_lock
579+
messenger->add_dispatcher_head(this, Dispatcher::PRIORITY_LOW);
578580

579581
// init monc
580582
monc->set_messenger(messenger);
@@ -979,6 +981,7 @@ void MDSDaemon::respawn()
979981

980982
bool MDSDaemon::ms_dispatch2(const ref_t<Message> &m)
981983
{
984+
dout(25) << __func__ << ": processing " << m << dendl;
982985
std::lock_guard l(mds_lock);
983986
if (stopping) {
984987
return false;

src/mds/MDSRank.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ MDSRank::~MDSRank()
611611
void MDSRankDispatcher::init()
612612
{
613613
objecter->init();
614-
messenger->add_dispatcher_head(objecter);
614+
messenger->add_dispatcher_tail(objecter); // the default priority
615615

616616
objecter->start();
617617

@@ -2137,7 +2137,7 @@ void MDSRank::active_start()
21372137

21382138
dout(10) << __func__ << ": initializing metrics handler" << dendl;
21392139
metrics_handler.init();
2140-
messenger->add_dispatcher_tail(&metrics_handler);
2140+
messenger->add_dispatcher_tail(&metrics_handler, Dispatcher::PRIORITY_HIGH);
21412141

21422142
// metric aggregation is solely done by rank 0
21432143
if (is_rank0()) {

src/mds/MetricAggregator.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ void MetricAggregator::shutdown() {
127127
}
128128

129129
bool MetricAggregator::ms_dispatch2(const ref_t<Message> &m) {
130+
dout(25) << " processing " << m << dendl;
130131
if (m->get_type() == MSG_MDS_METRICS &&
131132
m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) {
132133
const Message *msg = m.get();

src/mon/MonClient.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ int MonClient::get_monmap_and_config()
131131
messenger = Messenger::create_client_messenger(
132132
cct, "temp_mon_client");
133133
ceph_assert(messenger);
134-
messenger->add_dispatcher_head(this);
134+
messenger->add_dispatcher_head(this, Dispatcher::PRIORITY_HIGH);
135135
messenger->start();
136136
auto shutdown_msgr = make_scope_guard([this] {
137137
messenger->shutdown();
@@ -265,7 +265,7 @@ int MonClient::ping_monitor(const string &mon_id, string *result_reply)
265265
result_reply);
266266

267267
Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
268-
smsgr->add_dispatcher_head(pinger);
268+
smsgr->add_dispatcher_head(pinger, Dispatcher::PRIORITY_HIGH);
269269
smsgr->set_auth_client(pinger);
270270
smsgr->start();
271271

@@ -295,6 +295,7 @@ int MonClient::ping_monitor(const string &mon_id, string *result_reply)
295295

296296
bool MonClient::ms_dispatch(Message *m)
297297
{
298+
ldout(cct, 25) << __func__ << " processing " << m << dendl;
298299
// we only care about these message types
299300
switch (m->get_type()) {
300301
case CEPH_MSG_MON_MAP:
@@ -511,7 +512,7 @@ int MonClient::init()
511512
initialized = true;
512513

513514
messenger->set_auth_client(this);
514-
messenger->add_dispatcher_head(this);
515+
messenger->add_dispatcher_head(this, Dispatcher::PRIORITY_HIGH);
515516

516517
timer.init();
517518
schedule_tick();

src/msg/Dispatcher.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ class KeyStore;
2929

3030
class Dispatcher {
3131
public:
32+
/* Ordering of dispatch for a list of Dispatchers. */
33+
using priority_t = uint32_t;
34+
static constexpr priority_t PRIORITY_HIGH = std::numeric_limits<priority_t>::max() / 4;
35+
static constexpr priority_t PRIORITY_DEFAULT = std::numeric_limits<priority_t>::max() / 2;
36+
static constexpr priority_t PRIORITY_LOW = (std::numeric_limits<priority_t>::max() / 4) * 3;
37+
3238
explicit Dispatcher(CephContext *cct_)
3339
: cct(cct_)
3440
{

src/msg/Messenger.h

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
#ifndef CEPH_MESSENGER_H
1818
#define CEPH_MESSENGER_H
1919

20-
#include <deque>
2120
#include <map>
2221
#include <optional>
22+
#include <vector>
2323

2424
#include <errno.h>
2525
#include <sstream>
@@ -92,8 +92,18 @@ struct Interceptor {
9292

9393
class Messenger {
9494
private:
95-
std::deque<Dispatcher*> dispatchers;
96-
std::deque<Dispatcher*> fast_dispatchers;
95+
struct PriorityDispatcher {
96+
using priority_t = Dispatcher::priority_t;
97+
priority_t priority;
98+
Dispatcher* dispatcher;
99+
100+
bool operator<(const PriorityDispatcher& other) const {
101+
return priority < other.priority;
102+
}
103+
};
104+
std::vector<PriorityDispatcher> dispatchers;
105+
std::vector<PriorityDispatcher> fast_dispatchers;
106+
97107
ZTracer::Endpoint trace_endpoint;
98108

99109
protected:
@@ -389,11 +399,14 @@ class Messenger {
389399
*
390400
* @param d The Dispatcher to insert into the list.
391401
*/
392-
void add_dispatcher_head(Dispatcher *d) {
402+
void add_dispatcher_head(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) {
393403
bool first = dispatchers.empty();
394-
dispatchers.push_front(d);
395-
if (d->ms_can_fast_dispatch_any())
396-
fast_dispatchers.push_front(d);
404+
dispatchers.insert(dispatchers.begin(), PriorityDispatcher{priority, d});
405+
std::stable_sort(dispatchers.begin(), dispatchers.end());
406+
if (d->ms_can_fast_dispatch_any()) {
407+
fast_dispatchers.insert(fast_dispatchers.begin(), PriorityDispatcher{priority, d});
408+
std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end());
409+
}
397410
if (first)
398411
ready();
399412
}
@@ -404,11 +417,14 @@ class Messenger {
404417
*
405418
* @param d The Dispatcher to insert into the list.
406419
*/
407-
void add_dispatcher_tail(Dispatcher *d) {
420+
void add_dispatcher_tail(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) {
408421
bool first = dispatchers.empty();
409-
dispatchers.push_back(d);
410-
if (d->ms_can_fast_dispatch_any())
411-
fast_dispatchers.push_back(d);
422+
dispatchers.push_back(PriorityDispatcher{priority, d});
423+
std::stable_sort(dispatchers.begin(), dispatchers.end());
424+
if (d->ms_can_fast_dispatch_any()) {
425+
fast_dispatchers.push_back(PriorityDispatcher{priority, d});
426+
std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end());
427+
}
412428
if (first)
413429
ready();
414430
}
@@ -667,9 +683,10 @@ class Messenger {
667683
* @param m The Message we are testing.
668684
*/
669685
bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
670-
for (const auto &dispatcher : fast_dispatchers) {
671-
if (dispatcher->ms_can_fast_dispatch2(m))
672-
return true;
686+
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
687+
if (dispatcher->ms_can_fast_dispatch2(m)) {
688+
return true;
689+
}
673690
}
674691
return false;
675692
}
@@ -682,10 +699,10 @@ class Messenger {
682699
*/
683700
void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
684701
m->set_dispatch_stamp(ceph_clock_now());
685-
for (const auto &dispatcher : fast_dispatchers) {
702+
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
686703
if (dispatcher->ms_can_fast_dispatch2(m)) {
687-
dispatcher->ms_fast_dispatch2(m);
688-
return;
704+
dispatcher->ms_fast_dispatch2(m);
705+
return;
689706
}
690707
}
691708
ceph_abort();
@@ -697,7 +714,7 @@ class Messenger {
697714
*
698715
*/
699716
void ms_fast_preprocess(const ceph::ref_t<Message> &m) {
700-
for (const auto &dispatcher : fast_dispatchers) {
717+
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
701718
dispatcher->ms_fast_preprocess2(m);
702719
}
703720
}
@@ -710,9 +727,10 @@ class Messenger {
710727
*/
711728
void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
712729
m->set_dispatch_stamp(ceph_clock_now());
713-
for (const auto &dispatcher : dispatchers) {
714-
if (dispatcher->ms_dispatch2(m))
715-
return;
730+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
731+
if (dispatcher->ms_dispatch2(m)) {
732+
return;
733+
}
716734
}
717735
lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
718736
<< m->get_source_inst() << dendl;
@@ -729,7 +747,7 @@ class Messenger {
729747
* @param con Pointer to the new Connection.
730748
*/
731749
void ms_deliver_handle_connect(Connection *con) {
732-
for (const auto& dispatcher : dispatchers) {
750+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
733751
dispatcher->ms_handle_connect(con);
734752
}
735753
}
@@ -742,7 +760,7 @@ class Messenger {
742760
* @param con Pointer to the new Connection.
743761
*/
744762
void ms_deliver_handle_fast_connect(Connection *con) {
745-
for (const auto& dispatcher : fast_dispatchers) {
763+
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
746764
dispatcher->ms_handle_fast_connect(con);
747765
}
748766
}
@@ -754,7 +772,7 @@ class Messenger {
754772
* @param con Pointer to the new Connection.
755773
*/
756774
void ms_deliver_handle_accept(Connection *con) {
757-
for (const auto& dispatcher : dispatchers) {
775+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
758776
dispatcher->ms_handle_accept(con);
759777
}
760778
}
@@ -766,7 +784,7 @@ class Messenger {
766784
* @param con Pointer to the new Connection.
767785
*/
768786
void ms_deliver_handle_fast_accept(Connection *con) {
769-
for (const auto& dispatcher : fast_dispatchers) {
787+
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
770788
dispatcher->ms_handle_fast_accept(con);
771789
}
772790
}
@@ -779,9 +797,10 @@ class Messenger {
779797
* @param con Pointer to the broken Connection.
780798
*/
781799
void ms_deliver_handle_reset(Connection *con) {
782-
for (const auto& dispatcher : dispatchers) {
783-
if (dispatcher->ms_handle_reset(con))
784-
return;
800+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
801+
if (dispatcher->ms_handle_reset(con)) {
802+
return;
803+
}
785804
}
786805
}
787806
/**
@@ -792,7 +811,7 @@ class Messenger {
792811
* @param con Pointer to the broken Connection.
793812
*/
794813
void ms_deliver_handle_remote_reset(Connection *con) {
795-
for (const auto& dispatcher : dispatchers) {
814+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
796815
dispatcher->ms_handle_remote_reset(con);
797816
}
798817
}
@@ -806,9 +825,10 @@ class Messenger {
806825
* @param con Pointer to the broken Connection.
807826
*/
808827
void ms_deliver_handle_refused(Connection *con) {
809-
for (const auto& dispatcher : dispatchers) {
810-
if (dispatcher->ms_handle_refused(con))
828+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
829+
if (dispatcher->ms_handle_refused(con)) {
811830
return;
831+
}
812832
}
813833
}
814834

0 commit comments

Comments
 (0)