33
44#include < atomic>
55#include < ctime>
6+ #include < list>
67#include < memory>
7- #include < vector>
88
9+ #include < boost/asio/bind_executor.hpp>
10+ #include < boost/asio/bind_cancellation_slot.hpp>
11+ #include < boost/asio/cancellation_signal.hpp>
12+ #include < boost/asio/detached.hpp>
913#include < boost/asio/error.hpp>
1014#include < boost/asio/io_context.hpp>
1115#include < boost/asio/ip/tcp.hpp>
@@ -64,6 +68,44 @@ auto make_stack_allocator() {
6468 return boost::context::protected_fixedsize_stack{512 *1024 };
6569}
6670
71+ static constexpr std::chrono::milliseconds BACKOFF_MAX_WAIT (5000 );
72+
73+ class RGWAsioBackoff {
74+ using Clock = ceph::coarse_mono_clock;
75+ using Timer = boost::asio::basic_waitable_timer<Clock>;
76+ Timer timer;
77+
78+ ceph::timespan cur_wait;
79+ void update_wait_time ();
80+ public:
81+ explicit RGWAsioBackoff (boost::asio::io_context& context) :
82+ timer(context),
83+ cur_wait(std::chrono::milliseconds(1 )) {
84+ }
85+
86+ void backoff_sleep (boost::asio::yield_context yield);
87+ void reset () {
88+ cur_wait = std::chrono::milliseconds (1 );
89+ }
90+ };
91+
92+ void RGWAsioBackoff::update_wait_time ()
93+ {
94+ if (cur_wait < BACKOFF_MAX_WAIT) {
95+ cur_wait = cur_wait * 2 ;
96+ }
97+ if (cur_wait > BACKOFF_MAX_WAIT) {
98+ cur_wait = BACKOFF_MAX_WAIT;
99+ }
100+ }
101+
102+ void RGWAsioBackoff::backoff_sleep (boost::asio::yield_context yield)
103+ {
104+ update_wait_time ();
105+ timer.expires_after (cur_wait);
106+ timer.async_wait (yield);
107+ }
108+
67109using namespace std ;
68110
69111template <typename Stream>
@@ -424,29 +466,34 @@ class AsioFrontend {
424466 tcp::endpoint endpoint;
425467 tcp::acceptor acceptor;
426468 tcp::socket socket;
469+ boost::asio::cancellation_signal signal;
427470 bool use_ssl = false ;
428471 bool use_nodelay = false ;
429472
430473 explicit Listener (boost::asio::io_context& context)
431474 : acceptor(context), socket(context) {}
432475 };
433- std::vector <Listener> listeners;
476+ std::list <Listener> listeners;
434477
435478 ConnectionList connections;
436479
437480 std::atomic<bool > going_down{false };
438481
482+ RGWAsioBackoff backoff;
439483 CephContext* ctx () const { return cct.get (); }
440484 std::optional<dmc::ClientCounters> client_counters;
441485 std::unique_ptr<dmc::ClientConfig> client_config;
442- void accept (Listener& listener, boost::system::error_code ec);
486+
487+ void accept (Listener& listener, boost::asio::yield_context yield);
488+ void on_accept (Listener& listener, tcp::socket stream);
443489
444490 public:
445491 AsioFrontend (RGWProcessEnv& env, RGWFrontendConfig* conf,
446492 dmc::SchedulerCtx& sched_ctx,
447493 boost::asio::io_context& context)
448494 : env(env), conf(conf), context(context),
449- pause_mutex (context.get_executor())
495+ pause_mutex (context.get_executor()),
496+ backoff(context)
450497 {
451498 auto sched_t = dmc::get_scheduler_t (ctx ());
452499 switch (sched_t ){
@@ -683,10 +730,13 @@ int AsioFrontend::init()
683730 }
684731 }
685732 l.acceptor .listen (max_connection_backlog);
686- l.acceptor .async_accept (l.socket ,
687- [this , &l] (boost::system::error_code ec) {
688- accept (l, ec);
689- });
733+
734+ // spawn a cancellable coroutine to the run the accept loop
735+ boost::asio::spawn (context,
736+ [this , &l] (boost::asio::yield_context yield) mutable {
737+ accept (l, yield);
738+ }, bind_cancellation_slot (l.signal .slot (),
739+ bind_executor (context, boost::asio::detached)));
690740
691741 ldout (ctx (), 4 ) << " frontend listening on " << l.endpoint << dendl;
692742 socket_bound = true ;
@@ -1003,22 +1053,39 @@ int AsioFrontend::init_ssl()
10031053}
10041054#endif // WITH_RADOSGW_BEAST_OPENSSL
10051055
1006- void AsioFrontend::accept (Listener& l, boost::system::error_code ec )
1056+ void AsioFrontend::accept (Listener& l, boost::asio::yield_context yield )
10071057{
1008- if (!l.acceptor .is_open ()) {
1009- return ;
1010- } else if (ec == boost::asio::error::operation_aborted) {
1011- return ;
1012- } else if (ec) {
1013- ldout (ctx (), 1 ) << " accept failed: " << ec.message () << dendl;
1014- return ;
1058+ for (;;) {
1059+ boost::system::error_code ec;
1060+ l.acceptor .async_accept (l.socket , yield[ec]);
1061+
1062+ if (!l.acceptor .is_open ()) {
1063+ return ;
1064+ } else if (ec == boost::asio::error::operation_aborted) {
1065+ return ;
1066+ } else if (ec) {
1067+ ldout (ctx (), 1 ) << " accept failed: " << ec.message () << dendl;
1068+ if (ec == boost::system::errc::too_many_files_open ||
1069+ ec == boost::system::errc::too_many_files_open_in_system ||
1070+ ec == boost::system::errc::no_buffer_space ||
1071+ ec == boost::system::errc::not_enough_memory) {
1072+ // always retry accept() if we hit a resource limit
1073+ backoff.backoff_sleep (yield);
1074+ continue ;
1075+ }
1076+ ldout (ctx (), 0 ) << " accept stopped due to error: " << ec.message () << dendl;
1077+ return ;
1078+ }
1079+
1080+ backoff.reset ();
1081+ on_accept (l, std::move (l.socket ));
10151082 }
1016- auto stream = std::move (l.socket );
1083+ }
1084+
1085+ void AsioFrontend::on_accept (Listener& l, tcp::socket stream)
1086+ {
1087+ boost::system::error_code ec;
10171088 stream.set_option (tcp::no_delay (l.use_nodelay ), ec);
1018- l.acceptor .async_accept (l.socket ,
1019- [this , &l] (boost::system::error_code ec) {
1020- accept (l, ec);
1021- });
10221089
10231090 // spawn a coroutine to handle the connection
10241091#ifdef WITH_RADOSGW_BEAST_OPENSSL
@@ -1086,6 +1153,8 @@ void AsioFrontend::stop()
10861153 // close all listeners
10871154 for (auto & listener : listeners) {
10881155 listener.acceptor .close (ec);
1156+ // signal cancellation of accept()
1157+ listener.signal .emit (boost::asio::cancellation_type::terminal);
10891158 }
10901159 // close all connections
10911160 connections.close (ec);
@@ -1107,6 +1176,8 @@ void AsioFrontend::pause()
11071176 boost::system::error_code ec;
11081177 for (auto & l : listeners) {
11091178 l.acceptor .cancel (ec);
1179+ // signal cancellation of accept()
1180+ l.signal .emit (boost::asio::cancellation_type::terminal);
11101181 }
11111182
11121183 // close all connections so outstanding requests fail quickly
@@ -1129,10 +1200,12 @@ void AsioFrontend::unpause()
11291200
11301201 // start accepting connections again
11311202 for (auto & l : listeners) {
1132- l.acceptor .async_accept (l.socket ,
1133- [this , &l] (boost::system::error_code ec) {
1134- accept (l, ec);
1135- });
1203+ boost::asio::spawn (context,
1204+ [this , &l] (boost::asio::yield_context yield) mutable {
1205+ accept (l, yield);
1206+ }, bind_cancellation_slot (l.signal .slot (),
1207+ bind_executor (context, boost::asio::detached)));
1208+
11361209 }
11371210
11381211 ldout (ctx (), 4 ) << " frontend unpaused" << dendl;
0 commit comments