Skip to content

Commit 6ebf9cd

Browse files
committed
crimson/net: preserve the ordering upon the calls to Connection::send()/keepalive()
Signed-off-by: Yingxin Cheng <[email protected]>
1 parent 77e66ad commit 6ebf9cd

File tree

7 files changed

+169
-91
lines changed

7 files changed

+169
-91
lines changed

src/crimson/common/smp_helpers.h

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ auto sharded_map_seq(T &t, F &&f) {
9898
enum class crosscore_type_t {
9999
ONE, // from 1 to 1 core
100100
ONE_N, // from 1 to n cores
101+
N_ONE, // from n to 1 core
101102
};
102103

103104
/**
@@ -109,15 +110,16 @@ template <crosscore_type_t CTypeValue>
109110
class smp_crosscore_ordering_t {
110111
static constexpr bool IS_ONE = (CTypeValue == crosscore_type_t::ONE);
111112
static constexpr bool IS_ONE_N = (CTypeValue == crosscore_type_t::ONE_N);
112-
static_assert(IS_ONE || IS_ONE_N);
113+
static constexpr bool IS_N_ONE = (CTypeValue == crosscore_type_t::N_ONE);
114+
static_assert(IS_ONE || IS_ONE_N || IS_N_ONE);
113115

114116
public:
115117
using seq_t = uint64_t;
116118

117119
smp_crosscore_ordering_t() requires IS_ONE
118120
: out_seqs(0) { }
119121

120-
smp_crosscore_ordering_t() requires IS_ONE_N
122+
smp_crosscore_ordering_t() requires (!IS_ONE)
121123
: out_seqs(seastar::smp::count, 0),
122124
in_controls(seastar::smp::count) {}
123125

@@ -135,6 +137,10 @@ class smp_crosscore_ordering_t {
135137
return do_prepare_submit(out_seqs[target_core]);
136138
}
137139

140+
seq_t prepare_submit() requires IS_N_ONE {
141+
return do_prepare_submit(out_seqs[seastar::this_shard_id()]);
142+
}
143+
138144
/*
139145
* Called by the target core to preserve the ordering
140146
*/
@@ -147,6 +153,10 @@ class smp_crosscore_ordering_t {
147153
return in_controls[seastar::this_shard_id()].seq;
148154
}
149155

156+
seq_t get_in_seq(core_id_t source_core) const requires IS_N_ONE {
157+
return in_controls[source_core].seq;
158+
}
159+
150160
bool proceed_or_wait(seq_t seq) requires IS_ONE {
151161
return in_controls.proceed_or_wait(seq);
152162
}
@@ -155,6 +165,10 @@ class smp_crosscore_ordering_t {
155165
return in_controls[seastar::this_shard_id()].proceed_or_wait(seq);
156166
}
157167

168+
bool proceed_or_wait(seq_t seq, core_id_t source_core) requires IS_N_ONE {
169+
return in_controls[source_core].proceed_or_wait(seq);
170+
}
171+
158172
seastar::future<> wait(seq_t seq) requires IS_ONE {
159173
return in_controls.wait(seq);
160174
}
@@ -163,6 +177,16 @@ class smp_crosscore_ordering_t {
163177
return in_controls[seastar::this_shard_id()].wait(seq);
164178
}
165179

180+
seastar::future<> wait(seq_t seq, core_id_t source_core) requires IS_N_ONE {
181+
return in_controls[source_core].wait(seq);
182+
}
183+
184+
void reset_wait() requires IS_N_ONE {
185+
for (auto &in_control : in_controls) {
186+
in_control.reset_wait();
187+
}
188+
}
189+
166190
private:
167191
struct in_control_t {
168192
seq_t seq = 0;
@@ -171,10 +195,7 @@ class smp_crosscore_ordering_t {
171195
bool proceed_or_wait(seq_t in_seq) {
172196
if (in_seq == seq + 1) {
173197
++seq;
174-
if (unlikely(pr_wait.has_value())) {
175-
pr_wait->set_value();
176-
pr_wait = std::nullopt;
177-
}
198+
reset_wait();
178199
return true;
179200
} else {
180201
return false;
@@ -188,6 +209,13 @@ class smp_crosscore_ordering_t {
188209
}
189210
return pr_wait->get_shared_future();
190211
}
212+
213+
void reset_wait() {
214+
if (unlikely(pr_wait.has_value())) {
215+
pr_wait->set_value();
216+
pr_wait = std::nullopt;
217+
}
218+
}
191219
};
192220

193221
seq_t do_prepare_submit(seq_t &out_seq) {

src/crimson/net/Connection.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
8181
*
8282
* Send a message over a connection that has completed its handshake.
8383
*
84-
* May be invoked from any core, but that requires to chain the returned
85-
* future to preserve ordering.
84+
* May be invoked from any core, and the send order will be preserved upon
85+
* the call.
8686
*/
8787
virtual seastar::future<> send(MessageURef msg) = 0;
8888

@@ -92,8 +92,8 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
9292
* Send a keepalive message over a connection that has completed its
9393
* handshake.
9494
*
95-
* May be invoked from any core, but that requires to chain the returned
96-
* future to preserve ordering.
95+
* May be invoked from any core, and the send order will be preserved upon
96+
* the call.
9797
*/
9898
virtual seastar::future<> send_keepalive() = 0;
9999

src/crimson/net/ProtocolV2.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ class ProtocolV2 final : public HandshakeListener {
251251
// asynchronously populated from io_handler
252252
io_handler_state io_states;
253253

254-
crosscore_ordering_t crosscore;
254+
proto_crosscore_ordering_t crosscore;
255255

256256
bool has_socket = false;
257257

src/crimson/net/SocketConnection.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,13 @@ bool SocketConnection::peer_wins() const
7979
return (messenger.get_myaddr() > peer_addr || policy.server);
8080
}
8181

82-
seastar::future<> SocketConnection::send(MessageURef _msg)
82+
seastar::future<> SocketConnection::send(MessageURef msg)
8383
{
84-
// may be invoked from any core
85-
MessageFRef msg = seastar::make_foreign(std::move(_msg));
8684
return io_handler->send(std::move(msg));
8785
}
8886

8987
seastar::future<> SocketConnection::send_keepalive()
9088
{
91-
// may be invoked from any core
9289
return io_handler->send_keepalive();
9390
}
9491

src/crimson/net/SocketConnection.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class ConnectionHandler {
5454

5555
virtual bool is_connected() const = 0;
5656

57-
virtual seastar::future<> send(MessageFRef) = 0;
57+
virtual seastar::future<> send(MessageURef) = 0;
5858

5959
virtual seastar::future<> send_keepalive() = 0;
6060

0 commit comments

Comments
 (0)