Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions include/bitcoin/network/channels/channel_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace network {

/// Half-duplex reading of http-request and sending of http-response.
class BCT_API channel_http
: public channel, protected tracker<channel_http>
: public channel
{
public:
typedef std::shared_ptr<channel_http> ptr;
Expand All @@ -60,8 +60,7 @@ class BCT_API channel_http
const options_t& options) NOEXCEPT
: channel(log, socket, identifier, settings, options),
response_buffer_(system::to_shared<http::flat_buffer>()),
request_buffer_(settings.minimum_buffer),
tracker<channel_http>(log)
request_buffer_(settings.minimum_buffer)
{
}

Expand Down
5 changes: 2 additions & 3 deletions include/bitcoin/network/channels/channel_peer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace network {
/// Version into should only be written before/during handshake.
/// Attach/resume/signal_activity must be called from the strand.
class BCT_API channel_peer
: public channel, protected tracker<channel_peer>
: public channel
{
public:
typedef std::shared_ptr<channel_peer> ptr;
Expand Down Expand Up @@ -88,8 +88,7 @@ class BCT_API channel_peer
const settings_t& settings, const options_t& options) NOEXCEPT
: channel(log, socket, identifier, settings, options),
allocator_(allocator),
negotiated_version_(settings.protocol_maximum),
tracker<channel_peer>(log)
negotiated_version_(settings.protocol_maximum)
{
}

Expand Down
5 changes: 2 additions & 3 deletions include/bitcoin/network/channels/channel_ws.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace network {

/// Abstract base websocket tcp/ip channel, on base http channel.
class BCT_API channel_ws
: public channel_http, protected tracker<channel_ws>
: public channel_http
{
public:
typedef std::shared_ptr<channel_ws> ptr;
Expand All @@ -41,8 +41,7 @@ class BCT_API channel_ws
inline channel_ws(const logger& log, const socket::ptr& socket,
uint64_t identifier, const settings_t& settings,
const options_t& options) NOEXCEPT
: channel_http(log, socket, identifier, settings, options),
tracker<channel_ws>(log)
: channel_http(log, socket, identifier, settings, options)
{
}

Expand Down
2 changes: 1 addition & 1 deletion include/bitcoin/network/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class BCT_API socket
void do_ws_event(ws::frame_type kind,
const std::string_view& data) NOEXCEPT;

void set_websocket(const http::request& request) NOEXCEPT;
code set_websocket(const http::request& request) NOEXCEPT;

// completion
// ------------------------------------------------------------------------
Expand Down
5 changes: 2 additions & 3 deletions src/channels/channel_ws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ void channel_ws::read_request() NOEXCEPT
return;
}

request_buffer().consume(request_buffer().size());
ws_read(request_buffer(),
std::bind(&channel_ws::handle_read_websocket,
shared_from_base<channel_ws>(), _1, _2));
Expand Down Expand Up @@ -78,14 +79,12 @@ void channel_ws::handle_read_websocket(const code& ec, size_t bytes) NOEXCEPT
}

dispatch_websocket(request_buffer(), bytes);
request_buffer().consume(bytes);
}

void channel_ws::dispatch_websocket(const http::flat_buffer&,
size_t LOG_ONLY(bytes)) NOEXCEPT
{
LOGA("Websocket read of " << bytes << " bytes unhandled ["
<< authority() << "]");
LOGA("Websocket read of " << bytes << " bytes [" << authority() << "]");

// Restart reader.
read_request();
Expand Down
2 changes: 1 addition & 1 deletion src/net/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void proxy::waiting() NOEXCEPT

// Stop (socket/proxy started upon create).
// ----------------------------------------------------------------------------
// Internal stop must call stop() or async_stop().
// The proxy does not (must not) stop itself.

bool proxy::stopped() const NOEXCEPT
{
Expand Down
64 changes: 41 additions & 23 deletions src/net/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ socket::~socket() NOEXCEPT

// Stop.
// ----------------------------------------------------------------------------
// Internal stop must call stop() or async_stop().
// The socket does not (must not) stop itself.

// Immediate stop (no graceful websocket closing).
void socket::stop() NOEXCEPT
{
if (stopped_.load())
Expand All @@ -88,6 +89,9 @@ void socket::do_stop() NOEXCEPT
{
BC_ASSERT(stranded());

// Release the callback closure before shutdown/close.
if (websocket()) websocket_->control_callback();

boost_code ignore{};
auto& socket = get_transport();

Expand All @@ -99,16 +103,14 @@ void socket::do_stop() NOEXCEPT
// Any asynchronous send, receive or connect operations are canceled
// immediately, and will complete with the operation_aborted error.
socket.close(ignore);

// Discard the optional.
websocket_.reset();
}

// Called internally from stranded handle_ws_event, when peer closes websocket.
// That ensures derived proxy has a chance to invoke stopping, and dispatch
// here ensures websocket_->async_close() will get invoked before thread stop.
// Lazy stop (graceful websocket closing).
void socket::async_stop() NOEXCEPT
{
if (stopped_.load())
return;

// Stop flag accelerates work stoppage, as it does not wait on strand.
stopped_.store(true);

Expand Down Expand Up @@ -152,7 +154,6 @@ asio::socket& socket::get_transport() NOEXCEPT
void socket::accept(asio::acceptor& acceptor,
result_handler&& handler) NOEXCEPT
{
BC_ASSERT_MSG(!websocket_.has_value(), "socket is upgraded");
BC_ASSERT_MSG(!socket_.is_open(), "accept on open socket");

// Closure of the acceptor, not the socket, releases this handler.
Expand Down Expand Up @@ -428,6 +429,11 @@ void socket::do_ws_write(const asio::const_buffer& in, bool binary,
void socket::do_ws_event(ws::frame_type kind,
const std::string_view& data) NOEXCEPT
{
// Must not post to the iocontext once closed, and this is under control of
// the websocket, so must be guarded here. Otherwise the socket will leak.
if (stopped())
return;

// Takes ownership of the string.
boost::asio::dispatch(strand_,
std::bind(&socket::handle_ws_event,
Expand Down Expand Up @@ -526,8 +532,7 @@ void socket::handle_http_read(const boost_code& ec, size_t size,

if (!ec && beast::websocket::is_upgrade(request.get()))
{
set_websocket(request.get());
handler(error::upgraded, size);
handler(set_websocket(request.get()), size);
return;
}

Expand Down Expand Up @@ -611,8 +616,9 @@ void socket::handle_ws_event(ws::frame_type kind,
const std::string& data) NOEXCEPT
{
BC_ASSERT(stranded());
////BC_ASSERT(websocket());

// Beast sends the necessary responses during our read.
// Close will be picked up in our async read/write handlers.
switch (kind)
{
case ws::frame_type::ping:
Expand All @@ -623,7 +629,6 @@ void socket::handle_ws_event(ws::frame_type kind,
break;
case ws::frame_type::close:
LOGX("WS close [" << authority() << "] " << websocket_->reason());
async_stop();
break;
}
}
Expand Down Expand Up @@ -680,23 +685,36 @@ bool socket::websocket() const NOEXCEPT
return websocket_.has_value();
}

void socket::set_websocket(const http::request& request) NOEXCEPT
code socket::set_websocket(const http::request& request) NOEXCEPT
{
BC_ASSERT(stranded());
BC_ASSERT(!websocket());

websocket_.emplace(std::move(socket_));
websocket_->set_option(ws::decorator{[](http::fields& header) NOEXCEPT
try
{
// Customize the response header.
header.set(http::field::server, "libbitcoin/4.0");
}});
websocket_->accept(request);
websocket_->binary(true);
websocket_.emplace(std::move(socket_));
websocket_->set_option(ws::decorator
{
[](http::fields& header) NOEXCEPT
{
// Customize the response header.
header.set(http::field::server, "libbitcoin/4.0");
}
});

// Handle ping, pong, close.
websocket_->control_callback(std::bind(&socket::do_ws_event,
shared_from_this(), _1, _2));
// Handle ping, pong, close - must be cleared on stop.
websocket_->control_callback(std::bind(&socket::do_ws_event,
shared_from_this(), _1, _2));

websocket_->binary(true);
websocket_->accept(request);
return error::upgraded;
}
catch (const std::exception& LOG_ONLY(e))
{
LOGF("Exception @ set_websocket: " << e.what());
return error::operation_failed;
}
}

BC_POP_WARNING()
Expand Down
4 changes: 4 additions & 0 deletions src/sessions/session_inbound.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ void session_inbound::handle_accepted(const code& ec,
{
LOGS("Dropping inbound peer connection (disabled).");
socket->stop();
defer(BIND(start_accept, _1, acceptor));
return;
}

Expand All @@ -162,6 +163,7 @@ void session_inbound::handle_accepted(const code& ec,
{
LOGS("Dropping oversubscribed peer [" << socket->authority() << "].");
socket->stop();
defer(BIND(start_accept, _1, acceptor));
return;
}

Expand All @@ -171,13 +173,15 @@ void session_inbound::handle_accepted(const code& ec,
{
////LOGS("Dropping not whitelisted peer [" << socket->authority() << "].");
socket->stop();
start_accept(error::success, acceptor);
return;
}

if (blacklisted(address))
{
////LOGS("Dropping blacklisted peer [" << socket->authority() << "].");
socket->stop();
start_accept(error::success, acceptor);
return;
}

Expand Down
8 changes: 6 additions & 2 deletions src/sessions/session_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ void session_server::start_accept(const code&,
acceptor->accept(BIND(handle_accepted, _1, _2, acceptor));
}

void session_server::handle_accepted(const code& ec,
const socket::ptr& socket, const acceptor::ptr& acceptor) NOEXCEPT
void session_server::handle_accepted(const code& ec, const socket::ptr& socket,
const acceptor::ptr& acceptor) NOEXCEPT
{
BC_ASSERT(stranded());

Expand Down Expand Up @@ -157,6 +157,7 @@ void session_server::handle_accepted(const code& ec,
{
LOGS("Dropping " << name_ << " connection (disabled).");
socket->stop();
defer(BIND(start_accept, _1, acceptor));
return;
}

Expand All @@ -167,6 +168,7 @@ void session_server::handle_accepted(const code& ec,
LOGS("Dropping oversubscribed " << name_ << " connection ["
<< socket->authority() << "].");
socket->stop();
defer(BIND(start_accept, _1, acceptor));
return;
}

Expand All @@ -176,13 +178,15 @@ void session_server::handle_accepted(const code& ec,
{
////LOGS("Dropping not whitelisted peer [" << socket->authority() << "].");
socket->stop();
start_accept(error::success, acceptor);
return;
}

if (blacklisted(address))
{
////LOGS("Dropping blacklisted peer [" << socket->authority() << "].");
socket->stop();
start_accept(error::success, acceptor);
return;
}

Expand Down
Loading