Skip to content

Commit e81d4ea

Browse files
committed
common/async: Update use_blocked for newer asio
Reimplement with `initiate` rather than the old style. This necessitates getting rid of the old `async::Completion` in anything that was calling it, and other changes. Also, use disposition for error handling. Signed-off-by: Adam C. Emerson <[email protected]>
1 parent 07c77b0 commit e81d4ea

File tree

11 files changed

+635
-495
lines changed

11 files changed

+635
-495
lines changed

src/common/async/blocked_completion.h

Lines changed: 249 additions & 119 deletions
Large diffs are not rendered by default.

src/common/async/forward_handler.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#ifndef CEPH_ASYNC_FORWARD_HANDLER_H
1616
#define CEPH_ASYNC_FORWARD_HANDLER_H
1717

18+
#include <utility>
19+
1820
#include <boost/asio/associator.hpp>
1921

2022
namespace ceph::async {

src/common/error_code.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ inline boost::system::error_condition make_error_condition(errc e) noexcept {
9494
#pragma GCC diagnostic pop
9595
#pragma clang diagnostic pop
9696

97-
inline int from_exception(std::exception_ptr eptr) {
97+
[[nodiscard]] inline int from_exception(std::exception_ptr eptr) {
9898
if (!eptr) [[likely]] {
9999
return 0;
100100
}

src/mon/MonClient.cc

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
#undef dout_prefix
6363
#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
6464

65+
namespace asio = boost::asio;
6566
namespace bs = boost::system;
6667
using std::string;
6768
using namespace std::literals;
@@ -534,8 +535,9 @@ void MonClient::shutdown()
534535
monc_lock.lock();
535536
stopping = true;
536537
while (!version_requests.empty()) {
537-
ceph::async::post(std::move(version_requests.begin()->second),
538-
monc_errc::shutting_down, 0, 0);
538+
asio::dispatch(
539+
asio::append(std::move(version_requests.begin()->second),
540+
make_error_code(monc_errc::shutting_down), 0, 0));
539541
ldout(cct, 20) << __func__ << " canceling and discarding version request "
540542
<< version_requests.begin()->first << dendl;
541543
version_requests.erase(version_requests.begin());
@@ -710,7 +712,7 @@ void MonClient::_finish_auth(int auth_err)
710712
ceph_assert(auth);
711713
_check_auth_tickets();
712714
} else if (auth_err == -EAGAIN && !active_con) {
713-
ldout(cct,10) << __func__
715+
ldout(cct,10) << __func__
714716
<< " auth returned EAGAIN, reopening the session to try again"
715717
<< dendl;
716718
_reopen_session();
@@ -767,8 +769,9 @@ void MonClient::_reopen_session(int rank)
767769

768770
// throw out version check requests
769771
while (!version_requests.empty()) {
770-
ceph::async::post(std::move(version_requests.begin()->second),
771-
monc_errc::session_reset, 0, 0);
772+
asio::dispatch(asio::append(std::move(version_requests.begin()->second),
773+
make_error_code(monc_errc::session_reset),
774+
0, 0));
772775
version_requests.erase(version_requests.begin());
773776
}
774777

@@ -1168,7 +1171,8 @@ void MonClient::_send_command(MonCommand *r)
11681171
if (r->is_tell()) {
11691172
++r->send_attempts;
11701173
if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
1171-
_finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
1174+
_finish_command(r, make_error_code(monc_errc::mon_unavailable),
1175+
"mon unavailable", {});
11721176
return;
11731177
}
11741178
// tell-style command
@@ -1180,7 +1184,8 @@ void MonClient::_send_command(MonCommand *r)
11801184
if (r->target_rank >= (int)monmap.size()) {
11811185
ldout(cct, 10) << " target " << r->target_rank
11821186
<< " >= max mon " << monmap.size() << dendl;
1183-
_finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
1187+
_finish_command(r, make_error_code(monc_errc::rank_dne),
1188+
"mon rank dne"sv, {});
11841189
return;
11851190
}
11861191
r->target_con = messenger->connect_to_mon(
@@ -1189,7 +1194,8 @@ void MonClient::_send_command(MonCommand *r)
11891194
if (!monmap.contains(r->target_name)) {
11901195
ldout(cct, 10) << " target " << r->target_name
11911196
<< " not present in monmap" << dendl;
1192-
_finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
1197+
_finish_command(r, make_error_code(monc_errc::mon_dne),
1198+
"mon dne"sv, {});
11931199
return;
11941200
}
11951201
r->target_con = messenger->connect_to_mon(
@@ -1224,7 +1230,8 @@ void MonClient::_send_command(MonCommand *r)
12241230
if (r->target_rank >= (int)monmap.size()) {
12251231
ldout(cct, 10) << " target " << r->target_rank
12261232
<< " >= max mon " << monmap.size() << dendl;
1227-
_finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
1233+
_finish_command(r, make_error_code(monc_errc::rank_dne),
1234+
"mon rank dne"sv, {});
12281235
return;
12291236
}
12301237
_reopen_session(r->target_rank);
@@ -1239,7 +1246,8 @@ void MonClient::_send_command(MonCommand *r)
12391246
if (!monmap.contains(r->target_name)) {
12401247
ldout(cct, 10) << " target " << r->target_name
12411248
<< " not present in monmap" << dendl;
1242-
_finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
1249+
_finish_command(r, make_error_code(monc_errc::mon_dne),
1250+
"mon dne"sv, {});
12431251
return;
12441252
}
12451253
_reopen_session(monmap.get_rank(r->target_name));
@@ -1377,7 +1385,8 @@ int MonClient::_cancel_mon_command(uint64_t tid)
13771385
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
13781386

13791387
MonCommand *cmd = it->second;
1380-
_finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
1388+
_finish_command(cmd, make_error_code(monc_errc::timed_out),
1389+
"timed out"sv, {});
13811390
return 0;
13821391
}
13831392

@@ -1386,8 +1395,9 @@ void MonClient::_finish_command(MonCommand *r, bs::error_code ret,
13861395
{
13871396
ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
13881397
<< dendl;
1389-
ceph::async::post(std::move(r->onfinish), ret, std::string(rs),
1390-
std::move(bl));
1398+
asio::post(service.get_executor(),
1399+
asio::append(std::move(r->onfinish), ret, std::string(rs),
1400+
std::move(bl)));
13911401
if (r->target_con) {
13921402
r->target_con->mark_down();
13931403
}
@@ -1409,8 +1419,9 @@ void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
14091419
ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
14101420
<< m->version << dendl;
14111421
version_requests.erase(iter);
1412-
ceph::async::post(std::move(req), bs::error_code(),
1413-
m->version, m->oldest_version);
1422+
asio::post(service.get_executor(),
1423+
asio::append(std::move(req), bs::error_code(),
1424+
m->version, m->oldest_version));
14141425
}
14151426
m->put();
14161427
}

0 commit comments

Comments
 (0)