44#include " client.h"
55
66#include < seastar/core/sleep.hh>
7+ #include < seastar/util/defer.hh>
78
89#include " crimson/common/log.h"
910#include " crimson/net/Connection.h"
1011#include " crimson/net/Messenger.h"
12+ #include " crimson/common/coroutine.h"
1113#include " messages/MMgrConfigure.h"
1214#include " messages/MMgrMap.h"
1315#include " messages/MMgrOpen.h"
1416#include " messages/MMgrReport.h"
1517
16- namespace {
17- seastar::logger& logger ()
18- {
19- return crimson::get_logger (ceph_subsys_mgrc);
20- }
21- }
18+ SET_SUBSYS (mgrc);
2219
2320using crimson::common::local_conf;
2421
@@ -38,25 +35,51 @@ Client::Client(crimson::net::Messenger& msgr,
3835
3936seastar::future<> Client::start ()
4037{
41- return seastar::now ();
38+ LOG_PREFIX (Client::start);
39+ DEBUGDPP (" " , *this );
40+ co_return ;
4241}
4342
4443seastar::future<> Client::stop ()
4544{
46- logger ().info (" {}" , __func__);
45+ LOG_PREFIX (Client::stop);
46+ DEBUGDPP (" " , *this );
4747 report_timer.cancel ();
48- auto fut = gates.close_all ();
4948 if (conn) {
49+ DEBUGDPP (" marking down" , *this );
5050 conn->mark_down ();
5151 }
52- return fut;
52+ co_await gates.close_all ();
53+ }
54+
55+ seastar::future<> Client::send (MessageURef msg)
56+ {
57+ LOG_PREFIX (Client::send);
58+ DEBUGDPP (" {}" , *this , *msg);
59+ if (!conn_lock.try_lock_shared ()) {
60+ WARNDPP (" ongoing reconnect, report skipped" , *this , *msg);
61+ co_return ;
62+ }
63+ auto unlocker = seastar::defer ([this ] {
64+ conn_lock.unlock_shared ();
65+ });
66+ if (!conn) {
67+ WARNDPP (" no conn available, report skipped" , *this , *msg);
68+ co_return ;
69+ }
70+ DEBUGDPP (" sending {}" , *this , *msg);
71+ co_await conn->send (std::move (msg));
5372}
5473
5574std::optional<seastar::future<>>
5675Client::ms_dispatch (crimson::net::ConnectionRef conn, MessageRef m)
5776{
77+ LOG_PREFIX (Client::ms_dispatch);
78+ DEBUGDPP (" {}" , *this , *m);
5879 bool dispatched = true ;
59- gates.dispatch_in_background (__func__, *this , [this , conn, &m, &dispatched] {
80+ gates.dispatch_in_background (__func__, *this ,
81+ [this , conn, &m, &dispatched, FNAME] {
82+ DEBUGDPP (" dispatching in background {}" , *this , *m);
6083 switch (m->get_type ()) {
6184 case MSG_MGR_MAP:
6285 return handle_mgr_map (conn, boost::static_pointer_cast<MMgrMap>(m));
@@ -74,24 +97,33 @@ void Client::ms_handle_connect(
7497 crimson::net::ConnectionRef c,
7598 seastar::shard_id prv_shard)
7699{
100+ LOG_PREFIX (Client::ms_handle_connect);
101+ DEBUGDPP (" prev_shard: {}" , *this , prv_shard);
77102 ceph_assert_always (prv_shard == seastar::this_shard_id ());
78- gates.dispatch_in_background (__func__, *this , [this , c] {
103+ gates.dispatch_in_background (__func__, *this ,
104+ [this , c, FNAME] {
79105 if (conn == c) {
106+ DEBUGDPP (" dispatching in background" , *this );
80107 // ask for the mgrconfigure message
81108 auto m = crimson::make_message<MMgrOpen>();
82109 m->daemon_name = local_conf ()->name .get_id ();
83110 local_conf ().get_config_bl (0 , &m->config_bl , &last_config_bl_version);
84111 local_conf ().get_defaults_bl (&m->config_defaults_bl );
85- return conn-> send (std::move (m));
112+ return send (std::move (m));
86113 } else {
114+ DEBUGDPP (" connection changed" , *this );
87115 return seastar::now ();
88116 }
89117 });
90118}
91119
92120void Client::ms_handle_reset (crimson::net::ConnectionRef c, bool /* is_replace */ )
93121{
94- gates.dispatch_in_background (__func__, *this , [this , c] {
122+ LOG_PREFIX (Client::ms_handle_reset);
123+ DEBUGDPP (" " , *this );
124+ gates.dispatch_in_background (__func__, *this ,
125+ [this , c, FNAME] {
126+ DEBUGDPP (" dispatching in background" , *this );
95127 if (conn == c) {
96128 report_timer.cancel ();
97129 return reconnect ();
@@ -101,50 +133,64 @@ void Client::ms_handle_reset(crimson::net::ConnectionRef c, bool /* is_replace *
101133 });
102134}
103135
136+ seastar::future<> Client::retry_interval ()
137+ {
138+ LOG_PREFIX (Client::retry_interval);
139+ auto retry_interval = std::chrono::duration<double >(
140+ local_conf ().get_val <double >(" mgr_connect_retry_interval" ));
141+ auto a_while = std::chrono::duration_cast<seastar::steady_clock_type::duration>(
142+ retry_interval);
143+ DEBUGDPP (" reconnecting in {} seconds" , *this , retry_interval);
144+ co_await seastar::sleep (a_while);
145+ }
146+
104147seastar::future<> Client::reconnect ()
105148{
149+ LOG_PREFIX (Client::reconnect);
150+ DEBUGDPP (" " , *this );
151+ co_await conn_lock.lock ();
152+ auto unlocker = seastar::defer ([this ] {
153+ conn_lock.unlock ();
154+ });
106155 if (conn) {
156+ DEBUGDPP (" marking down" , *this );
107157 conn->mark_down ();
108158 conn = {};
109159 }
110160 if (!mgrmap.get_available ()) {
111- logger (). warn ( " No active mgr available yet" );
112- return seastar::now () ;
161+ WARNDPP ( " No active mgr available yet" , * this );
162+ co_return ;
113163 }
114- auto retry_interval = std::chrono::duration<double >(
115- local_conf ().get_val <double >(" mgr_connect_retry_interval" ));
116- auto a_while = std::chrono::duration_cast<seastar::steady_clock_type::duration>(
117- retry_interval);
118- return seastar::sleep (a_while).then ([this ] {
119- auto peer = mgrmap.get_active_addrs ().pick_addr (msgr.get_myaddr ().get_type ());
120- if (peer == entity_addr_t {}) {
121- // crimson msgr only uses the first bound addr
122- logger ().error (" mgr.{} does not have an addr compatible with me" ,
123- mgrmap.get_active_name ());
124- return ;
125- }
126- conn = msgr.connect (peer, CEPH_ENTITY_TYPE_MGR);
127- });
164+ co_await retry_interval ();
165+
166+ auto peer = mgrmap.get_active_addrs ().pick_addr (msgr.get_myaddr ().get_type ());
167+ if (peer == entity_addr_t {}) {
168+ // crimson msgr only uses the first bound addr
169+ ERRORDPP (" mgr.{} does not have an addr compatible with me" ,
170+ *this , mgrmap.get_active_name ());
171+ co_return ;
172+ }
173+ conn = msgr.connect (peer, CEPH_ENTITY_TYPE_MGR);
174+ DEBUGDPP (" reconnected successfully" , *this );
128175}
129176
130177seastar::future<> Client::handle_mgr_map (crimson::net::ConnectionRef,
131178 Ref<MMgrMap> m)
132179{
180+ LOG_PREFIX (Client::handle_mgr_map);
181+ DEBUGDPP (" " , *this );
133182 mgrmap = m->get_map ();
134- if (!conn) {
135- return reconnect ();
136- } else if (conn->get_peer_addr () !=
137- mgrmap.get_active_addrs ().legacy_addr ()) {
138- return reconnect ();
139- } else {
140- return seastar::now ();
183+ if (!conn || conn->get_peer_addr () !=
184+ mgrmap.get_active_addrs ().legacy_addr ()) {
185+ co_await reconnect ();
141186 }
142187}
143188
144189seastar::future<> Client::handle_mgr_conf (crimson::net::ConnectionRef,
145190 Ref<MMgrConfigure> m)
146191{
147- logger ().info (" {} {}" , __func__, *m);
192+ LOG_PREFIX (Client::handle_mgr_conf);
193+ DEBUGDPP (" {}" , *this , *m);
148194
149195 auto report_period = std::chrono::seconds{m->stats_period };
150196 if (report_period.count ()) {
@@ -158,26 +204,20 @@ seastar::future<> Client::handle_mgr_conf(crimson::net::ConnectionRef,
158204 }
159205 if (!m->osd_perf_metric_queries .empty ()) {
160206 ceph_assert (set_perf_queries_cb);
161- return set_perf_queries_cb (m->osd_perf_metric_queries );
207+ co_await set_perf_queries_cb (m->osd_perf_metric_queries );
162208 }
163- return seastar::now ();
164209}
165210
166211void Client::report ()
167212{
213+ LOG_PREFIX (Client::report);
214+ DEBUGDPP (" " , *this );
168215 _send_report ();
169- gates.dispatch_in_background (__func__, *this , [this ] {
170- if (!conn) {
171- logger ().warn (" report: no conn available; report skipped" );
172- return seastar::now ();
173- }
216+ gates.dispatch_in_background (__func__, *this , [this , FNAME] {
217+ DEBUGDPP (" dispatching in background" , *this );
174218 return with_stats.get_stats (
175219 ).then ([this ](auto &&pg_stats) {
176- if (!conn) {
177- logger ().warn (" report: no conn available; before sending stats, report skipped" );
178- return seastar::now ();
179- }
180- return conn->send (std::move (pg_stats));
220+ return send (std::move (pg_stats));
181221 });
182222 });
183223}
@@ -189,11 +229,10 @@ void Client::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
189229
190230void Client::_send_report ()
191231{
192- gates.dispatch_in_background (__func__, *this , [this ] {
193- if (!conn) {
194- logger ().warn (" cannot send report; no conn available" );
195- return seastar::now ();
196- }
232+ LOG_PREFIX (Client::_send_report);
233+ DEBUGDPP (" " , *this );
234+ gates.dispatch_in_background (__func__, *this , [this , FNAME] {
235+ DEBUGDPP (" dispatching in background" , *this );
197236 auto report = make_message<MMgrReport>();
198237 // Adding empty information since we don't support perfcounters yet
199238 report->undeclare_types .emplace_back ();
@@ -214,10 +253,10 @@ void Client::_send_report()
214253 return get_perf_report_cb (
215254 ).then ([report=std::move (report), this ](auto payload) mutable {
216255 report->metric_report_message = MetricReportMessage (std::move (payload));
217- return conn-> send (std::move (report));
256+ return send (std::move (report));
218257 });
219258 }
220- return conn-> send (std::move (report));
259+ return send (std::move (report));
221260 });
222261}
223262
0 commit comments