diff --git a/include/bitcoin/network/channels/channel_http.hpp b/include/bitcoin/network/channels/channel_http.hpp index a9fda2cd6..727c57f41 100644 --- a/include/bitcoin/network/channels/channel_http.hpp +++ b/include/bitcoin/network/channels/channel_http.hpp @@ -34,7 +34,7 @@ namespace network { /// Half-duplex reading of http-request and sending of http-response. class BCT_API channel_http - : public channel, protected tracker + : public channel { public: typedef std::shared_ptr ptr; @@ -60,8 +60,7 @@ class BCT_API channel_http const options_t& options) NOEXCEPT : channel(log, socket, identifier, settings, options), response_buffer_(system::to_shared()), - request_buffer_(settings.minimum_buffer), - tracker(log) + request_buffer_(settings.minimum_buffer) { } diff --git a/include/bitcoin/network/channels/channel_peer.hpp b/include/bitcoin/network/channels/channel_peer.hpp index 21c86921a..a83d857ad 100644 --- a/include/bitcoin/network/channels/channel_peer.hpp +++ b/include/bitcoin/network/channels/channel_peer.hpp @@ -34,7 +34,7 @@ namespace network { /// Version into should only be written before/during handshake. /// Attach/resume/signal_activity must be called from the strand. class BCT_API channel_peer - : public channel, protected tracker + : public channel { public: typedef std::shared_ptr ptr; @@ -88,8 +88,7 @@ class BCT_API channel_peer const settings_t& settings, const options_t& options) NOEXCEPT : channel(log, socket, identifier, settings, options), allocator_(allocator), - negotiated_version_(settings.protocol_maximum), - tracker(log) + negotiated_version_(settings.protocol_maximum) { } diff --git a/include/bitcoin/network/channels/channel_ws.hpp b/include/bitcoin/network/channels/channel_ws.hpp index 31237d15b..705de085c 100644 --- a/include/bitcoin/network/channels/channel_ws.hpp +++ b/include/bitcoin/network/channels/channel_ws.hpp @@ -31,7 +31,7 @@ namespace network { /// Abstract base websocket tcp/ip channel, on base http channel. class BCT_API channel_ws - : public channel_http, protected tracker + : public channel_http { public: typedef std::shared_ptr ptr; @@ -41,8 +41,7 @@ class BCT_API channel_ws inline channel_ws(const logger& log, const socket::ptr& socket, uint64_t identifier, const settings_t& settings, const options_t& options) NOEXCEPT - : channel_http(log, socket, identifier, settings, options), - tracker(log) + : channel_http(log, socket, identifier, settings, options) { } diff --git a/include/bitcoin/network/net/socket.hpp b/include/bitcoin/network/net/socket.hpp index 9a34d7796..060a4ac53 100644 --- a/include/bitcoin/network/net/socket.hpp +++ b/include/bitcoin/network/net/socket.hpp @@ -174,7 +174,7 @@ class BCT_API socket void do_ws_event(ws::frame_type kind, const std::string_view& data) NOEXCEPT; - void set_websocket(const http::request& request) NOEXCEPT; + code set_websocket(const http::request& request) NOEXCEPT; // completion // ------------------------------------------------------------------------ diff --git a/src/channels/channel_ws.cpp b/src/channels/channel_ws.cpp index 649a3cbae..03fecb687 100644 --- a/src/channels/channel_ws.cpp +++ b/src/channels/channel_ws.cpp @@ -46,6 +46,7 @@ void channel_ws::read_request() NOEXCEPT return; } + request_buffer().consume(request_buffer().size()); ws_read(request_buffer(), std::bind(&channel_ws::handle_read_websocket, shared_from_base(), _1, _2)); @@ -78,14 +79,12 @@ void channel_ws::handle_read_websocket(const code& ec, size_t bytes) NOEXCEPT } dispatch_websocket(request_buffer(), bytes); - request_buffer().consume(bytes); } void channel_ws::dispatch_websocket(const http::flat_buffer&, size_t LOG_ONLY(bytes)) NOEXCEPT { - LOGA("Websocket read of " << bytes << " bytes unhandled [" - << authority() << "]"); + LOGA("Websocket read of " << bytes << " bytes [" << authority() << "]"); // Restart reader. read_request(); diff --git a/src/net/proxy.cpp b/src/net/proxy.cpp index d6a473a8a..e0b8025db 100644 --- a/src/net/proxy.cpp +++ b/src/net/proxy.cpp @@ -81,7 +81,7 @@ void proxy::waiting() NOEXCEPT // Stop (socket/proxy started upon create). // ---------------------------------------------------------------------------- -// Internal stop must call stop() or async_stop(). +// The proxy does not (must not) stop itself. bool proxy::stopped() const NOEXCEPT { diff --git a/src/net/socket.cpp b/src/net/socket.cpp index a6806ce19..1daf9b896 100644 --- a/src/net/socket.cpp +++ b/src/net/socket.cpp @@ -68,8 +68,9 @@ socket::~socket() NOEXCEPT // Stop. // ---------------------------------------------------------------------------- -// Internal stop must call stop() or async_stop(). +// The socket does not (must not) stop itself. +// Immediate stop (no graceful websocket closing). void socket::stop() NOEXCEPT { if (stopped_.load()) @@ -88,6 +89,9 @@ void socket::do_stop() NOEXCEPT { BC_ASSERT(stranded()); + // Release the callback closure before shutdown/close. + if (websocket()) websocket_->control_callback(); + boost_code ignore{}; auto& socket = get_transport(); @@ -99,16 +103,14 @@ void socket::do_stop() NOEXCEPT // Any asynchronous send, receive or connect operations are canceled // immediately, and will complete with the operation_aborted error. socket.close(ignore); - - // Discard the optional. - websocket_.reset(); } -// Called internally from stranded handle_ws_event, when peer closes websocket. -// That ensures derived proxy has a chance to invoke stopping, and dispatch -// here ensures websocket_->async_close() will get invoked before thread stop. +// Lazy stop (graceful websocket closing). void socket::async_stop() NOEXCEPT { + if (stopped_.load()) + return; + // Stop flag accelerates work stoppage, as it does not wait on strand. stopped_.store(true); @@ -152,7 +154,6 @@ asio::socket& socket::get_transport() NOEXCEPT void socket::accept(asio::acceptor& acceptor, result_handler&& handler) NOEXCEPT { - BC_ASSERT_MSG(!websocket_.has_value(), "socket is upgraded"); BC_ASSERT_MSG(!socket_.is_open(), "accept on open socket"); // Closure of the acceptor, not the socket, releases this handler. @@ -428,6 +429,11 @@ void socket::do_ws_write(const asio::const_buffer& in, bool binary, void socket::do_ws_event(ws::frame_type kind, const std::string_view& data) NOEXCEPT { + // Must not post to the iocontext once closed, and this is under control of + // the websocket, so must be guarded here. Otherwise the socket will leak. + if (stopped()) + return; + // Takes ownership of the string. boost::asio::dispatch(strand_, std::bind(&socket::handle_ws_event, @@ -526,8 +532,7 @@ void socket::handle_http_read(const boost_code& ec, size_t size, if (!ec && beast::websocket::is_upgrade(request.get())) { - set_websocket(request.get()); - handler(error::upgraded, size); + handler(set_websocket(request.get()), size); return; } @@ -611,8 +616,9 @@ void socket::handle_ws_event(ws::frame_type kind, const std::string& data) NOEXCEPT { BC_ASSERT(stranded()); - ////BC_ASSERT(websocket()); + // Beast sends the necessary responses during our read. + // Close will be picked up in our async read/write handlers. switch (kind) { case ws::frame_type::ping: @@ -623,7 +629,6 @@ void socket::handle_ws_event(ws::frame_type kind, break; case ws::frame_type::close: LOGX("WS close [" << authority() << "] " << websocket_->reason()); - async_stop(); break; } } @@ -680,23 +685,36 @@ bool socket::websocket() const NOEXCEPT return websocket_.has_value(); } -void socket::set_websocket(const http::request& request) NOEXCEPT +code socket::set_websocket(const http::request& request) NOEXCEPT { BC_ASSERT(stranded()); BC_ASSERT(!websocket()); - websocket_.emplace(std::move(socket_)); - websocket_->set_option(ws::decorator{[](http::fields& header) NOEXCEPT + try { - // Customize the response header. - header.set(http::field::server, "libbitcoin/4.0"); - }}); - websocket_->accept(request); - websocket_->binary(true); + websocket_.emplace(std::move(socket_)); + websocket_->set_option(ws::decorator + { + [](http::fields& header) NOEXCEPT + { + // Customize the response header. + header.set(http::field::server, "libbitcoin/4.0"); + } + }); - // Handle ping, pong, close. - websocket_->control_callback(std::bind(&socket::do_ws_event, - shared_from_this(), _1, _2)); + // Handle ping, pong, close - must be cleared on stop. + websocket_->control_callback(std::bind(&socket::do_ws_event, + shared_from_this(), _1, _2)); + + websocket_->binary(true); + websocket_->accept(request); + return error::upgraded; + } + catch (const std::exception& LOG_ONLY(e)) + { + LOGF("Exception @ set_websocket: " << e.what()); + return error::operation_failed; + } } BC_POP_WARNING() diff --git a/src/sessions/session_inbound.cpp b/src/sessions/session_inbound.cpp index 369333f71..11a092748 100644 --- a/src/sessions/session_inbound.cpp +++ b/src/sessions/session_inbound.cpp @@ -154,6 +154,7 @@ void session_inbound::handle_accepted(const code& ec, { LOGS("Dropping inbound peer connection (disabled)."); socket->stop(); + defer(BIND(start_accept, _1, acceptor)); return; } @@ -162,6 +163,7 @@ void session_inbound::handle_accepted(const code& ec, { LOGS("Dropping oversubscribed peer [" << socket->authority() << "]."); socket->stop(); + defer(BIND(start_accept, _1, acceptor)); return; } @@ -171,6 +173,7 @@ void session_inbound::handle_accepted(const code& ec, { ////LOGS("Dropping not whitelisted peer [" << socket->authority() << "]."); socket->stop(); + start_accept(error::success, acceptor); return; } @@ -178,6 +181,7 @@ void session_inbound::handle_accepted(const code& ec, { ////LOGS("Dropping blacklisted peer [" << socket->authority() << "]."); socket->stop(); + start_accept(error::success, acceptor); return; } diff --git a/src/sessions/session_server.cpp b/src/sessions/session_server.cpp index 381c6589d..9d2561d16 100644 --- a/src/sessions/session_server.cpp +++ b/src/sessions/session_server.cpp @@ -124,8 +124,8 @@ void session_server::start_accept(const code&, acceptor->accept(BIND(handle_accepted, _1, _2, acceptor)); } -void session_server::handle_accepted(const code& ec, - const socket::ptr& socket, const acceptor::ptr& acceptor) NOEXCEPT +void session_server::handle_accepted(const code& ec, const socket::ptr& socket, + const acceptor::ptr& acceptor) NOEXCEPT { BC_ASSERT(stranded()); @@ -157,6 +157,7 @@ void session_server::handle_accepted(const code& ec, { LOGS("Dropping " << name_ << " connection (disabled)."); socket->stop(); + defer(BIND(start_accept, _1, acceptor)); return; } @@ -167,6 +168,7 @@ void session_server::handle_accepted(const code& ec, LOGS("Dropping oversubscribed " << name_ << " connection [" << socket->authority() << "]."); socket->stop(); + defer(BIND(start_accept, _1, acceptor)); return; } @@ -176,6 +178,7 @@ void session_server::handle_accepted(const code& ec, { ////LOGS("Dropping not whitelisted peer [" << socket->authority() << "]."); socket->stop(); + start_accept(error::success, acceptor); return; } @@ -183,6 +186,7 @@ void session_server::handle_accepted(const code& ec, { ////LOGS("Dropping blacklisted peer [" << socket->authority() << "]."); socket->stop(); + start_accept(error::success, acceptor); return; }