Skip to content

Commit 93ef23d

Browse files
committed
crimson/osd: allow to send messages concurrently
The ordering is now guaranteed upon calling send(), so there is no reason to couple the crosscore send future with the operation phases -- exclusive phases will limit the send concurrency, potentially causing OSD starvation. Decouple the crosscore send futures in the IO paths, mostly in ClientRequest and OSDSingletonState::send_to_osd(). Issue-identified-by: Chunmei Liu <[email protected]> see PR53934. Signed-off-by: Yingxin Cheng <[email protected]>
1 parent 6ebf9cd commit 93ef23d

File tree

4 files changed

+46
-15
lines changed

4 files changed

+46
-15
lines changed

src/crimson/net/Connection.h

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,33 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
8383
*
8484
* May be invoked from any core, and the send order will be preserved upon
8585
* the call.
86+
*
87+
* The returned future will be resolved only after the message is enqueued
88+
* remotely.
8689
*/
87-
virtual seastar::future<> send(MessageURef msg) = 0;
90+
virtual seastar::future<> send(
91+
MessageURef msg) = 0;
92+
93+
/**
94+
* send_with_throttling
95+
*
96+
* Send a message over a connection that has completed its handshake.
97+
*
98+
* May be invoked from any core, and the send order will be preserved upon
99+
* the call.
100+
*
101+
* TODO:
102+
*
103+
* The returned future is reserved for throttling.
104+
*
105+
* Gating is needed for graceful shutdown, to wait until the message is
106+
* enqueued remotely.
107+
*/
108+
seastar::future<> send_with_throttling(
109+
MessageURef msg /* , seastar::gate & */) {
110+
std::ignore = send(std::move(msg));
111+
return seastar::now();
112+
}
88113

89114
/**
90115
* send_keepalive

src/crimson/net/Fwd.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include <seastar/core/sharded.hh>
2222

2323
#include "msg/Connection.h"
24-
#include "msg/MessageRef.h"
24+
#include "msg/Message.h"
2525
#include "msg/msg_types.h"
2626

2727
#include "crimson/common/errorator.h"

src/crimson/osd/osd_operations/client_request.cc

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ ClientRequest::process_pg_op(
204204
return pg->do_pg_ops(
205205
m
206206
).then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
207-
return conn->send(std::move(reply));
207+
// TODO: gate the crosscore sending
208+
return conn->send_with_throttling(std::move(reply));
208209
});
209210
}
210211

@@ -218,7 +219,8 @@ auto ClientRequest::reply_op_error(const Ref<PG>& pg, int err)
218219
!m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
219220
reply->set_reply_versions(eversion_t(), 0);
220221
reply->set_op_returns(std::vector<pg_log_op_return_item_t>{});
221-
return conn->send(std::move(reply));
222+
// TODO: gate the crosscore sending
223+
return conn->send_with_throttling(std::move(reply));
222224
}
223225

224226
ClientRequest::interruptible_future<>
@@ -246,7 +248,8 @@ ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
246248
m.get(), completed->err, pg->get_osdmap_epoch(),
247249
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
248250
reply->set_reply_versions(completed->version, completed->user_version);
249-
return conn->send(std::move(reply));
251+
// TODO: gate the crosscore sending
252+
return conn->send_with_throttling(std::move(reply));
250253
} else {
251254
return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this
252255
).then_interruptible(
@@ -319,13 +322,13 @@ ClientRequest::do_process(
319322

320323
SnapContext snapc = get_snapc(pg,obc);
321324

322-
if ((m->has_flag(CEPH_OSD_FLAG_ORDERSNAP)) &&
323-
snapc.seq < obc->ssc->snapset.seq) {
324-
DEBUGI("{} ORDERSNAP flag set and snapc seq {}",
325-
" < snapset seq {} on {}",
326-
__func__, snapc.seq, obc->ssc->snapset.seq,
327-
obc->obs.oi.soid);
328-
return reply_op_error(pg, -EOLDSNAPC);
325+
if (m->has_flag(CEPH_OSD_FLAG_ORDERSNAP) &&
326+
snapc.seq < obc->ssc->snapset.seq) {
327+
DEBUGI("{} ORDERSNAP flag set and snapc seq {}",
328+
" < snapset seq {} on {}",
329+
__func__, snapc.seq, obc->ssc->snapset.seq,
330+
obc->obs.oi.soid);
331+
return reply_op_error(pg, -EOLDSNAPC);
329332
}
330333

331334
if (!pg->is_primary()) {
@@ -357,8 +360,10 @@ ClientRequest::do_process(
357360
[this, reply=std::move(reply)]() mutable {
358361
LOG_PREFIX(ClientRequest::do_process);
359362
DEBUGI("{}: sending response", *this);
360-
return conn->send(std::move(reply));
361-
});
363+
// TODO: gate the crosscore sending
364+
return conn->send_with_throttling(std::move(reply));
365+
}
366+
);
362367
}, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
363368
return process_op(ihref, pg);
364369
}));

src/crimson/osd/shard_services.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ seastar::future<> OSDSingletonState::send_to_osd(
166166
} else {
167167
auto conn = cluster_msgr.connect(
168168
osdmap->get_cluster_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
169-
return conn->send(std::move(m));
169+
// TODO: gate the crosscore sending
170+
return conn->send_with_throttling(std::move(m));
170171
}
171172
}
172173

0 commit comments

Comments
 (0)