Skip to content

Commit f7fb388

Browse files
Merge pull request ceph#60359 from MaxKellermann/Dispatcher__marrival_simplified
msg/Dispatcher: simplify and optimize the `marrival` tree
2 parents 59b3236 + 9276d24 commit f7fb388

File tree

2 files changed

+22
-21
lines changed

2 files changed

+22
-21
lines changed

src/msg/DispatchQueue.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ double DispatchQueue::get_max_age(utime_t now) const {
3535
if (marrival.empty())
3636
return 0;
3737
else
38-
return (now - marrival.begin()->first);
38+
return (now - *marrival.begin());
3939
}
4040

4141
uint64_t DispatchQueue::pre_dispatch(const ref_t<Message>& m)
@@ -87,11 +87,12 @@ void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
8787
return;
8888
}
8989
ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
90-
add_arrival(m);
90+
QueueItem item{m};
91+
add_arrival(item);
9192
if (priority >= CEPH_MSG_PRIO_LOW) {
92-
mqueue.enqueue_strict(id, priority, QueueItem(m));
93+
mqueue.enqueue_strict(id, priority, std::move(item));
9394
} else {
94-
mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m));
95+
mqueue.enqueue(id, priority, m->get_cost(), std::move(item));
9596
}
9697
cond.notify_one();
9798
}
@@ -160,7 +161,7 @@ void DispatchQueue::entry()
160161
while (!mqueue.empty()) {
161162
QueueItem qitem = mqueue.dequeue();
162163
if (!qitem.is_code())
163-
remove_arrival(qitem.get_message());
164+
remove_arrival(qitem);
164165
l.unlock();
165166

166167
if (qitem.is_code()) {
@@ -220,7 +221,7 @@ void DispatchQueue::discard_queue(uint64_t id) {
220221
for (auto i = removed.begin(); i != removed.end(); ++i) {
221222
ceph_assert(!(i->is_code())); // We don't discard id 0, ever!
222223
const ref_t<Message>& m = i->get_message();
223-
remove_arrival(m);
224+
remove_arrival(*i);
224225
dispatch_throttle_release(m->get_dispatch_throttle_size());
225226
}
226227
}

src/msg/DispatchQueue.h

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
#define CEPH_DISPATCHQUEUE_H
1717

1818
#include <atomic>
19-
#include <map>
19+
#include <set>
2020
#include <queue>
2121
#include <boost/intrusive_ptr.hpp>
2222
#include "include/ceph_assert.h"
@@ -38,6 +38,9 @@ struct Connection;
3838
* See Messenger::dispatch_entry for details.
3939
*/
4040
class DispatchQueue {
41+
using ArrivalSet = std::multiset<double>;
42+
ArrivalSet marrival;
43+
4144
class QueueItem {
4245
int type;
4346
ConnectionRef con;
@@ -60,6 +63,13 @@ class DispatchQueue {
6063
ceph_assert(is_code());
6164
return con.get();
6265
}
66+
67+
/**
68+
* An iterator into #marrival. This field is only initialized if
69+
* `!is_code()`. It is set by add_arrival() and used by
70+
* remove_arrival().
71+
*/
72+
ArrivalSet::iterator arrival;
6373
};
6474

6575
CephContext *cct;
@@ -69,21 +79,11 @@ class DispatchQueue {
6979

7080
PrioritizedQueue<QueueItem, uint64_t> mqueue;
7181

72-
std::set<std::pair<double, ceph::ref_t<Message>>> marrival;
73-
std::map<ceph::ref_t<Message>, decltype(marrival)::iterator> marrival_map;
74-
void add_arrival(const ceph::ref_t<Message>& m) {
75-
marrival_map.insert(
76-
make_pair(
77-
m,
78-
marrival.insert(std::make_pair(m->get_recv_stamp(), m)).first
79-
)
80-
);
82+
void add_arrival(QueueItem &item) {
83+
item.arrival = marrival.insert(item.get_message()->get_recv_stamp());
8184
}
82-
void remove_arrival(const ceph::ref_t<Message>& m) {
83-
auto it = marrival_map.find(m);
84-
ceph_assert(it != marrival_map.end());
85-
marrival.erase(it->second);
86-
marrival_map.erase(it);
85+
void remove_arrival(QueueItem &item) {
86+
marrival.erase(item.arrival);
8787
}
8888

8989
std::atomic<uint64_t> next_id;

0 commit comments

Comments
 (0)