Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/bitcoin/network/channels/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class BCT_API channel
/// Resume reading from the socket, starts timers (requires strand).
void resume() NOEXCEPT override;

/// Monitor/unmonitor the socket for cancel/write (requires strand).
void monitor(bool value) NOEXCEPT;

/// Seconds before channel expires, zero if expired (requires strand).
size_t remaining() const NOEXCEPT;

Expand Down Expand Up @@ -106,6 +109,8 @@ class BCT_API channel
void start_inactivity() NOEXCEPT;
void handle_inactivity(const code& ec) NOEXCEPT;

void handle_monitor(const code& ec) NOEXCEPT;

// These are thread safe (const).
const settings_t& settings_;
const uint64_t identifier_;
Expand Down
9 changes: 9 additions & 0 deletions include/bitcoin/network/net/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ class BCT_API proxy
/// Subscribe to stop notification (requires strand).
void subscribe_stop(result_handler&& handler) NOEXCEPT;

/// Wait.
/// -----------------------------------------------------------------------

/// Wait on a peer close/cancel/send, no data capture/loss.
virtual void wait(result_handler&& handler) NOEXCEPT;

/// Cancel wait or any asynchronous read/write operation, handlers posted.
virtual void cancel(result_handler&& handler) NOEXCEPT;

/// TCP.
/// -----------------------------------------------------------------------

Expand Down
19 changes: 19 additions & 0 deletions include/bitcoin/network/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ class BCT_API socket
/// Same as stop but provides graceful shutdown for websocket connections.
virtual void async_stop() NOEXCEPT;

/// Wait.
/// -----------------------------------------------------------------------

/// Wait on a peer close/cancel/send, no data capture/loss.
virtual void wait(result_handler&& handler) NOEXCEPT;

/// Cancel wait or any asynchronous read/write operation, handlers posted.
virtual void cancel(result_handler&& handler) NOEXCEPT;

/// Connection.
/// -----------------------------------------------------------------------

Expand Down Expand Up @@ -145,6 +154,12 @@ class BCT_API socket
void do_async_stop() NOEXCEPT;
asio::socket& get_transport() NOEXCEPT;

// wait
// ------------------------------------------------------------------------

void do_wait(const result_handler& handler) NOEXCEPT;
void do_cancel(const result_handler& handler) NOEXCEPT;

// stranded
// ------------------------------------------------------------------------

Expand Down Expand Up @@ -179,6 +194,10 @@ class BCT_API socket
// completion
// ------------------------------------------------------------------------

// wait
void handle_wait(const boost_code& ec,
const result_handler& handler) NOEXCEPT;

// connection
void handle_accept(const boost_code& ec,
const result_handler& handler) NOEXCEPT;
Expand Down
3 changes: 3 additions & 0 deletions include/bitcoin/network/protocols/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ class BCT_API protocol
/// Resume reading from the socket, starts timers (requires strand).
virtual void resume() NOEXCEPT;

/// Monitor/unmonitor the socket for cancel/write (requires strand).
virtual void monitor(bool value) NOEXCEPT;

/// Seconds before channel expires, zero if expired (requires strand).
virtual size_t remaining() const NOEXCEPT;

Expand Down
18 changes: 18 additions & 0 deletions src/channels/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
namespace libbitcoin {
namespace network {

#define CLASS channel

using namespace system;
using namespace std::placeholders;

Expand Down Expand Up @@ -90,6 +92,22 @@ void channel::resume() NOEXCEPT
proxy::resume();
}

void channel::monitor(bool value) NOEXCEPT
{
BC_ASSERT(stranded());

auto handler = std::bind(&channel::handle_monitor,
shared_from_base<channel>(), _1);

value ? wait(std::move(handler)) : cancel(std::move(handler));
}

void channel::handle_monitor(const code& ec) NOEXCEPT
{
BC_ASSERT(stranded());
if (ec) stop(ec);
}

// Timers.
// ----------------------------------------------------------------------------
// TODO: build DoS protection around rate_limit_, backlog(), total(), and time.
Expand Down
13 changes: 13 additions & 0 deletions src/net/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,19 @@ void proxy::do_subscribe_stop(const result_handler& handler,
complete(error::success);
}

// Wait.
// ----------------------------------------------------------------------------

void proxy::wait(result_handler&& handler) NOEXCEPT
{
socket_->wait(std::move(handler));
}

void proxy::cancel(result_handler&& handler) NOEXCEPT
{
socket_->cancel(std::move(handler));
}

// TCP.
// ----------------------------------------------------------------------------

Expand Down
75 changes: 74 additions & 1 deletion src/net/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,79 @@ asio::socket& socket::get_transport() NOEXCEPT
return websocket() ? beast::get_lowest_layer(*websocket_) : socket_;
}

// Wait.
// ----------------------------------------------------------------------------

void socket::wait(result_handler&& handler) NOEXCEPT
{
boost::asio::dispatch(strand_,
std::bind(&socket::do_wait,
shared_from_this(), std::move(handler)));
}

// private
void socket::do_wait(const result_handler& handler) NOEXCEPT
{
BC_ASSERT(stranded());

get_transport().async_wait(asio::socket::wait_read,
std::bind(&socket::handle_wait,
shared_from_this(), _1, handler));
}

// private
void socket::handle_wait(const boost_code& ec,
const result_handler& handler) NOEXCEPT
{
BC_ASSERT(stranded());

// Only wait cancel results in caller not calling stop.
if (error::asio_is_canceled(ec))
{
handler(error::success);
return;
}

if (ec)
{
handler(error::asio_to_error_code(ec));
return;
}

handler(error::operation_canceled);
}

void socket::cancel(result_handler&& handler) NOEXCEPT
{
boost::asio::dispatch(strand_,
std::bind(&socket::do_cancel,
shared_from_this(), std::move(handler)));
}

// private
void socket::do_cancel(const result_handler& handler) NOEXCEPT
{
BC_ASSERT(stranded());

if (stopped())
{
handler(error::success);
return;
}

try
{
// Causes connect, send, and receive calls to quit with
// asio::error::operation_aborted passed to handlers.
socket_.cancel();
}
catch (const std::exception& LOG_ONLY(e))
{
LOGF("Exception @ do_cancel: " << e.what());
handler(error::service_stopped);
}
}

// Connection.
// ----------------------------------------------------------------------------
// Boost async functions are NOT THREAD SAFE for the same socket object.
Expand All @@ -168,7 +241,7 @@ void socket::accept(asio::acceptor& acceptor,
// Dispatches on the acceptor's strand (which should be network).
acceptor.async_accept(socket_,
std::bind(&socket::handle_accept,
shared_from_this(), _1, handler));
shared_from_this(), _1, std::move(handler)));
}
catch (const std::exception& LOG_ONLY(e))
{
Expand Down
7 changes: 7 additions & 0 deletions src/protocols/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ void protocol::resume() NOEXCEPT
channel_->resume();
}

// Monitor/unmonitor the socket for stop/cancel/bytes.
void protocol::monitor(bool value) NOEXCEPT
{
BC_ASSERT(stranded());
channel_->monitor(value);
}

// Zero if timer expired.
size_t protocol::remaining() const NOEXCEPT
{
Expand Down
Loading