Skip to content

Commit 4e76a07

Browse files
authored
Merge pull request #660 from evoskuil/master
Fix channel_ws failure to release ws events capture.
2 parents 8cbc3e3 + 676fcfc commit 4e76a07

File tree

9 files changed

+61
-39
lines changed

9 files changed

+61
-39
lines changed

include/bitcoin/network/channels/channel_http.hpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ namespace network {
3434

3535
/// Half-duplex reading of http-request and sending of http-response.
3636
class BCT_API channel_http
37-
: public channel, protected tracker<channel_http>
37+
: public channel
3838
{
3939
public:
4040
typedef std::shared_ptr<channel_http> ptr;
@@ -60,8 +60,7 @@ class BCT_API channel_http
6060
const options_t& options) NOEXCEPT
6161
: channel(log, socket, identifier, settings, options),
6262
response_buffer_(system::to_shared<http::flat_buffer>()),
63-
request_buffer_(settings.minimum_buffer),
64-
tracker<channel_http>(log)
63+
request_buffer_(settings.minimum_buffer)
6564
{
6665
}
6766

include/bitcoin/network/channels/channel_peer.hpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ namespace network {
3434
/// Version into should only be written before/during handshake.
3535
/// Attach/resume/signal_activity must be called from the strand.
3636
class BCT_API channel_peer
37-
: public channel, protected tracker<channel_peer>
37+
: public channel
3838
{
3939
public:
4040
typedef std::shared_ptr<channel_peer> ptr;
@@ -88,8 +88,7 @@ class BCT_API channel_peer
8888
const settings_t& settings, const options_t& options) NOEXCEPT
8989
: channel(log, socket, identifier, settings, options),
9090
allocator_(allocator),
91-
negotiated_version_(settings.protocol_maximum),
92-
tracker<channel_peer>(log)
91+
negotiated_version_(settings.protocol_maximum)
9392
{
9493
}
9594

include/bitcoin/network/channels/channel_ws.hpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ namespace network {
3131

3232
/// Abstract base websocket tcp/ip channel, on base http channel.
3333
class BCT_API channel_ws
34-
: public channel_http, protected tracker<channel_ws>
34+
: public channel_http
3535
{
3636
public:
3737
typedef std::shared_ptr<channel_ws> ptr;
@@ -41,8 +41,7 @@ class BCT_API channel_ws
4141
inline channel_ws(const logger& log, const socket::ptr& socket,
4242
uint64_t identifier, const settings_t& settings,
4343
const options_t& options) NOEXCEPT
44-
: channel_http(log, socket, identifier, settings, options),
45-
tracker<channel_ws>(log)
44+
: channel_http(log, socket, identifier, settings, options)
4645
{
4746
}
4847

include/bitcoin/network/net/socket.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ class BCT_API socket
174174
void do_ws_event(ws::frame_type kind,
175175
const std::string_view& data) NOEXCEPT;
176176

177-
void set_websocket(const http::request& request) NOEXCEPT;
177+
code set_websocket(const http::request& request) NOEXCEPT;
178178

179179
// completion
180180
// ------------------------------------------------------------------------

src/channels/channel_ws.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ void channel_ws::read_request() NOEXCEPT
4646
return;
4747
}
4848

49+
request_buffer().consume(request_buffer().size());
4950
ws_read(request_buffer(),
5051
std::bind(&channel_ws::handle_read_websocket,
5152
shared_from_base<channel_ws>(), _1, _2));
@@ -78,14 +79,12 @@ void channel_ws::handle_read_websocket(const code& ec, size_t bytes) NOEXCEPT
7879
}
7980

8081
dispatch_websocket(request_buffer(), bytes);
81-
request_buffer().consume(bytes);
8282
}
8383

8484
void channel_ws::dispatch_websocket(const http::flat_buffer&,
8585
size_t LOG_ONLY(bytes)) NOEXCEPT
8686
{
87-
LOGA("Websocket read of " << bytes << " bytes unhandled ["
88-
<< authority() << "]");
87+
LOGA("Websocket read of " << bytes << " bytes [" << authority() << "]");
8988

9089
// Restart reader.
9190
read_request();

src/net/proxy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void proxy::waiting() NOEXCEPT
8181

8282
// Stop (socket/proxy started upon create).
8383
// ----------------------------------------------------------------------------
84-
// Internal stop must call stop() or async_stop().
84+
// The proxy does not (must not) stop itself.
8585

8686
bool proxy::stopped() const NOEXCEPT
8787
{

src/net/socket.cpp

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ socket::~socket() NOEXCEPT
6868

6969
// Stop.
7070
// ----------------------------------------------------------------------------
71-
// Internal stop must call stop() or async_stop().
71+
// The socket does not (must not) stop itself.
7272

73+
// Immediate stop (no graceful websocket closing).
7374
void socket::stop() NOEXCEPT
7475
{
7576
if (stopped_.load())
@@ -88,6 +89,9 @@ void socket::do_stop() NOEXCEPT
8889
{
8990
BC_ASSERT(stranded());
9091

92+
// Release the callback closure before shutdown/close.
93+
if (websocket()) websocket_->control_callback();
94+
9195
boost_code ignore{};
9296
auto& socket = get_transport();
9397

@@ -99,16 +103,14 @@ void socket::do_stop() NOEXCEPT
99103
// Any asynchronous send, receive or connect operations are canceled
100104
// immediately, and will complete with the operation_aborted error.
101105
socket.close(ignore);
102-
103-
// Discard the optional.
104-
websocket_.reset();
105106
}
106107

107-
// Called internally from stranded handle_ws_event, when peer closes websocket.
108-
// That ensures derived proxy has a chance to invoke stopping, and dispatch
109-
// here ensures websocket_->async_close() will get invoked before thread stop.
108+
// Lazy stop (graceful websocket closing).
110109
void socket::async_stop() NOEXCEPT
111110
{
111+
if (stopped_.load())
112+
return;
113+
112114
// Stop flag accelerates work stoppage, as it does not wait on strand.
113115
stopped_.store(true);
114116

@@ -152,7 +154,6 @@ asio::socket& socket::get_transport() NOEXCEPT
152154
void socket::accept(asio::acceptor& acceptor,
153155
result_handler&& handler) NOEXCEPT
154156
{
155-
BC_ASSERT_MSG(!websocket_.has_value(), "socket is upgraded");
156157
BC_ASSERT_MSG(!socket_.is_open(), "accept on open socket");
157158

158159
// Closure of the acceptor, not the socket, releases this handler.
@@ -428,6 +429,11 @@ void socket::do_ws_write(const asio::const_buffer& in, bool binary,
428429
void socket::do_ws_event(ws::frame_type kind,
429430
const std::string_view& data) NOEXCEPT
430431
{
432+
// Must not post to the iocontext once closed, and this is under control of
433+
// the websocket, so must be guarded here. Otherwise the socket will leak.
434+
if (stopped())
435+
return;
436+
431437
// Takes ownership of the string.
432438
boost::asio::dispatch(strand_,
433439
std::bind(&socket::handle_ws_event,
@@ -526,8 +532,7 @@ void socket::handle_http_read(const boost_code& ec, size_t size,
526532

527533
if (!ec && beast::websocket::is_upgrade(request.get()))
528534
{
529-
set_websocket(request.get());
530-
handler(error::upgraded, size);
535+
handler(set_websocket(request.get()), size);
531536
return;
532537
}
533538

@@ -611,8 +616,9 @@ void socket::handle_ws_event(ws::frame_type kind,
611616
const std::string& data) NOEXCEPT
612617
{
613618
BC_ASSERT(stranded());
614-
////BC_ASSERT(websocket());
615619

620+
// Beast sends the necessary responses during our read.
621+
// Close will be picked up in our async read/write handlers.
616622
switch (kind)
617623
{
618624
case ws::frame_type::ping:
@@ -623,7 +629,6 @@ void socket::handle_ws_event(ws::frame_type kind,
623629
break;
624630
case ws::frame_type::close:
625631
LOGX("WS close [" << authority() << "] " << websocket_->reason());
626-
async_stop();
627632
break;
628633
}
629634
}
@@ -680,23 +685,36 @@ bool socket::websocket() const NOEXCEPT
680685
return websocket_.has_value();
681686
}
682687

683-
void socket::set_websocket(const http::request& request) NOEXCEPT
688+
code socket::set_websocket(const http::request& request) NOEXCEPT
684689
{
685690
BC_ASSERT(stranded());
686691
BC_ASSERT(!websocket());
687692

688-
websocket_.emplace(std::move(socket_));
689-
websocket_->set_option(ws::decorator{[](http::fields& header) NOEXCEPT
693+
try
690694
{
691-
// Customize the response header.
692-
header.set(http::field::server, "libbitcoin/4.0");
693-
}});
694-
websocket_->accept(request);
695-
websocket_->binary(true);
695+
websocket_.emplace(std::move(socket_));
696+
websocket_->set_option(ws::decorator
697+
{
698+
[](http::fields& header) NOEXCEPT
699+
{
700+
// Customize the response header.
701+
header.set(http::field::server, "libbitcoin/4.0");
702+
}
703+
});
696704

697-
// Handle ping, pong, close.
698-
websocket_->control_callback(std::bind(&socket::do_ws_event,
699-
shared_from_this(), _1, _2));
705+
// Handle ping, pong, close - must be cleared on stop.
706+
websocket_->control_callback(std::bind(&socket::do_ws_event,
707+
shared_from_this(), _1, _2));
708+
709+
websocket_->binary(true);
710+
websocket_->accept(request);
711+
return error::upgraded;
712+
}
713+
catch (const std::exception& LOG_ONLY(e))
714+
{
715+
LOGF("Exception @ set_websocket: " << e.what());
716+
return error::operation_failed;
717+
}
700718
}
701719

702720
BC_POP_WARNING()

src/sessions/session_inbound.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ void session_inbound::handle_accepted(const code& ec,
154154
{
155155
LOGS("Dropping inbound peer connection (disabled).");
156156
socket->stop();
157+
defer(BIND(start_accept, _1, acceptor));
157158
return;
158159
}
159160

@@ -162,6 +163,7 @@ void session_inbound::handle_accepted(const code& ec,
162163
{
163164
LOGS("Dropping oversubscribed peer [" << socket->authority() << "].");
164165
socket->stop();
166+
defer(BIND(start_accept, _1, acceptor));
165167
return;
166168
}
167169

@@ -171,13 +173,15 @@ void session_inbound::handle_accepted(const code& ec,
171173
{
172174
////LOGS("Dropping not whitelisted peer [" << socket->authority() << "].");
173175
socket->stop();
176+
start_accept(error::success, acceptor);
174177
return;
175178
}
176179

177180
if (blacklisted(address))
178181
{
179182
////LOGS("Dropping blacklisted peer [" << socket->authority() << "].");
180183
socket->stop();
184+
start_accept(error::success, acceptor);
181185
return;
182186
}
183187

src/sessions/session_server.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ void session_server::start_accept(const code&,
124124
acceptor->accept(BIND(handle_accepted, _1, _2, acceptor));
125125
}
126126

127-
void session_server::handle_accepted(const code& ec,
128-
const socket::ptr& socket, const acceptor::ptr& acceptor) NOEXCEPT
127+
void session_server::handle_accepted(const code& ec, const socket::ptr& socket,
128+
const acceptor::ptr& acceptor) NOEXCEPT
129129
{
130130
BC_ASSERT(stranded());
131131

@@ -157,6 +157,7 @@ void session_server::handle_accepted(const code& ec,
157157
{
158158
LOGS("Dropping " << name_ << " connection (disabled).");
159159
socket->stop();
160+
defer(BIND(start_accept, _1, acceptor));
160161
return;
161162
}
162163

@@ -167,6 +168,7 @@ void session_server::handle_accepted(const code& ec,
167168
LOGS("Dropping oversubscribed " << name_ << " connection ["
168169
<< socket->authority() << "].");
169170
socket->stop();
171+
defer(BIND(start_accept, _1, acceptor));
170172
return;
171173
}
172174

@@ -176,13 +178,15 @@ void session_server::handle_accepted(const code& ec,
176178
{
177179
////LOGS("Dropping not whitelisted peer [" << socket->authority() << "].");
178180
socket->stop();
181+
start_accept(error::success, acceptor);
179182
return;
180183
}
181184

182185
if (blacklisted(address))
183186
{
184187
////LOGS("Dropping blacklisted peer [" << socket->authority() << "].");
185188
socket->stop();
189+
start_accept(error::success, acceptor);
186190
return;
187191
}
188192

0 commit comments

Comments
 (0)