Skip to content

Commit b463d93

Browse files
committed
msg: add priority to dispatcher invocation order
So we can ensure that e.g. MDSRank::ms_dispatch is lowest priority so that we do not acquire the mds_lock when looking at beacons. This change maintains the current behavior when the priority is unset: the use of std::stable_sort will ensure that the add_dispatcher_head and add_dispatcher_tail calls will preserve order when dispatcher priorities are equal. Fixes: 7fc04be Signed-off-by: Patrick Donnelly <[email protected]>
1 parent 1b93fd5 commit b463d93

File tree

2 files changed

+57
-31
lines changed

2 files changed

+57
-31
lines changed

src/msg/Dispatcher.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ class KeyStore;
2929

3030
class Dispatcher {
3131
public:
32+
/* Ordering of dispatch for a list of Dispatchers. */
33+
using priority_t = uint32_t;
34+
static constexpr priority_t PRIORITY_HIGH = std::numeric_limits<priority_t>::max() / 4;
35+
static constexpr priority_t PRIORITY_DEFAULT = std::numeric_limits<priority_t>::max() / 2;
36+
static constexpr priority_t PRIORITY_LOW = (std::numeric_limits<priority_t>::max() / 4) * 3;
37+
3238
explicit Dispatcher(CephContext *cct_)
3339
: cct(cct_)
3440
{

src/msg/Messenger.h

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
#ifndef CEPH_MESSENGER_H
1818
#define CEPH_MESSENGER_H
1919

20-
#include <deque>
2120
#include <map>
2221
#include <optional>
22+
#include <vector>
2323

2424
#include <errno.h>
2525
#include <sstream>
@@ -92,8 +92,18 @@ struct Interceptor {
9292

9393
class Messenger {
9494
private:
95-
std::deque<Dispatcher*> dispatchers;
96-
std::deque<Dispatcher*> fast_dispatchers;
95+
struct PriorityDispatcher {
96+
using priority_t = Dispatcher::priority_t;
97+
priority_t priority;
98+
Dispatcher* dispatcher;
99+
100+
bool operator<(const PriorityDispatcher& other) const {
101+
return priority < other.priority;
102+
}
103+
};
104+
std::vector<PriorityDispatcher> dispatchers;
105+
std::vector<PriorityDispatcher> fast_dispatchers;
106+
97107
ZTracer::Endpoint trace_endpoint;
98108

99109
protected:
@@ -389,11 +399,14 @@ class Messenger {
389399
*
390400
* @param d The Dispatcher to insert into the list.
391401
*/
392-
void add_dispatcher_head(Dispatcher *d) {
402+
void add_dispatcher_head(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) {
393403
bool first = dispatchers.empty();
394-
dispatchers.push_front(d);
395-
if (d->ms_can_fast_dispatch_any())
396-
fast_dispatchers.push_front(d);
404+
dispatchers.insert(dispatchers.begin(), PriorityDispatcher{priority, d});
405+
std::stable_sort(dispatchers.begin(), dispatchers.end());
406+
if (d->ms_can_fast_dispatch_any()) {
407+
fast_dispatchers.insert(fast_dispatchers.begin(), PriorityDispatcher{priority, d});
408+
std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end());
409+
}
397410
if (first)
398411
ready();
399412
}
@@ -404,11 +417,14 @@ class Messenger {
404417
*
405418
* @param d The Dispatcher to insert into the list.
406419
*/
407-
void add_dispatcher_tail(Dispatcher *d) {
420+
void add_dispatcher_tail(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) {
408421
bool first = dispatchers.empty();
409-
dispatchers.push_back(d);
410-
if (d->ms_can_fast_dispatch_any())
411-
fast_dispatchers.push_back(d);
422+
dispatchers.push_back(PriorityDispatcher{priority, d});
423+
std::stable_sort(dispatchers.begin(), dispatchers.end());
424+
if (d->ms_can_fast_dispatch_any()) {
425+
fast_dispatchers.push_back(PriorityDispatcher{priority, d});
426+
std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end());
427+
}
412428
if (first)
413429
ready();
414430
}
@@ -667,9 +683,10 @@ class Messenger {
667683
* @param m The Message we are testing.
668684
*/
669685
bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
670-
for (const auto &dispatcher : fast_dispatchers) {
671-
if (dispatcher->ms_can_fast_dispatch2(m))
672-
return true;
686+
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
687+
if (dispatcher->ms_can_fast_dispatch2(m)) {
688+
return true;
689+
}
673690
}
674691
return false;
675692
}
@@ -682,10 +699,10 @@ class Messenger {
682699
*/
683700
void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
684701
m->set_dispatch_stamp(ceph_clock_now());
685-
for (const auto &dispatcher : fast_dispatchers) {
702+
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
686703
if (dispatcher->ms_can_fast_dispatch2(m)) {
687-
dispatcher->ms_fast_dispatch2(m);
688-
return;
704+
dispatcher->ms_fast_dispatch2(m);
705+
return;
689706
}
690707
}
691708
ceph_abort();
@@ -697,7 +714,7 @@ class Messenger {
697714
*
698715
*/
699716
void ms_fast_preprocess(const ceph::ref_t<Message> &m) {
700-
for (const auto &dispatcher : fast_dispatchers) {
717+
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
701718
dispatcher->ms_fast_preprocess2(m);
702719
}
703720
}
@@ -710,9 +727,10 @@ class Messenger {
710727
*/
711728
void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
712729
m->set_dispatch_stamp(ceph_clock_now());
713-
for (const auto &dispatcher : dispatchers) {
714-
if (dispatcher->ms_dispatch2(m))
715-
return;
730+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
731+
if (dispatcher->ms_dispatch2(m)) {
732+
return;
733+
}
716734
}
717735
lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
718736
<< m->get_source_inst() << dendl;
@@ -729,7 +747,7 @@ class Messenger {
729747
* @param con Pointer to the new Connection.
730748
*/
731749
void ms_deliver_handle_connect(Connection *con) {
732-
for (const auto& dispatcher : dispatchers) {
750+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
733751
dispatcher->ms_handle_connect(con);
734752
}
735753
}
@@ -742,7 +760,7 @@ class Messenger {
742760
* @param con Pointer to the new Connection.
743761
*/
744762
void ms_deliver_handle_fast_connect(Connection *con) {
745-
for (const auto& dispatcher : fast_dispatchers) {
763+
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
746764
dispatcher->ms_handle_fast_connect(con);
747765
}
748766
}
@@ -754,7 +772,7 @@ class Messenger {
754772
* @param con Pointer to the new Connection.
755773
*/
756774
void ms_deliver_handle_accept(Connection *con) {
757-
for (const auto& dispatcher : dispatchers) {
775+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
758776
dispatcher->ms_handle_accept(con);
759777
}
760778
}
@@ -766,7 +784,7 @@ class Messenger {
766784
* @param con Pointer to the new Connection.
767785
*/
768786
void ms_deliver_handle_fast_accept(Connection *con) {
769-
for (const auto& dispatcher : fast_dispatchers) {
787+
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
770788
dispatcher->ms_handle_fast_accept(con);
771789
}
772790
}
@@ -779,9 +797,10 @@ class Messenger {
779797
* @param con Pointer to the broken Connection.
780798
*/
781799
void ms_deliver_handle_reset(Connection *con) {
782-
for (const auto& dispatcher : dispatchers) {
783-
if (dispatcher->ms_handle_reset(con))
784-
return;
800+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
801+
if (dispatcher->ms_handle_reset(con)) {
802+
return;
803+
}
785804
}
786805
}
787806
/**
@@ -792,7 +811,7 @@ class Messenger {
792811
* @param con Pointer to the broken Connection.
793812
*/
794813
void ms_deliver_handle_remote_reset(Connection *con) {
795-
for (const auto& dispatcher : dispatchers) {
814+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
796815
dispatcher->ms_handle_remote_reset(con);
797816
}
798817
}
@@ -806,9 +825,10 @@ class Messenger {
806825
* @param con Pointer to the broken Connection.
807826
*/
808827
void ms_deliver_handle_refused(Connection *con) {
809-
for (const auto& dispatcher : dispatchers) {
810-
if (dispatcher->ms_handle_refused(con))
828+
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
829+
if (dispatcher->ms_handle_refused(con)) {
811830
return;
831+
}
812832
}
813833
}
814834

0 commit comments

Comments
 (0)