Skip to content

Commit f26071f

Browse files
authored
Merge pull request ceph#54304 from cyx1231st/wip-crimson-make-crosscore-send-ordered
crimson/osd: support to send messages concurrently with exclusive phases Reviewed-by: Samuel Just <[email protected]> Reviewed-by: chunmei-liu <[email protected]>
2 parents ae9fe18 + 93ef23d commit f26071f

File tree

13 files changed

+366
-250
lines changed

13 files changed

+366
-250
lines changed

src/crimson/common/smp_helpers.h

Lines changed: 145 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,16 @@
33

44
#pragma once
55

6+
#include <concepts>
67
#include <limits>
8+
#include <optional>
9+
#include <type_traits>
10+
#include <vector>
711

12+
#include <seastar/core/shared_future.hh>
813
#include <seastar/core/smp.hh>
914

15+
#include "common/likely.h"
1016
#include "crimson/common/errorator.h"
1117
#include "crimson/common/utility.h"
1218

@@ -89,4 +95,142 @@ auto sharded_map_seq(T &t, F &&f) {
8995
});
9096
}
9197

92-
}
98+
enum class crosscore_type_t {
99+
ONE, // from 1 to 1 core
100+
ONE_N, // from 1 to n cores
101+
N_ONE, // from n to 1 core
102+
};
103+
104+
/**
105+
* smp_crosscore_ordering_t
106+
*
107+
* To preserve the event order from source to target core(s).
108+
*/
109+
template <crosscore_type_t CTypeValue>
110+
class smp_crosscore_ordering_t {
111+
static constexpr bool IS_ONE = (CTypeValue == crosscore_type_t::ONE);
112+
static constexpr bool IS_ONE_N = (CTypeValue == crosscore_type_t::ONE_N);
113+
static constexpr bool IS_N_ONE = (CTypeValue == crosscore_type_t::N_ONE);
114+
static_assert(IS_ONE || IS_ONE_N || IS_N_ONE);
115+
116+
public:
117+
using seq_t = uint64_t;
118+
119+
smp_crosscore_ordering_t() requires IS_ONE
120+
: out_seqs(0) { }
121+
122+
smp_crosscore_ordering_t() requires (!IS_ONE)
123+
: out_seqs(seastar::smp::count, 0),
124+
in_controls(seastar::smp::count) {}
125+
126+
~smp_crosscore_ordering_t() = default;
127+
128+
/*
129+
* Called by the original core to get the ordering sequence
130+
*/
131+
132+
seq_t prepare_submit() requires IS_ONE {
133+
return do_prepare_submit(out_seqs);
134+
}
135+
136+
seq_t prepare_submit(core_id_t target_core) requires IS_ONE_N {
137+
return do_prepare_submit(out_seqs[target_core]);
138+
}
139+
140+
seq_t prepare_submit() requires IS_N_ONE {
141+
return do_prepare_submit(out_seqs[seastar::this_shard_id()]);
142+
}
143+
144+
/*
145+
* Called by the target core to preserve the ordering
146+
*/
147+
148+
seq_t get_in_seq() const requires IS_ONE {
149+
return in_controls.seq;
150+
}
151+
152+
seq_t get_in_seq() const requires IS_ONE_N {
153+
return in_controls[seastar::this_shard_id()].seq;
154+
}
155+
156+
seq_t get_in_seq(core_id_t source_core) const requires IS_N_ONE {
157+
return in_controls[source_core].seq;
158+
}
159+
160+
bool proceed_or_wait(seq_t seq) requires IS_ONE {
161+
return in_controls.proceed_or_wait(seq);
162+
}
163+
164+
bool proceed_or_wait(seq_t seq) requires IS_ONE_N {
165+
return in_controls[seastar::this_shard_id()].proceed_or_wait(seq);
166+
}
167+
168+
bool proceed_or_wait(seq_t seq, core_id_t source_core) requires IS_N_ONE {
169+
return in_controls[source_core].proceed_or_wait(seq);
170+
}
171+
172+
seastar::future<> wait(seq_t seq) requires IS_ONE {
173+
return in_controls.wait(seq);
174+
}
175+
176+
seastar::future<> wait(seq_t seq) requires IS_ONE_N {
177+
return in_controls[seastar::this_shard_id()].wait(seq);
178+
}
179+
180+
seastar::future<> wait(seq_t seq, core_id_t source_core) requires IS_N_ONE {
181+
return in_controls[source_core].wait(seq);
182+
}
183+
184+
void reset_wait() requires IS_N_ONE {
185+
for (auto &in_control : in_controls) {
186+
in_control.reset_wait();
187+
}
188+
}
189+
190+
private:
191+
struct in_control_t {
192+
seq_t seq = 0;
193+
std::optional<seastar::shared_promise<>> pr_wait;
194+
195+
bool proceed_or_wait(seq_t in_seq) {
196+
if (in_seq == seq + 1) {
197+
++seq;
198+
reset_wait();
199+
return true;
200+
} else {
201+
return false;
202+
}
203+
}
204+
205+
seastar::future<> wait(seq_t in_seq) {
206+
assert(in_seq != seq + 1);
207+
if (!pr_wait.has_value()) {
208+
pr_wait = seastar::shared_promise<>();
209+
}
210+
return pr_wait->get_shared_future();
211+
}
212+
213+
void reset_wait() {
214+
if (unlikely(pr_wait.has_value())) {
215+
pr_wait->set_value();
216+
pr_wait = std::nullopt;
217+
}
218+
}
219+
};
220+
221+
seq_t do_prepare_submit(seq_t &out_seq) {
222+
return ++out_seq;
223+
}
224+
225+
std::conditional_t<
226+
IS_ONE,
227+
seq_t, std::vector<seq_t>
228+
> out_seqs;
229+
230+
std::conditional_t<
231+
IS_ONE,
232+
in_control_t, std::vector<in_control_t>
233+
> in_controls;
234+
};
235+
236+
} // namespace crimson

src/crimson/net/Connection.h

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,44 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
8181
*
8282
* Send a message over a connection that has completed its handshake.
8383
*
84-
* May be invoked from any core, but that requires to chain the returned
85-
* future to preserve ordering.
84+
* May be invoked from any core, and the send order will be preserved upon
85+
* the call.
86+
*
87+
* The returned future will be resolved only after the message is enqueued
88+
* remotely.
8689
*/
87-
virtual seastar::future<> send(MessageURef msg) = 0;
90+
virtual seastar::future<> send(
91+
MessageURef msg) = 0;
92+
93+
/**
94+
* send_with_throttling
95+
*
96+
* Send a message over a connection that has completed its handshake.
97+
*
98+
* May be invoked from any core, and the send order will be preserved upon
99+
* the call.
100+
*
101+
* TODO:
102+
*
103+
* The returned future is reserved for throttling.
104+
*
105+
* Gating is needed for graceful shutdown, to wait until the message is
106+
* enqueued remotely.
107+
*/
108+
seastar::future<> send_with_throttling(
109+
MessageURef msg /* , seastar::gate & */) {
110+
std::ignore = send(std::move(msg));
111+
return seastar::now();
112+
}
88113

89114
/**
90115
* send_keepalive
91116
*
92117
* Send a keepalive message over a connection that has completed its
93118
* handshake.
94119
*
95-
* May be invoked from any core, but that requires to chain the returned
96-
* future to preserve ordering.
120+
* May be invoked from any core, and the send order will be preserved upon
121+
* the call.
97122
*/
98123
virtual seastar::future<> send_keepalive() = 0;
99124

src/crimson/net/Fwd.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include <seastar/core/sharded.hh>
2222

2323
#include "msg/Connection.h"
24-
#include "msg/MessageRef.h"
24+
#include "msg/Message.h"
2525
#include "msg/msg_types.h"
2626

2727
#include "crimson/common/errorator.h"

src/crimson/net/ProtocolV2.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2073,7 +2073,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
20732073
// READY state
20742074

20752075
seastar::future<> ProtocolV2::notify_out_fault(
2076-
crosscore_t::seq_t cc_seq,
2076+
cc_seq_t cc_seq,
20772077
const char *where,
20782078
std::exception_ptr eptr,
20792079
io_handler_state _io_states)
@@ -2121,7 +2121,7 @@ void ProtocolV2::execute_standby()
21212121
}
21222122

21232123
seastar::future<> ProtocolV2::notify_out(
2124-
crosscore_t::seq_t cc_seq)
2124+
cc_seq_t cc_seq)
21252125
{
21262126
assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
21272127
if (!crosscore.proceed_or_wait(cc_seq)) {
@@ -2210,7 +2210,7 @@ void ProtocolV2::execute_server_wait()
22102210
// CLOSING state
22112211

22122212
seastar::future<> ProtocolV2::notify_mark_down(
2213-
crosscore_t::seq_t cc_seq)
2213+
cc_seq_t cc_seq)
22142214
{
22152215
assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
22162216
if (!crosscore.proceed_or_wait(cc_seq)) {

src/crimson/net/ProtocolV2.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,16 @@ class ProtocolV2 final : public HandshakeListener {
2929
*/
3030
private:
3131
seastar::future<> notify_out(
32-
crosscore_t::seq_t cc_seq) final;
32+
cc_seq_t cc_seq) final;
3333

3434
seastar::future<> notify_out_fault(
35-
crosscore_t::seq_t cc_seq,
35+
cc_seq_t cc_seq,
3636
const char *where,
3737
std::exception_ptr,
3838
io_handler_state) final;
3939

4040
seastar::future<> notify_mark_down(
41-
crosscore_t::seq_t cc_seq) final;
41+
cc_seq_t cc_seq) final;
4242

4343
/*
4444
* as ProtocolV2 to be called by SocketConnection
@@ -251,7 +251,7 @@ class ProtocolV2 final : public HandshakeListener {
251251
// asynchronously populated from io_handler
252252
io_handler_state io_states;
253253

254-
crosscore_t crosscore;
254+
proto_crosscore_ordering_t crosscore;
255255

256256
bool has_socket = false;
257257

src/crimson/net/SocketConnection.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,13 @@ bool SocketConnection::peer_wins() const
7979
return (messenger.get_myaddr() > peer_addr || policy.server);
8080
}
8181

82-
seastar::future<> SocketConnection::send(MessageURef _msg)
82+
seastar::future<> SocketConnection::send(MessageURef msg)
8383
{
84-
// may be invoked from any core
85-
MessageFRef msg = seastar::make_foreign(std::move(_msg));
8684
return io_handler->send(std::move(msg));
8785
}
8886

8987
seastar::future<> SocketConnection::send_keepalive()
9088
{
91-
// may be invoked from any core
9289
return io_handler->send_keepalive();
9390
}
9491

src/crimson/net/SocketConnection.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class ConnectionHandler {
5454

5555
virtual bool is_connected() const = 0;
5656

57-
virtual seastar::future<> send(MessageFRef) = 0;
57+
virtual seastar::future<> send(MessageURef) = 0;
5858

5959
virtual seastar::future<> send_keepalive() = 0;
6060

0 commit comments

Comments
 (0)