Skip to content

Commit d1df8db

Browse files
authored
Merge pull request ceph#49619 from amathuria/wip-amat-fix-encode-payload
msg/async: Encode message once features are set Reviewed-by: Radoslaw Zarzynski <[email protected]>
2 parents 78c579b + 7268211 commit d1df8db

File tree

3 files changed

+41
-31
lines changed

3 files changed

+41
-31
lines changed

src/msg/async/ProtocolV1.cc

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,10 @@ void ProtocolV1::send_message(Message *m) {
217217
// TODO: Currently not all messages supports reencode like MOSDMap, so here
218218
// only let fast dispatch support messages prepare message
219219
bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
220-
if (can_fast_prepare) {
220+
bool is_prepared = false;
221+
if (can_fast_prepare && f) {
221222
prepare_send_message(f, m, bl);
223+
is_prepared = true;
222224
}
223225

224226
std::lock_guard<std::mutex> l(connection->write_lock);
@@ -238,7 +240,8 @@ void ProtocolV1::send_message(Message *m) {
238240
} else {
239241
m->queue_start = ceph::mono_clock::now();
240242
m->trace.event("async enqueueing message");
241-
out_q[m->get_priority()].emplace_back(std::move(bl), m);
243+
out_q[m->get_priority()].emplace_back(out_q_entry_t{
244+
std::move(bl), m, is_prepared});
242245
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
243246
<< dendl;
244247
if (can_write != WriteStatus::REPLACING && !write_in_progress) {
@@ -322,8 +325,10 @@ void ProtocolV1::write_event() {
322325
}
323326
}
324327

325-
ceph::buffer::list data;
326-
Message *m = _get_next_outgoing(&data);
328+
const out_q_entry_t out_entry = _get_next_outgoing();
329+
Message *m = out_entry.m;
330+
ceph::buffer::list data = out_entry.bl;
331+
327332
if (!m) {
328333
break;
329334
}
@@ -337,7 +342,7 @@ void ProtocolV1::write_event() {
337342
connection->write_lock.unlock();
338343

339344
// send_message or requeue messages may not encode message
340-
if (!data.length()) {
345+
if (!data.length() || !out_entry.is_prepared) {
341346
prepare_send_message(connection->get_features(), m, data);
342347
}
343348

@@ -1199,15 +1204,15 @@ void ProtocolV1::requeue_sent() {
11991204
return;
12001205
}
12011206

1202-
list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1207+
list<out_q_entry_t> &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
12031208
out_seq -= sent.size();
12041209
while (!sent.empty()) {
12051210
Message *m = sent.back();
12061211
sent.pop_back();
12071212
ldout(cct, 10) << __func__ << " " << *m << " for resend "
12081213
<< " (" << m->get_seq() << ")" << dendl;
12091214
m->clear_payload();
1210-
rq.push_front(make_pair(ceph::buffer::list(), m));
1215+
rq.push_front(out_q_entry_t{ceph::buffer::list(), m, false});
12111216
}
12121217
}
12131218

@@ -1217,15 +1222,15 @@ uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
12171222
if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
12181223
return seq;
12191224
}
1220-
list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1225+
list<out_q_entry_t> &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
12211226
uint64_t count = out_seq;
12221227
while (!rq.empty()) {
1223-
pair<ceph::buffer::list, Message *> p = rq.front();
1224-
if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
1225-
ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
1226-
<< p.second->get_seq() << " <= " << seq << ", discarding"
1228+
Message* const m = rq.front().m;
1229+
if (m->get_seq() == 0 || m->get_seq() > seq) break;
1230+
ldout(cct, 10) << __func__ << " " << *(m) << " for resend seq "
1231+
<< m->get_seq() << " <= " << seq << ", discarding"
12271232
<< dendl;
1228-
p.second->put();
1233+
m->put();
12291234
rq.pop_front();
12301235
count++;
12311236
}
@@ -1245,13 +1250,13 @@ void ProtocolV1::discard_out_queue() {
12451250
(*p)->put();
12461251
}
12471252
sent.clear();
1248-
for (map<int, list<pair<ceph::buffer::list, Message *> > >::iterator p =
1253+
for (map<int, list<out_q_entry_t>>::iterator p =
12491254
out_q.begin();
12501255
p != out_q.end(); ++p) {
1251-
for (list<pair<ceph::buffer::list, Message *> >::iterator r = p->second.begin();
1256+
for (list<out_q_entry_t>::iterator r = p->second.begin();
12521257
r != p->second.end(); ++r) {
1253-
ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
1254-
r->second->put();
1258+
ldout(cct, 20) << __func__ << " discard " << r->m << dendl;
1259+
r->m->put();
12551260
}
12561261
}
12571262
out_q.clear();
@@ -1320,22 +1325,18 @@ void ProtocolV1::reset_recv_state()
13201325
}
13211326
}
13221327

1323-
Message *ProtocolV1::_get_next_outgoing(ceph::buffer::list *bl) {
1324-
Message *m = 0;
1328+
ProtocolV1::out_q_entry_t ProtocolV1::_get_next_outgoing() {
1329+
out_q_entry_t out_entry;
13251330
if (!out_q.empty()) {
1326-
map<int, list<pair<ceph::buffer::list, Message *> > >::reverse_iterator it =
1331+
map<int, list<out_q_entry_t>>::reverse_iterator it =
13271332
out_q.rbegin();
13281333
ceph_assert(!it->second.empty());
1329-
list<pair<ceph::buffer::list, Message *> >::iterator p = it->second.begin();
1330-
m = p->second;
1331-
if (p->first.length() && bl) {
1332-
assert(bl->length() == 0);
1333-
bl->swap(p->first);
1334-
}
1334+
list<out_q_entry_t>::iterator p = it->second.begin();
1335+
out_entry = *p;
13351336
it->second.erase(p);
13361337
if (it->second.empty()) out_q.erase(it->first);
13371338
}
1338-
return m;
1339+
return out_entry;
13391340
}
13401341

13411342
/**

src/msg/async/ProtocolV1.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,14 @@ handle_tag_ack | v |
105105
enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED };
106106
std::atomic<WriteStatus> can_write;
107107
std::list<Message *> sent; // the first ceph::buffer::list need to inject seq
108+
//struct for outbound msgs
109+
struct out_q_entry_t {
110+
ceph::buffer::list bl;
111+
Message* m {nullptr};
112+
bool is_prepared {false};
113+
};
108114
// priority queue for outbound msgs
109-
std::map<int, std::list<std::pair<ceph::buffer::list, Message *>>> out_q;
115+
std::map<int, std::list<out_q_entry_t>> out_q;
110116
bool keepalive;
111117
bool write_in_progress = false;
112118

@@ -194,7 +200,7 @@ handle_tag_ack | v |
194200
void session_reset();
195201
void randomize_out_seq();
196202

197-
Message *_get_next_outgoing(ceph::buffer::list *bl);
203+
out_q_entry_t _get_next_outgoing();
198204

199205
void prepare_send_message(uint64_t features, Message *m, ceph::buffer::list &bl);
200206
ssize_t write_message(Message *m, ceph::buffer::list &bl, bool more);

src/msg/async/ProtocolV2.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,12 +431,15 @@ void ProtocolV2::send_message(Message *m) {
431431
// TODO: Currently not all messages supports reencode like MOSDMap, so here
432432
// only let fast dispatch support messages prepare message
433433
const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
434-
if (can_fast_prepare) {
434+
bool is_prepared;
435+
if (can_fast_prepare && f) {
435436
prepare_send_message(f, m);
437+
is_prepared = can_fast_prepare;
438+
} else {
439+
is_prepared = false;
436440
}
437441

438442
std::lock_guard<std::mutex> l(connection->write_lock);
439-
bool is_prepared = can_fast_prepare;
440443
// "features" changes will change the payload encoding
441444
if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
442445
// ensure the correctness of message encoding

0 commit comments

Comments
 (0)