Skip to content

Commit d8d08b0

Browse files
authored
Merge pull request #663 from evoskuil/master
Add socket monitoring so close is detected w/o read.
2 parents d5269b0 + cb59784 commit d8d08b0

File tree

8 files changed

+148
-1
lines changed

8 files changed

+148
-1
lines changed

include/bitcoin/network/channels/channel.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ class BCT_API channel
7474
/// Resume reading from the socket, starts timers (requires strand).
7575
void resume() NOEXCEPT override;
7676

77+
/// Monitor/unmonitor the socket for cancel/write (requires strand).
78+
void monitor(bool value) NOEXCEPT;
79+
7780
/// Seconds before channel expires, zero if expired (requires strand).
7881
size_t remaining() const NOEXCEPT;
7982

@@ -106,6 +109,8 @@ class BCT_API channel
106109
void start_inactivity() NOEXCEPT;
107110
void handle_inactivity(const code& ec) NOEXCEPT;
108111

112+
void handle_monitor(const code& ec) NOEXCEPT;
113+
109114
// These are thread safe (const).
110115
const settings_t& settings_;
111116
const uint64_t identifier_;

include/bitcoin/network/net/proxy.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,15 @@ class BCT_API proxy
108108
/// Subscribe to stop notification (requires strand).
109109
void subscribe_stop(result_handler&& handler) NOEXCEPT;
110110

111+
/// Wait.
112+
/// -----------------------------------------------------------------------
113+
114+
/// Wait on a peer close/cancel/send, no data capture/loss.
115+
virtual void wait(result_handler&& handler) NOEXCEPT;
116+
117+
/// Cancel wait or any asynchronous read/write operation, handlers posted.
118+
virtual void cancel(result_handler&& handler) NOEXCEPT;
119+
111120
/// TCP.
112121
/// -----------------------------------------------------------------------
113122

include/bitcoin/network/net/socket.hpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ class BCT_API socket
6767
/// Same as stop but provides graceful shutdown for websocket connections.
6868
virtual void async_stop() NOEXCEPT;
6969

70+
/// Wait.
71+
/// -----------------------------------------------------------------------
72+
73+
/// Wait on a peer close/cancel/send, no data capture/loss.
74+
virtual void wait(result_handler&& handler) NOEXCEPT;
75+
76+
/// Cancel wait or any asynchronous read/write operation, handlers posted.
77+
virtual void cancel(result_handler&& handler) NOEXCEPT;
78+
7079
/// Connection.
7180
/// -----------------------------------------------------------------------
7281

@@ -145,6 +154,12 @@ class BCT_API socket
145154
void do_async_stop() NOEXCEPT;
146155
asio::socket& get_transport() NOEXCEPT;
147156

157+
// wait
158+
// ------------------------------------------------------------------------
159+
160+
void do_wait(const result_handler& handler) NOEXCEPT;
161+
void do_cancel(const result_handler& handler) NOEXCEPT;
162+
148163
// stranded
149164
// ------------------------------------------------------------------------
150165

@@ -179,6 +194,10 @@ class BCT_API socket
179194
// completion
180195
// ------------------------------------------------------------------------
181196

197+
// wait
198+
void handle_wait(const boost_code& ec,
199+
const result_handler& handler) NOEXCEPT;
200+
182201
// connection
183202
void handle_accept(const boost_code& ec,
184203
const result_handler& handler) NOEXCEPT;

include/bitcoin/network/protocols/protocol.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ class BCT_API protocol
160160
/// Resume reading from the socket, starts timers (requires strand).
161161
virtual void resume() NOEXCEPT;
162162

163+
/// Monitor/unmonitor the socket for cancel/write (requires strand).
164+
virtual void monitor(bool value) NOEXCEPT;
165+
163166
/// Seconds before channel expires, zero if expired (requires strand).
164167
virtual size_t remaining() const NOEXCEPT;
165168

src/channels/channel.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
namespace libbitcoin {
2828
namespace network {
2929

30+
#define CLASS channel
31+
3032
using namespace system;
3133
using namespace std::placeholders;
3234

@@ -90,6 +92,22 @@ void channel::resume() NOEXCEPT
9092
proxy::resume();
9193
}
9294

95+
void channel::monitor(bool value) NOEXCEPT
96+
{
97+
BC_ASSERT(stranded());
98+
99+
auto handler = std::bind(&channel::handle_monitor,
100+
shared_from_base<channel>(), _1);
101+
102+
value ? wait(std::move(handler)) : cancel(std::move(handler));
103+
}
104+
105+
void channel::handle_monitor(const code& ec) NOEXCEPT
106+
{
107+
BC_ASSERT(stranded());
108+
if (ec) stop(ec);
109+
}
110+
93111
// Timers.
94112
// ----------------------------------------------------------------------------
95113
// TODO: build DoS protection around rate_limit_, backlog(), total(), and time.

src/net/proxy.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,19 @@ void proxy::do_subscribe_stop(const result_handler& handler,
166166
complete(error::success);
167167
}
168168

169+
// Wait.
170+
// ----------------------------------------------------------------------------
171+
172+
void proxy::wait(result_handler&& handler) NOEXCEPT
173+
{
174+
socket_->wait(std::move(handler));
175+
}
176+
177+
void proxy::cancel(result_handler&& handler) NOEXCEPT
178+
{
179+
socket_->cancel(std::move(handler));
180+
}
181+
169182
// TCP.
170183
// ----------------------------------------------------------------------------
171184

src/net/socket.cpp

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,79 @@ asio::socket& socket::get_transport() NOEXCEPT
145145
return websocket() ? beast::get_lowest_layer(*websocket_) : socket_;
146146
}
147147

148+
// Wait.
149+
// ----------------------------------------------------------------------------
150+
151+
void socket::wait(result_handler&& handler) NOEXCEPT
152+
{
153+
boost::asio::dispatch(strand_,
154+
std::bind(&socket::do_wait,
155+
shared_from_this(), std::move(handler)));
156+
}
157+
158+
// private
159+
void socket::do_wait(const result_handler& handler) NOEXCEPT
160+
{
161+
BC_ASSERT(stranded());
162+
163+
get_transport().async_wait(asio::socket::wait_read,
164+
std::bind(&socket::handle_wait,
165+
shared_from_this(), _1, handler));
166+
}
167+
168+
// private
169+
void socket::handle_wait(const boost_code& ec,
170+
const result_handler& handler) NOEXCEPT
171+
{
172+
BC_ASSERT(stranded());
173+
174+
// Only wait cancel results in caller not calling stop.
175+
if (error::asio_is_canceled(ec))
176+
{
177+
handler(error::success);
178+
return;
179+
}
180+
181+
if (ec)
182+
{
183+
handler(error::asio_to_error_code(ec));
184+
return;
185+
}
186+
187+
handler(error::operation_canceled);
188+
}
189+
190+
void socket::cancel(result_handler&& handler) NOEXCEPT
191+
{
192+
boost::asio::dispatch(strand_,
193+
std::bind(&socket::do_cancel,
194+
shared_from_this(), std::move(handler)));
195+
}
196+
197+
// private
198+
void socket::do_cancel(const result_handler& handler) NOEXCEPT
199+
{
200+
BC_ASSERT(stranded());
201+
202+
if (stopped())
203+
{
204+
handler(error::success);
205+
return;
206+
}
207+
208+
try
209+
{
210+
// Causes connect, send, and receive calls to quit with
211+
// asio::error::operation_aborted passed to handlers.
212+
socket_.cancel();
213+
}
214+
catch (const std::exception& LOG_ONLY(e))
215+
{
216+
LOGF("Exception @ do_cancel: " << e.what());
217+
handler(error::service_stopped);
218+
}
219+
}
220+
148221
// Connection.
149222
// ----------------------------------------------------------------------------
150223
// Boost async functions are NOT THREAD SAFE for the same socket object.
@@ -168,7 +241,7 @@ void socket::accept(asio::acceptor& acceptor,
168241
// Dispatches on the acceptor's strand (which should be network).
169242
acceptor.async_accept(socket_,
170243
std::bind(&socket::handle_accept,
171-
shared_from_this(), _1, handler));
244+
shared_from_this(), _1, std::move(handler)));
172245
}
173246
catch (const std::exception& LOG_ONLY(e))
174247
{

src/protocols/protocol.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,13 @@ void protocol::resume() NOEXCEPT
104104
channel_->resume();
105105
}
106106

107+
// Monitor/unmonitor the socket for stop/cancel/bytes.
108+
void protocol::monitor(bool value) NOEXCEPT
109+
{
110+
BC_ASSERT(stranded());
111+
channel_->monitor(value);
112+
}
113+
107114
// Zero if timer expired.
108115
size_t protocol::remaining() const NOEXCEPT
109116
{

0 commit comments

Comments
 (0)