Skip to content

Commit 3577a89

Browse files
authored
Merge pull request #870 from AntelopeIO/revert-853-GH-525-p2p-resolve
[1.0.2] Revert "[1.0.2] P2P: Resolve on reconnect"
2 parents eb40e5e + 7b2ce92 commit 3577a89

File tree

1 file changed

+103
-98
lines changed

1 file changed

+103
-98
lines changed

plugins/net_plugin/net_plugin.cpp

Lines changed: 103 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,8 @@ namespace eosio {
351351
struct connection_detail {
352352
std::string host;
353353
connection_ptr c;
354+
tcp::endpoint active_ip;
355+
tcp::resolver::results_type ips;
354356
};
355357

356358
using connection_details_index = multi_index_container<
@@ -402,8 +404,6 @@ namespace eosio {
402404
boost::asio::steady_timer::duration conn_period,
403405
uint32_t maximum_client_count);
404406

405-
std::chrono::milliseconds get_heartbeat_timeout() const { return heartbeat_timeout; }
406-
407407
uint32_t get_max_client_count() const { return max_client_count; }
408408

409409
fc::microseconds get_connector_period() const;
@@ -421,6 +421,8 @@ namespace eosio {
421421
void add(connection_ptr c);
422422
string connect(const string& host, const string& p2p_address);
423423
string resolve_and_connect(const string& host, const string& p2p_address);
424+
void update_connection_endpoint(connection_ptr c, const tcp::endpoint& endpoint);
425+
void connect(const connection_ptr& c);
424426
string disconnect(const string& host);
425427
void close_all();
426428

@@ -1013,7 +1015,7 @@ namespace eosio {
10131015

10141016
bool populate_handshake( handshake_message& hello ) const;
10151017

1016-
bool resolve_and_connect();
1018+
bool reconnect();
10171019
void connect( const tcp::resolver::results_type& endpoints );
10181020
void start_read_message();
10191021

@@ -1190,21 +1192,16 @@ namespace eosio {
11901192
};
11911193

11921194

1193-
std::tuple<std::string, std::string, std::string> split_host_port_type(const std::string& peer_add, bool incoming) {
1195+
std::tuple<std::string, std::string, std::string> split_host_port_type(const std::string& peer_add) {
11941196
// host:port:[<trx>|<blk>]
11951197
if (peer_add.empty()) return {};
11961198

11971199
string::size_type p = peer_add[0] == '[' ? peer_add.find(']') : 0;
1198-
string::size_type colon = p != string::npos ? peer_add.find(':', p) : string::npos;
1199-
if (colon == std::string::npos || colon == 0) {
1200-
// if incoming then not an error this peer can do anything about
1201-
if (incoming) {
1202-
fc_dlog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_add) );
1203-
} else {
1204-
fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_add) );
1205-
}
1200+
if (p == string::npos) {
1201+
fc_wlog( logger, "Invalid peer address: ${peer}", ("peer", peer_add) );
12061202
return {};
12071203
}
1204+
string::size_type colon = peer_add.find(':', p);
12081205
string::size_type colon2 = peer_add.find(':', colon + 1);
12091206
string::size_type end = colon2 == string::npos
12101207
? string::npos : peer_add.find_first_of( " :+=.,<>!$%^&(*)|-#@\t", colon2 + 1 ); // future proof by including most symbols without using regex
@@ -1281,8 +1278,8 @@ namespace eosio {
12811278
last_handshake_sent(),
12821279
p2p_address( endpoint )
12831280
{
1284-
set_connection_type( peer_address() );
12851281
my_impl->mark_bp_connection(this);
1282+
update_endpoints();
12861283
fc_ilog( logger, "created connection - ${c} to ${n}", ("c", connection_id)("n", endpoint) );
12871284
}
12881285

@@ -1297,6 +1294,7 @@ namespace eosio {
12971294
last_handshake_recv(),
12981295
last_handshake_sent()
12991296
{
1297+
update_endpoints();
13001298
fc_dlog( logger, "new connection - ${c} object created for peer ${address}:${port} from listener ${addr}",
13011299
("c", connection_id)("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", listen_address) );
13021300
}
@@ -1329,7 +1327,7 @@ namespace eosio {
13291327

13301328
// called from connection strand
13311329
void connection::set_connection_type( const std::string& peer_add ) {
1332-
auto [host, port, type] = split_host_port_type(peer_add, false);
1330+
auto [host, port, type] = split_host_port_type(peer_add);
13331331
if( type.empty() ) {
13341332
fc_dlog( logger, "Setting connection - ${c} type for: ${peer} to both transactions and blocks", ("c", connection_id)("peer", peer_add) );
13351333
connection_type = both;
@@ -1390,7 +1388,6 @@ namespace eosio {
13901388
bool connection::start_session() {
13911389
verify_strand_in_this_thread( strand, __func__, __LINE__ );
13921390

1393-
update_endpoints();
13941391
boost::asio::ip::tcp::no_delay nodelay( true );
13951392
boost::system::error_code ec;
13961393
socket->set_option( nodelay, ec );
@@ -2813,6 +2810,31 @@ namespace eosio {
28132810

28142811
//------------------------------------------------------------------------
28152812

2813+
bool connection::reconnect() {
2814+
switch ( no_retry ) {
2815+
case no_reason:
2816+
case wrong_version:
2817+
case benign_other:
2818+
case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate
2819+
break;
2820+
default:
2821+
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
2822+
return false;
2823+
}
2824+
if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
2825+
fc::microseconds connector_period = my_impl->connections.get_connector_period();
2826+
fc::lock_guard g( conn_mtx );
2827+
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
2828+
return true; // true so doesn't remove from valid connections
2829+
}
2830+
}
2831+
connection_ptr c = shared_from_this();
2832+
strand.post([c]() {
2833+
my_impl->connections.connect(c);
2834+
});
2835+
return true;
2836+
}
2837+
28162838
// called from connection strand
28172839
void connection::connect( const tcp::resolver::results_type& endpoints ) {
28182840
set_state(connection_state::connecting);
@@ -2822,6 +2844,7 @@ namespace eosio {
28222844
boost::asio::bind_executor( strand,
28232845
[c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
28242846
if( !err && socket->is_open() && socket == c->socket ) {
2847+
my_impl->connections.update_connection_endpoint(c, endpoint);
28252848
c->update_endpoints(endpoint);
28262849
if( c->start_session() ) {
28272850
c->send_handshake();
@@ -2871,7 +2894,7 @@ namespace eosio {
28712894
fc_ilog(logger, "Accepted new connection: " + paddr_str);
28722895

28732896
connections.any_of_supplied_peers([&listen_address, &paddr_str, &paddr_desc, &limit](const string& peer_addr) {
2874-
auto [host, port, type] = split_host_port_type(peer_addr, false);
2897+
auto [host, port, type] = split_host_port_type(peer_addr);
28752898
if (host == paddr_str) {
28762899
if (limit > 0) {
28772900
fc_dlog(logger, "Connection inbound to ${la} from ${a} is a configured p2p-peer-address and will not be throttled", ("la", listen_address)("a", paddr_desc));
@@ -3367,9 +3390,9 @@ namespace eosio {
33673390
}
33683391

33693392
if( incoming() ) {
3370-
auto [host, port, type] = split_host_port_type(msg.p2p_address, true);
3393+
auto [host, port, type] = split_host_port_type(msg.p2p_address);
33713394
if (host.size())
3372-
set_connection_type( msg.p2p_address);
3395+
set_connection_type( msg.p2p_address );
33733396

33743397
peer_dlog( this, "checking for duplicate" );
33753398
auto is_duplicate = [&](const connection_ptr& check) {
@@ -4584,7 +4607,7 @@ namespace eosio {
45844607
//----------------------------------------------------------------------------
45854608

45864609
size_t connections_manager::number_connections() const {
4587-
std::shared_lock g(connections_mtx);
4610+
std::lock_guard g(connections_mtx);
45884611
return connections.size();
45894612
}
45904613

@@ -4613,9 +4636,8 @@ namespace eosio {
46134636
update_p2p_connection_metrics = std::move(fun);
46144637
}
46154638

4616-
// can be called from any thread
46174639
void connections_manager::connect_supplied_peers(const string& p2p_address) {
4618-
std::shared_lock g(connections_mtx);
4640+
std::unique_lock g(connections_mtx);
46194641
chain::flat_set<string> peers = supplied_peers;
46204642
g.unlock();
46214643
for (const auto& peer : peers) {
@@ -4625,9 +4647,12 @@ namespace eosio {
46254647

46264648
void connections_manager::add( connection_ptr c ) {
46274649
std::lock_guard g( connections_mtx );
4650+
boost::system::error_code ec;
4651+
auto endpoint = c->socket->remote_endpoint(ec);
46284652
connections.insert( connection_detail{
46294653
.host = c->peer_address(),
4630-
.c = std::move(c)} );
4654+
.c = std::move(c),
4655+
.active_ip = endpoint} );
46314656
}
46324657

46334658
// called by API
@@ -4639,72 +4664,62 @@ namespace eosio {
46394664
}
46404665

46414666
string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address ) {
4642-
auto [host, port, type] = split_host_port_type(peer_address, false);
4643-
if (host.empty()) {
4667+
string::size_type colon = peer_address.find(':');
4668+
if (colon == std::string::npos || colon == 0) {
4669+
fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_address) );
46444670
return "invalid peer address";
46454671
}
46464672

4647-
{
4648-
std::shared_lock g( connections_mtx );
4649-
if( find_connection_i( peer_address ) )
4650-
return "already connected";
4651-
}
4652-
4653-
connection_ptr c = std::make_shared<connection>( peer_address, listen_address );
4654-
if (c->resolve_and_connect()) {
4655-
add(std::move(c));
4656-
4657-
return "added connection";
4658-
}
4673+
std::lock_guard g( connections_mtx );
4674+
if( find_connection_i( peer_address ) )
4675+
return "already connected";
4676+
4677+
auto [host, port, type] = split_host_port_type(peer_address);
4678+
4679+
auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );
4680+
4681+
resolver->async_resolve(host, port,
4682+
[resolver, host = host, port = port, peer_address = peer_address, listen_address = listen_address, this]( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
4683+
connection_ptr c = std::make_shared<connection>( peer_address, listen_address );
4684+
c->set_heartbeat_timeout( heartbeat_timeout );
4685+
std::lock_guard g( connections_mtx );
4686+
auto [it, inserted] = connections.emplace( connection_detail{
4687+
.host = peer_address,
4688+
.c = std::move(c),
4689+
.ips = results
4690+
});
4691+
if( !err ) {
4692+
it->c->connect( results );
4693+
} else {
4694+
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
4695+
("host", host)("port", port)( "error", err.message() ) );
4696+
it->c->set_state(connection::connection_state::closed);
4697+
++(it->c->consecutive_immediate_connection_close);
4698+
}
4699+
} );
46594700

4660-
return "connection failed";
4701+
return "added connection";
46614702
}
46624703

4663-
// called from any thread
4664-
bool connection::resolve_and_connect() {
4665-
switch ( no_retry ) {
4666-
case no_reason:
4667-
case wrong_version:
4668-
case benign_other:
4669-
case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate
4670-
break;
4671-
default:
4672-
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
4673-
return false;
4704+
void connections_manager::update_connection_endpoint(connection_ptr c,
4705+
const tcp::endpoint& endpoint) {
4706+
std::unique_lock g( connections_mtx );
4707+
auto& index = connections.get<by_connection>();
4708+
const auto& it = index.find(c);
4709+
if( it != index.end() ) {
4710+
index.modify(it, [endpoint](connection_detail& cd) {
4711+
cd.active_ip = endpoint;
4712+
});
46744713
}
4714+
}
46754715

4676-
auto [host, port, type] = split_host_port_type(peer_address(), false);
4677-
if (host.empty())
4678-
return false;
4679-
4680-
connection_ptr c = shared_from_this();
4681-
4682-
if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
4683-
fc::microseconds connector_period = my_impl->connections.get_connector_period();
4684-
fc::lock_guard g( conn_mtx );
4685-
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
4686-
return true; // true so doesn't remove from valid connections
4687-
}
4716+
void connections_manager::connect(const connection_ptr& c) {
4717+
std::lock_guard g( connections_mtx );
4718+
const auto& index = connections.get<by_connection>();
4719+
const auto& it = index.find(c);
4720+
if( it != index.end() ) {
4721+
it->c->connect( it->ips );
46884722
}
4689-
4690-
strand.post([c, host, port]() {
4691-
auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );
4692-
resolver->async_resolve(host, port,
4693-
[resolver, c, host, port]
4694-
( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
4695-
c->set_heartbeat_timeout( my_impl->connections.get_heartbeat_timeout() );
4696-
if( !err ) {
4697-
c->connect( results );
4698-
} else {
4699-
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
4700-
("host", host)("port", port)( "error", err.message() ) );
4701-
c->set_state(connection::connection_state::closed);
4702-
++c->consecutive_immediate_connection_close;
4703-
}
4704-
} );
4705-
} );
4706-
4707-
return true;
47084723
}
47094724

47104725
// called by API
@@ -4733,31 +4748,21 @@ namespace eosio {
47334748
}
47344749

47354750
std::optional<connection_status> connections_manager::status( const string& host )const {
4736-
connection_ptr con;
4737-
{
4738-
std::shared_lock g( connections_mtx );
4739-
con = find_connection_i( host );
4740-
}
4751+
std::shared_lock g( connections_mtx );
4752+
auto con = find_connection_i( host );
47414753
if( con ) {
47424754
return con->get_status();
47434755
}
47444756
return {};
47454757
}
47464758

47474759
vector<connection_status> connections_manager::connection_statuses()const {
4748-
vector<connection_ptr> conns;
47494760
vector<connection_status> result;
4750-
{
4751-
std::shared_lock g( connections_mtx );
4752-
auto& index = connections.get<by_connection>();
4753-
result.reserve( index.size() );
4754-
conns.reserve( index.size() );
4755-
for( const connection_detail& cd : index ) {
4756-
conns.emplace_back( cd.c );
4757-
}
4758-
}
4759-
for (const auto& c : conns) {
4760-
result.push_back( c->get_status() );
4761+
std::shared_lock g( connections_mtx );
4762+
auto& index = connections.get<by_connection>();
4763+
result.reserve( index.size() );
4764+
for( const connection_detail& cd : index ) {
4765+
result.emplace_back( cd.c->get_status() );
47614766
}
47624767
return result;
47634768
}
@@ -4819,7 +4824,7 @@ namespace eosio {
48194824
auto cleanup = [&num_peers, &num_rm, this](vector<connection_ptr>&& reconnecting,
48204825
vector<connection_ptr>&& removing) {
48214826
for( auto& c : reconnecting ) {
4822-
if (!c->resolve_and_connect()) {
4827+
if (!c->reconnect()) {
48234828
--num_peers;
48244829
++num_rm;
48254830
removing.push_back(c);
@@ -4885,7 +4890,7 @@ namespace eosio {
48854890
assert(update_p2p_connection_metrics);
48864891
auto from = from_connection.lock();
48874892
std::shared_lock g(connections_mtx);
4888-
const auto& index = connections.get<by_connection>();
4893+
auto& index = connections.get<by_connection>();
48894894
size_t num_clients = 0, num_peers = 0, num_bp_peers = 0;
48904895
net_plugin::p2p_per_connection_metrics per_connection(index.size());
48914896
for (auto it = index.begin(); it != index.end(); ++it) {

0 commit comments

Comments
 (0)