Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions include/bitcoin/network/channels/channel_rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ class BCT_API channel_rpc
{
}

/// Serialize and write response to client (requires strand).
/// Completion handler is always invoked on the channel strand.
void send(rpc::response_t&& message, size_t size_hint,
result_handler&& handler) NOEXCEPT;

/// Resume reading from the socket (requires strand).
void resume() NOEXCEPT override;

/// Must call after successful message handling if no stop.
virtual void receive() NOEXCEPT;

/// Serialize and write response to client (requires strand).
/// Completion handler is always invoked on the channel strand.
virtual void send(rpc::response_t&& message, size_t size_hint,
result_handler&& handler) NOEXCEPT;

protected:
/// Stranded handler invoked from stop().
void stopping(const code& ec) NOEXCEPT override;
Expand Down
5 changes: 3 additions & 2 deletions include/bitcoin/network/net.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,10 @@ class BCT_API net
friend class session;

/// I/O factories.
virtual acceptor::ptr create_acceptor() NOEXCEPT;
virtual connector::ptr create_connector(bool seed=false) NOEXCEPT;
virtual acceptor::ptr create_acceptor(size_t maximum) NOEXCEPT;
virtual connectors_ptr create_connectors(size_t count) NOEXCEPT;
virtual connector::ptr create_connector(size_t maximum) NOEXCEPT;
virtual connector::ptr create_connector() NOEXCEPT;

/// Sequences.
virtual void do_start(const result_handler& handler) NOEXCEPT;
Expand Down
5 changes: 3 additions & 2 deletions include/bitcoin/network/net/acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class BCT_API acceptor
// ------------------------------------------------------------------------

/// Construct an instance.
acceptor(const logger& log, asio::strand& strand,
asio::io_context& service, std::atomic_bool& suspended) NOEXCEPT;
acceptor(const logger& log, asio::strand& strand, asio::io_context& service,
size_t maximum_request, std::atomic_bool& suspended) NOEXCEPT;

/// Asserts/logs stopped.
virtual ~acceptor() NOEXCEPT;
Expand Down Expand Up @@ -81,6 +81,7 @@ class BCT_API acceptor
virtual code start(const asio::endpoint& point) NOEXCEPT;

// These are thread safe.
const size_t maximum_;
asio::io_context& service_;
asio::strand& strand_;
std::atomic_bool& suspended_;
Expand Down
3 changes: 2 additions & 1 deletion include/bitcoin/network/net/connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BCT_API connector
/// Construct an instance.
connector(const logger& log, asio::strand& strand,
asio::io_context& service, const steady_clock::duration& timeout,
std::atomic_bool& suspended) NOEXCEPT;
size_t maximum_request, std::atomic_bool& suspended) NOEXCEPT;

/// Asserts/logs stopped.
virtual ~connector() NOEXCEPT;
Expand Down Expand Up @@ -88,6 +88,7 @@ class BCT_API connector
const config::address& host, socket_handler&& handler) NOEXCEPT;

// These are thread safe
const size_t maximum_;
asio::io_context& service_;
asio::strand& strand_;
std::atomic_bool& suspended_;
Expand Down
7 changes: 4 additions & 3 deletions include/bitcoin/network/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ class BCT_API socket
DELETE_COPY_MOVE(socket);

/// Use only for incoming connections (defaults outgoing address).
socket(const logger& log, asio::io_context& service) NOEXCEPT;
socket(const logger& log, asio::io_context& service,
size_t maximum_request) NOEXCEPT;

/// Use only for outgoing connections (retains outgoing address).
socket(const logger& log, asio::io_context& service,
const config::address& address) NOEXCEPT;
size_t maximum_request, const config::address& address) NOEXCEPT;

/// Asserts/logs stopped.
virtual ~socket() NOEXCEPT;
Expand Down Expand Up @@ -284,12 +285,12 @@ class BCT_API socket

protected:
// These are thread safe.
const size_t maximum_;
asio::strand strand_;
asio::io_context& service_;
std::atomic_bool stopped_{};

// These are protected by strand (see also handle_accept).
size_t maximum_;
asio::socket socket_;
config::address address_;
config::authority authority_{};
Expand Down
9 changes: 6 additions & 3 deletions include/bitcoin/network/sessions/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,13 @@ class BCT_API session
/// -----------------------------------------------------------------------

/// Call to create channel acceptor.
virtual acceptor::ptr create_acceptor() NOEXCEPT;
virtual acceptor::ptr create_acceptor(size_t maximum) NOEXCEPT;

/// Call to create channel connector (option for seed connection timeout).
virtual connector::ptr create_connector(bool seed=false) NOEXCEPT;
/// Call to create channel connector for seed connections.
virtual connector::ptr create_connector() NOEXCEPT;

/// Call to create channel connector for manual/outbound connections.
virtual connector::ptr create_connector(size_t maximum) NOEXCEPT;

/// Call to create a set of channel connectors.
virtual connectors_ptr create_connectors(size_t count) NOEXCEPT;
Expand Down
1 change: 0 additions & 1 deletion include/bitcoin/network/settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ struct BCT_API settings

/// Helpers.
virtual bool witness_node() const NOEXCEPT;
virtual size_t maximum_payload() const NOEXCEPT;
virtual steady_clock::duration retry_timeout() const NOEXCEPT;
virtual steady_clock::duration connect_timeout() const NOEXCEPT;
virtual steady_clock::duration channel_handshake() const NOEXCEPT;
Expand Down
2 changes: 1 addition & 1 deletion src/channels/channel_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void channel_peer::handle_read_heading(const code& ec, size_t) NOEXCEPT
return;
}

if (head->payload_size > settings().maximum_payload())
if (head->payload_size > options().maximum_request)
{
LOGR("Oversized payload indicated by " << head->command
<< " heading from [" << authority() << "] ("
Expand Down
33 changes: 21 additions & 12 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,41 @@ net::~net() NOEXCEPT
// I/O factories.
// ----------------------------------------------------------------------------

acceptor::ptr net::create_acceptor() NOEXCEPT
// inbound/server
acceptor::ptr net::create_acceptor(size_t maximum) NOEXCEPT
{
return emplace_shared<acceptor>(log, strand(), service(),
return emplace_shared<acceptor>(log, strand(), service(), maximum,
accept_suspended_);
}

connector::ptr net::create_connector(bool seed) NOEXCEPT
{
const auto timeout = seed ? settings_.outbound.seeding_timeout() :
settings_.connect_timeout();

return emplace_shared<connector>(log, strand(), service(), timeout,
connect_suspended_);
}

// outbound (batch)
connectors_ptr net::create_connectors(size_t count) NOEXCEPT
{
const auto connects = to_shared<connectors>();
connects->reserve(count);

const auto maximum = settings_.outbound.maximum_request;
for (size_t connect{}; connect < count; ++connect)
connects->push_back(create_connector());
connects->push_back(create_connector(maximum));

return connects;
}

// manual/outbound
connector::ptr net::create_connector(size_t maximum) NOEXCEPT
{
return emplace_shared<connector>(log, strand(), service(),
settings_.connect_timeout(), maximum, connect_suspended_);
}

// seed
connector::ptr net::create_connector() NOEXCEPT
{
return emplace_shared<connector>(log, strand(), service(),
settings_.outbound.seeding_timeout(),
settings_.outbound.maximum_request, connect_suspended_);
}

// Start sequence.
// ----------------------------------------------------------------------------

Expand Down
9 changes: 6 additions & 3 deletions src/net/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ using namespace std::placeholders;
// Calls are stranded to protect the acceptor member.

acceptor::acceptor(const logger& log, asio::strand& strand,
asio::io_context& service, std::atomic_bool& suspended) NOEXCEPT
: service_(service),
asio::io_context& service, size_t maximum_request,
std::atomic_bool& suspended) NOEXCEPT
: maximum_(maximum_request),
service_(service),
strand_(strand),
suspended_(suspended),
acceptor_(strand_),
Expand Down Expand Up @@ -144,7 +146,8 @@ void acceptor::accept(socket_handler&& handler) NOEXCEPT
}

// Create the socket.
const auto socket = std::make_shared<network::socket>(log, service_);
const auto socket = std::make_shared<network::socket>(log, service_,
maximum_);

// Posts handle_accept to the acceptor's strand.
// Establishes a socket connection by waiting on the socket.
Expand Down
8 changes: 5 additions & 3 deletions src/net/connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ using namespace std::placeholders;

connector::connector(const logger& log, asio::strand& strand,
asio::io_context& service, const steady_clock::duration& timeout,
std::atomic_bool& suspended) NOEXCEPT
: service_(service),
size_t maximum_request, std::atomic_bool& suspended) NOEXCEPT
: maximum_(maximum_request),
service_(service),
strand_(strand),
suspended_(suspended),
resolver_(strand),
Expand Down Expand Up @@ -122,7 +123,8 @@ void connector::start(const std::string& hostname, uint16_t port,

// Create a socket and shared finish context.
const auto finish = std::make_shared<bool>(false);
const auto socket = std::make_shared<network::socket>(log, service_, host);
const auto socket = std::make_shared<network::socket>(log, service_,
maximum_, host);

// Posts handle_timer to strand.
timer_->start(
Expand Down
13 changes: 7 additions & 6 deletions src/net/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,19 @@ BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
// ----------------------------------------------------------------------------

// authority_.port() zero implies inbound connection.
socket::socket(const logger& log, asio::io_context& service) NOEXCEPT
: socket(log, service, config::address{})
socket::socket(const logger& log, asio::io_context& service,
size_t maximum_request) NOEXCEPT
: socket(log, service, maximum_request, config::address{})
{
}

// authority_.port() nonzero implies outbound connection.
socket::socket(const logger& log, asio::io_context& service,
const config::address& address) NOEXCEPT
: strand_(service.get_executor()),
maximum_(5u * 1024u * 1024u),
socket_(strand_),
size_t maximum_request, const config::address& address) NOEXCEPT
: maximum_(maximum_request),
strand_(service.get_executor()),
service_(service),
socket_(strand_),
address_(address),
reporter(log),
tracker<socket>(log)
Expand Down
21 changes: 15 additions & 6 deletions src/sessions/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,19 +342,28 @@ void session::unsubscribe_close() NOEXCEPT
// Factories.
// ----------------------------------------------------------------------------

acceptor::ptr session::create_acceptor() NOEXCEPT
// inbound/server
acceptor::ptr session::create_acceptor(size_t maximum) NOEXCEPT
{
return network_.create_acceptor();
return network_.create_acceptor(maximum);
}

connector::ptr session::create_connector(bool seed) NOEXCEPT
// outbound (batch)
connectors_ptr session::create_connectors(size_t count) NOEXCEPT
{
return network_.create_connector(seed);
return network_.create_connectors(count);
}

connectors_ptr session::create_connectors(size_t count) NOEXCEPT
// manual/outbound
connector::ptr session::create_connector(size_t maximum) NOEXCEPT
{
return network_.create_connectors(count);
return network_.create_connector(maximum);
}

// seed
connector::ptr session::create_connector() NOEXCEPT
{
return network_.create_connector();
}

// Properties.
Expand Down
3 changes: 2 additions & 1 deletion src/sessions/session_inbound.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ void session_inbound::handle_started(const code& ec,
LOGN("Accepting " << settings().inbound.connections << " peers on "
<< settings().inbound.binds.size() << " bindings.");

const auto maximum = settings().inbound.maximum_request;
for (const auto& bind: settings().inbound.binds)
{
const auto acceptor = create_acceptor();
const auto acceptor = create_acceptor(maximum);

// Require that all acceptors at least start.
if (const auto error_code = acceptor->start(bind))
Expand Down
2 changes: 1 addition & 1 deletion src/sessions/session_manual.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void session_manual::connect(const config::endpoint& peer,
BC_ASSERT(stranded());

// Create a persistent connector for the manual connection.
const auto connector = create_connector();
const auto connector = create_connector(settings().manual.maximum_request);

subscribe_stop([=](const code&) NOEXCEPT
{
Expand Down
2 changes: 1 addition & 1 deletion src/sessions/session_seed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void session_seed::handle_started(const code& ec,

for (const auto& seed: settings().outbound.seeds)
{
const auto connector = create_connector(true);
const auto connector = create_connector();
subscribe_stop([=](const code&) NOEXCEPT
{
connector->stop();
Expand Down
3 changes: 2 additions & 1 deletion src/sessions/session_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ void session_server::handle_started(const code& ec,
LOGN("Accepting " << options_.connections << " " << name_
<< " connections on " << options_.binds.size() << " bindings.");

const auto maximum = options_.maximum_request;
for (const auto& bind: options_.binds)
{
const auto acceptor = create_acceptor();
const auto acceptor = create_acceptor(maximum);

// Require that all acceptors at least start.
if (const auto error_code = acceptor->start(bind))
Expand Down
6 changes: 0 additions & 6 deletions src/settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,6 @@ bool settings::witness_node() const NOEXCEPT
return to_bool(services_minimum & service::node_witness);
}

size_t settings::maximum_payload() const NOEXCEPT
{
return heading::maximum_payload(protocol_maximum,
to_bool(services_maximum & service::node_witness));
}

// Randomized from 50% to maximum milliseconds (specified in seconds).
steady_clock::duration settings::retry_timeout() const NOEXCEPT
{
Expand Down
2 changes: 1 addition & 1 deletion test/channels/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ BOOST_AUTO_TEST_CASE(channel__stopped__default__false)
threadpool pool{ one };
asio::strand strand(pool.service().get_executor());
const settings set(bc::system::chain::selection::mainnet);
auto socket_ptr = std::make_shared<network::socket>(log, pool.service());
auto socket_ptr = std::make_shared<network::socket>(log, pool.service(), 42);
auto channel_ptr = std::make_shared<accessor>(log, socket_ptr, expected, set, set.outbound);
BOOST_REQUIRE(!channel_ptr->stopped());
BOOST_REQUIRE_NE(channel_ptr->nonce(), zero);
Expand Down
Loading
Loading