Skip to content

Commit 6552d3f

Browse files
authored
Don't reuse closed tcp connections (#70)
Signed-off-by: turuslan <[email protected]>
1 parent 658312c commit 6552d3f

File tree

2 files changed

+24
-30
lines changed

2 files changed

+24
-30
lines changed

src/network/impl/connection_manager_impl.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ namespace libp2p::network {
2222
ConnectionManager::ConnectionSPtr
2323
ConnectionManagerImpl::getBestConnectionForPeer(const peer::PeerId &p) const {
2424
// TODO(warchant): maybe make pluggable strategies
25-
auto c = getConnectionsToPeer(p);
26-
if (c.empty()) {
27-
return nullptr;
25+
for (auto &conn : getConnectionsToPeer(p)) {
26+
if (!conn->isClosed()) {
27+
// for now, return first connection
28+
return conn;
29+
}
2830
}
29-
30-
// for now, return first connection
31-
return c[0];
31+
return nullptr;
3232
}
3333

3434
ConnectionManager::Connectedness ConnectionManagerImpl::connectedness(

src/transport/tcp/tcp_connection.cpp

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -76,48 +76,42 @@ namespace libp2p::transport {
7676
});
7777
}
7878

79+
template <typename Callback>
80+
auto closeOnError(TcpConnection &conn, Callback &&cb) {
81+
return [cb{std::move(cb)}, conn{conn.shared_from_this()}](auto &&ec,
82+
auto &&result) {
83+
if (ec == boost::asio::error::broken_pipe) {
84+
std::ignore = conn->close();
85+
}
86+
if (ec) {
87+
return cb(std::forward<decltype(ec)>(ec));
88+
}
89+
cb(result);
90+
};
91+
}
92+
7993
void TcpConnection::read(gsl::span<uint8_t> out, size_t bytes,
8094
TcpConnection::ReadCallbackFunc cb) {
8195
boost::asio::async_read(socket_, detail::makeBuffer(out, bytes),
82-
[cb = std::move(cb)](auto &&ec, auto &&read) {
83-
if (ec) {
84-
return cb(std::forward<decltype(ec)>(ec));
85-
}
86-
return cb(read);
87-
});
96+
closeOnError(*this, cb));
8897
}
8998

9099
void TcpConnection::readSome(gsl::span<uint8_t> out, size_t bytes,
91100
TcpConnection::ReadCallbackFunc cb) {
92101
socket_.async_read_some(detail::makeBuffer(out, bytes),
93-
[cb = std::move(cb)](auto &&ec, auto &&read) {
94-
if (ec) {
95-
return cb(std::forward<decltype(ec)>(ec));
96-
}
97-
return cb(read);
98-
});
102+
closeOnError(*this, cb));
99103
}
100104

101105
void TcpConnection::write(gsl::span<const uint8_t> in, size_t bytes,
102106
TcpConnection::WriteCallbackFunc cb) {
103107
boost::asio::async_write(socket_, detail::makeBuffer(in, bytes),
104-
[cb = std::move(cb)](auto &&ec, auto &&written) {
105-
if (ec) {
106-
return cb(std::forward<decltype(ec)>(ec));
107-
}
108-
return cb(written);
109-
});
108+
closeOnError(*this, cb));
110109
}
111110

112111
void TcpConnection::writeSome(gsl::span<const uint8_t> in, size_t bytes,
113112
TcpConnection::WriteCallbackFunc cb) {
114113
socket_.async_write_some(detail::makeBuffer(in, bytes),
115-
[cb = std::move(cb)](auto &&ec, auto &&written) {
116-
if (ec) {
117-
return cb(std::forward<decltype(ec)>(ec));
118-
}
119-
return cb(written);
120-
});
114+
closeOnError(*this, cb));
121115
}
122116

123117
} // namespace libp2p::transport

0 commit comments

Comments
 (0)