diff --git a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp index 7644a996..d1c0d24d 100644 --- a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp +++ b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp @@ -84,9 +84,6 @@ namespace libp2p::protocol::kademlia { std::shared_ptr openSession( std::shared_ptr stream) override; - /// @see SessionHost::closeSession - void closeSession(std::shared_ptr stream) override; - private: void onPutValue(const std::shared_ptr &session, Message &&msg); void onGetValue(const std::shared_ptr &session, Message &&msg); @@ -142,26 +139,6 @@ namespace libp2p::protocol::kademlia { event::Handle new_connection_subscription_; event::Handle on_disconnected_; - struct StreamPtrComparator { - bool operator()(const std::shared_ptr &lhs, - const std::shared_ptr &rhs) const { - return lhs.get() == rhs.get(); - } - }; - - struct StreamPtrHaher { - size_t operator()(const std::shared_ptr &s) const { - auto r = std::hash()(s.get()); - return r; - } - }; - - std::unordered_map, - std::shared_ptr, - StreamPtrHaher, - StreamPtrComparator> - sessions_; - // Random walk's auxiliary data struct { size_t iteration = 0; diff --git a/include/libp2p/protocol/kademlia/impl/session.hpp b/include/libp2p/protocol/kademlia/impl/session.hpp index 351c90df..d77d9239 100644 --- a/include/libp2p/protocol/kademlia/impl/session.hpp +++ b/include/libp2p/protocol/kademlia/impl/session.hpp @@ -6,85 +6,54 @@ #pragma once +#include #include -#include -#include -#include -#include -#include -#include -#include +#include -namespace libp2p::protocol::kademlia { +namespace libp2p::basic { + class MessageReadWriterUvarint; + class Scheduler; +} // namespace libp2p::basic - class FindPeerExecutor; +namespace libp2p::protocol::kademlia { + class ResponseHandler; + class SessionHost; +} // namespace libp2p::protocol::kademlia +namespace libp2p::protocol::kademlia { class Session : public std::enable_shared_from_this { public: - Session(std::weak_ptr session_host, - std::weak_ptr scheduler, + Session(std::weak_ptr scheduler, std::shared_ptr stream, - Time operations_timeout = Time::zero()); - + Time operations_timeout); ~Session(); - std::shared_ptr stream() const { - return stream_; - } - - boost::optional expectedResponseType() const { - return expected_response_type_; - } - - bool read(); + using OnRead = std::function)>; + void read(OnRead on_read); + using OnWrite = std::function)>; + void write(BytesIn frame, OnWrite on_write); - bool write(const std::shared_ptr> &buffer, - const std::shared_ptr &response_handler); + void read(std::weak_ptr weak_session_host); + void read(std::shared_ptr response_handler); + void write(const Message &msg, + std::weak_ptr weak_session_host); + void write(BytesIn frame, + std::shared_ptr response_handler); + void write(BytesIn frame); - bool canBeClosed() const { - return not closed_ && writing_ == 0 && reading_ == 0; + std::shared_ptr stream() const { + return stream_; } - void close(outcome::result = outcome::success()); - private: - void onLengthRead(outcome::result varint); - - void onMessageRead(outcome::result res); - - void onMessageWritten( - outcome::result res, - const std::shared_ptr &response_handler); - - void setReadingTimeout(); - void cancelReadingTimeout(); - - void setResponseTimeout( - const std::shared_ptr &response_handler); - void cancelResponseTimeout( - const std::shared_ptr &response_handler); + void setTimer(); - std::unordered_map, - basic::Scheduler::Handle> - response_handlers_; - - std::weak_ptr session_host_; std::weak_ptr scheduler_; std::shared_ptr stream_; - - std::vector inner_buffer_; - - std::atomic_size_t reading_ = 0; - std::atomic_size_t writing_ = 0; - bool closed_ = false; - const Time operations_timeout_; - basic::Scheduler::Handle reading_timeout_handle_; - - boost::optional expected_response_type_; - static std::atomic_size_t instance_number; - log::SubLogger log_; + std::shared_ptr framing_; + Cancel timer_; }; } // namespace libp2p::protocol::kademlia diff --git a/include/libp2p/protocol/kademlia/impl/session_host.hpp b/include/libp2p/protocol/kademlia/impl/session_host.hpp index deb0bca7..f9995efe 100644 --- a/include/libp2p/protocol/kademlia/impl/session_host.hpp +++ b/include/libp2p/protocol/kademlia/impl/session_host.hpp @@ -19,9 +19,6 @@ namespace libp2p::protocol::kademlia { /// Opens new session for stream virtual std::shared_ptr openSession( std::shared_ptr stream) = 0; - - /// Closes session by stream - virtual void closeSession(std::shared_ptr stream) = 0; }; } // namespace libp2p::protocol::kademlia diff --git a/include/libp2p/protocol/kademlia/message.hpp b/include/libp2p/protocol/kademlia/message.hpp index 0cd7026f..794cd956 100644 --- a/include/libp2p/protocol/kademlia/message.hpp +++ b/include/libp2p/protocol/kademlia/message.hpp @@ -51,7 +51,7 @@ namespace libp2p::protocol::kademlia { void clear(); // tries to deserialize message from byte array - bool deserialize(const void *data, size_t sz); + bool deserialize(BytesIn pb); // serializes varint(message length) + message into buffer bool serialize(std::vector &buffer) const; diff --git a/src/protocol/kademlia/impl/add_provider_executor.cpp b/src/protocol/kademlia/impl/add_provider_executor.cpp index f3892aba..80922ca6 100644 --- a/src/protocol/kademlia/impl/add_provider_executor.cpp +++ b/src/protocol/kademlia/impl/add_provider_executor.cpp @@ -188,24 +188,13 @@ namespace libp2p::protocol::kademlia { stream->remotePeerId().value().toBase58()); auto session = session_host_->openSession(stream); - - --requests_in_progress_; - - if (session->write(serialized_request_, {})) { - ++requests_succeed_; - log_.debug("write to {} successfuly; done {}, active {}, in queue {}", - addr, - requests_succeed_, - requests_in_progress_, - queue_.size()); - } else { - log_.debug("write to {} failed; done {}, active {}, in queue {}", - addr, - requests_succeed_, - requests_in_progress_, - queue_.size()); - } - - spawn(); + session->write(*serialized_request_, + [self{shared_from_this()}](outcome::result r) { + --self->requests_in_progress_; + if (r) { + ++self->requests_succeed_; + } + self->spawn(); + }); } } // namespace libp2p::protocol::kademlia diff --git a/src/protocol/kademlia/impl/find_peer_executor.cpp b/src/protocol/kademlia/impl/find_peer_executor.cpp index 57931e95..6088d5cb 100644 --- a/src/protocol/kademlia/impl/find_peer_executor.cpp +++ b/src/protocol/kademlia/impl/find_peer_executor.cpp @@ -198,17 +198,7 @@ namespace libp2p::protocol::kademlia { stream->remotePeerId().value().toBase58()); auto session = session_host_->openSession(stream); - if (!session->write(serialized_request_, shared_from_this())) { - --requests_in_progress_; - - log_.debug("write to {} failed; active {}, in queue {}", - addr, - requests_in_progress_, - queue_.size()); - - spawn(); - return; - } + session->write(*serialized_request_, shared_from_this()); } Time FindPeerExecutor::responseTimeout() const { diff --git a/src/protocol/kademlia/impl/find_providers_executor.cpp b/src/protocol/kademlia/impl/find_providers_executor.cpp index bc3f1928..88f0d4e8 100644 --- a/src/protocol/kademlia/impl/find_providers_executor.cpp +++ b/src/protocol/kademlia/impl/find_providers_executor.cpp @@ -212,18 +212,7 @@ namespace libp2p::protocol::kademlia { stream->remotePeerId().value().toBase58()); auto session = session_host_->openSession(stream); - - if (!session->write(serialized_request_, shared_from_this())) { - --requests_in_progress_; - - log_.debug("write to {} failed; active {}, in queue {}", - addr, - requests_in_progress_, - queue_.size()); - - spawn(); - return; - } + session->write(*serialized_request_, shared_from_this()); } Time FindProvidersExecutor::responseTimeout() const { diff --git a/src/protocol/kademlia/impl/get_value_executor.cpp b/src/protocol/kademlia/impl/get_value_executor.cpp index 75cfe251..50f34c7c 100644 --- a/src/protocol/kademlia/impl/get_value_executor.cpp +++ b/src/protocol/kademlia/impl/get_value_executor.cpp @@ -205,18 +205,7 @@ namespace libp2p::protocol::kademlia { stream->remotePeerId().value().toBase58()); auto session = session_host_->openSession(stream); - - if (!session->write(serialized_request_, shared_from_this())) { - --requests_in_progress_; - - log_.debug("write to {} failed; active {}, in queue {}", - addr, - requests_in_progress_, - queue_.size()); - - spawn(); - return; - } + session->write(*serialized_request_, shared_from_this()); } Time GetValueExecutor::responseTimeout() const { diff --git a/src/protocol/kademlia/impl/kademlia_impl.cpp b/src/protocol/kademlia/impl/kademlia_impl.cpp index 26a97abc..cfabbeb6 100644 --- a/src/protocol/kademlia/impl/kademlia_impl.cpp +++ b/src/protocol/kademlia/impl/kademlia_impl.cpp @@ -320,8 +320,7 @@ namespace libp2p::protocol::kademlia { onPing(session, std::move(msg)); break; default: - session->close(Error::UNEXPECTED_MESSAGE_TYPE); - return; + break; } } @@ -348,13 +347,7 @@ namespace libp2p::protocol::kademlia { } // echo request - auto buffer = std::make_shared>(); - if (not msg.serialize(*buffer)) { - session->close(Error::MESSAGE_SERIALIZE_ERROR); - BOOST_UNREACHABLE_RETURN(); - } - - session->write(buffer, {}); + session->write(msg, weak_from_this()); } void KademliaImpl::onGetValue(const std::shared_ptr &session, @@ -393,13 +386,7 @@ namespace libp2p::protocol::kademlia { std::move(msg.key), std::move(value), std::to_string(expire.count())}; } - auto buffer = std::make_shared>(); - if (not msg.serialize(*buffer)) { - session->close(Error::MESSAGE_SERIALIZE_ERROR); - BOOST_UNREACHABLE_RETURN(); - } - - session->write(buffer, {}); + session->write(msg, weak_from_this()); } void KademliaImpl::onAddProvider(const std::shared_ptr &session, @@ -480,13 +467,7 @@ namespace libp2p::protocol::kademlia { } } - auto buffer = std::make_shared>(); - if (not msg.serialize(*buffer)) { - session->close(Error::MESSAGE_SERIALIZE_ERROR); - BOOST_UNREACHABLE_RETURN(); - } - - session->write(buffer, {}); + session->write(msg, weak_from_this()); } void KademliaImpl::onFindNode(const std::shared_ptr &session, @@ -535,26 +516,14 @@ namespace libp2p::protocol::kademlia { msg.closer_peers = std::move(peers); } - auto buffer = std::make_shared>(); - if (not msg.serialize(*buffer)) { - session->close(Error::MESSAGE_SERIALIZE_ERROR); - BOOST_UNREACHABLE_RETURN(); - } - - session->write(buffer, {}); + session->write(msg, weak_from_this()); } void KademliaImpl::onPing(const std::shared_ptr &session, Message &&msg) { msg.clear(); - auto buffer = std::make_shared>(); - if (not msg.serialize(*buffer)) { - session->close(Error::MESSAGE_SERIALIZE_ERROR); - BOOST_UNREACHABLE_RETURN(); - } - - session->write(buffer, {}); + session->write(msg, weak_from_this()); } outcome::result KademliaImpl::findRandomPeer() { @@ -601,28 +570,8 @@ namespace libp2p::protocol::kademlia { std::shared_ptr KademliaImpl::openSession( std::shared_ptr stream) { - auto [it, is_new_session] = sessions_.emplace( - stream, - std::make_shared(weak_from_this(), scheduler_, stream)); - assert(is_new_session); - - log_.debug("session opened, total sessions: {}", sessions_.size()); - - return it->second; - } - - void KademliaImpl::closeSession(std::shared_ptr stream) { - auto it = sessions_.find(stream); - if (it == sessions_.end()) { - return; - } - - auto &session = it->second; - - session->close(); - sessions_.erase(it); - - log_.debug("session completed, total sessions: {}", sessions_.size()); + return std::make_shared( + scheduler_, stream, config_.responseTimeout); } void KademliaImpl::handleProtocol(StreamAndProtocol stream_and_protocol) { @@ -638,12 +587,7 @@ namespace libp2p::protocol::kademlia { stream->remotePeerId().value().toBase58()); auto session = openSession(stream); - - if (!session->read()) { - sessions_.erase(stream); - stream->reset(); - return; - } + session->read(weak_from_this()); } std::shared_ptr KademliaImpl::createPutValueExecutor( diff --git a/src/protocol/kademlia/impl/put_value_executor.cpp b/src/protocol/kademlia/impl/put_value_executor.cpp index e8e0eac6..1d9af853 100644 --- a/src/protocol/kademlia/impl/put_value_executor.cpp +++ b/src/protocol/kademlia/impl/put_value_executor.cpp @@ -140,16 +140,7 @@ namespace libp2p::protocol::kademlia { stream->remotePeerId().value().toBase58()); auto session = session_host_->openSession(stream); - - if (!session->write(serialized_request_, shared_from_this())) { - --requests_in_progress_; - log_.debug("write to {} failed; done {}, active {}, in queue {}", - addr, - requests_succeed_, - requests_in_progress_, - addressees_.size() - addressees_idx_); - spawn(); - } + session->write(*serialized_request_, shared_from_this()); } Time PutValueExecutor::responseTimeout() const { diff --git a/src/protocol/kademlia/impl/session.cpp b/src/protocol/kademlia/impl/session.cpp index 99e0a52d..98f0b6fd 100644 --- a/src/protocol/kademlia/impl/session.cpp +++ b/src/protocol/kademlia/impl/session.cpp @@ -6,293 +6,137 @@ #include -#include -#include +#include + +#include +#include #include -#include -#include +#include +#include namespace libp2p::protocol::kademlia { - - // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) - std::atomic_size_t Session::instance_number = 0; - - Session::Session(std::weak_ptr session_host, - std::weak_ptr scheduler, + Session::Session(std::weak_ptr scheduler, std::shared_ptr stream, Time operations_timeout) - : session_host_(std::move(session_host)), - scheduler_(std::move(scheduler)), - stream_(std::move(stream)), - operations_timeout_(operations_timeout), - log_("KademliaSession", "kademlia", "Session", ++instance_number) { - log_.debug("created"); - } + : scheduler_{std::move(scheduler)}, + stream_{std::move(stream)}, + operations_timeout_{operations_timeout}, + framing_{std::make_shared(stream_)} {} Session::~Session() { - log_.debug("destroyed"); + stream_->reset(); } - bool Session::read() { - if (stream_->isClosedForRead()) { - close(Error::STREAM_RESET); - return false; - } - - ++reading_; - - libp2p::basic::VarintReader::readVarint( - stream_, - [wp = weak_from_this()](outcome::result varint) { - if (auto self = wp.lock()) { - self->onLengthRead(std::move(varint)); - } - }); - setReadingTimeout(); - return true; - } - - bool Session::write( - const std::shared_ptr> &buffer, - const std::shared_ptr &response_handler) { - if (stream_->isClosedForWrite()) { - close(Error::STREAM_RESET); - return false; - } - - ++writing_; - - writeReturnSize(stream_, - *buffer, - [wp = weak_from_this(), buffer, response_handler]( - outcome::result result) { - if (auto self = wp.lock()) { - self->onMessageWritten(result, response_handler); - } - }); - - setResponseTimeout(response_handler); - return true; - } - - void Session::close(outcome::result reason) { - if (closed_) { - return; - } - - closed_ = true; - - if (reason.has_value()) { - reason = Error::SESSION_CLOSED; - } - - for (auto &pair : response_handlers_) { - pair.second.reset(); - pair.first->onResult(shared_from_this(), reason.as_failure()); - } - response_handlers_.clear(); - - cancelReadingTimeout(); - - stream_->close([self{shared_from_this()}](outcome::result) {}); - - if (auto session_host = session_host_.lock()) { - session_host->closeSession(stream_); - } + void Session::read(OnRead on_read) { + setTimer(); + framing_->read([self{shared_from_this()}, on_read{std::move(on_read)}]( + basic::MessageReadWriter::ReadCallback r) { + self->timer_.reset(); + if (not r) { + on_read(r.error()); + return; + } + Message msg; + if (not msg.deserialize(*r.value())) { + on_read(Error::MESSAGE_DESERIALIZE_ERROR); + return; + } + on_read(std::move(msg)); + }); } - void Session::onLengthRead(outcome::result varint) { - if (stream_->isClosedForRead()) { - close(Error::STREAM_RESET); - return; - } - - if (closed_) { - return; - } - - if (varint.has_error()) { - close(varint.error()); - return; - } - - auto msg_len = varint.value().toUInt64(); - inner_buffer_.resize(msg_len); - - // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions) - stream_->read(std::span(inner_buffer_.data(), inner_buffer_.size()), - msg_len, - [wp = weak_from_this()](auto &&res) { - if (auto self = wp.lock()) { - self->onMessageRead(std::forward(res)); + void Session::write(BytesIn frame, OnWrite on_write) { + setTimer(); + auto buf = std::make_shared(qtils::asVec(frame)); + libp2p::write(stream_, + frame, + [self{shared_from_this()}, + on_write{std::move(on_write)}, + buf](outcome::result r) { + self->timer_.reset(); + if (not r) { + on_write(r.error()); + return; } + on_write(outcome::success()); }); } - void Session::onMessageRead(outcome::result res) { - cancelReadingTimeout(); - - if (closed_) { - return; - } - - if (not res) { - close(res.as_failure()); - return; - } - - if (inner_buffer_.size() != res.value()) { - close(Error::MESSAGE_PARSE_ERROR); - return; - } - - Message msg; - if (!msg.deserialize(inner_buffer_.data(), inner_buffer_.size())) { - close(Error::MESSAGE_DESERIALIZE_ERROR); - return; - } - - switch (msg.type) { - case Message::Type::kPutValue: - log_.debug("Incoming PutValue message"); - break; - case Message::Type::kGetValue: - log_.debug("Incoming GetValue message"); - break; - case Message::Type::kAddProvider: - log_.debug("Incoming AddProvider message"); - break; - case Message::Type::kGetProviders: - log_.debug("Incoming GetProviders message"); - break; - case Message::Type::kFindNode: - log_.debug("Incoming FindNode message"); - break; - case Message::Type::kPing: - log_.debug("Incoming Ping message"); - break; - default: - close(Error::UNEXPECTED_MESSAGE_TYPE); + void Session::read(std::weak_ptr weak_session_host) { + read([self{shared_from_this()}, + weak_session_host{std::move(weak_session_host)}]( + outcome::result r) { + auto session_host = weak_session_host.lock(); + if (not session_host) { return; - } - - --reading_; - - bool pocessed = false; - for (auto it = response_handlers_.begin(); - it != response_handlers_.end();) { - auto cit = it++; - auto &[response_handler, timeout_handle] = *cit; - if (response_handler->match(msg)) { - timeout_handle.reset(); - response_handler->onResult(shared_from_this(), msg); - response_handlers_.erase(cit); - pocessed = true; } - } - - // Propogate to session host - if (not pocessed) { - if (auto session_host = session_host_.lock()) { - session_host->onMessage(shared_from_this(), std::move(msg)); + if (not r) { + return; } - } - - // Continue to wait some response - if (not response_handlers_.empty()) { - read(); - } + session_host->onMessage(self, std::move(r.value())); + }); + } - if (canBeClosed()) { - close(); - } + void Session::read(std::shared_ptr response_handler) { + read([self{shared_from_this()}, + response_handler](outcome::result r) { + if (r and not response_handler->match(r.value())) { + r = Error::UNEXPECTED_MESSAGE_TYPE; + } + response_handler->onResult(self, std::move(r)); + }); } - void Session::onMessageWritten( - outcome::result res, - const std::shared_ptr &response_handler) { - if (not res) { - close(res.as_failure()); + void Session::write(const Message &msg, + std::weak_ptr weak_session_host) { + Bytes pb; + if (not msg.serialize(pb)) { return; } + write(pb, + [self{shared_from_this()}, + weak_session_host{std::move(weak_session_host)}]( + outcome::result r) { + if (not r) { + return; + } + self->read(weak_session_host); + }); + } - --writing_; - - if (not response_handlers_.empty()) { - read(); - } + void Session::write(BytesIn frame, + std::shared_ptr response_handler) { + write( + frame, + [self{shared_from_this()}, response_handler](outcome::result r) { + if (not r) { + response_handler->onResult(self, r.error()); + return; + } + self->read(std::move(response_handler)); + }); + } - if (canBeClosed()) { - close(); - } + void Session::write(BytesIn frame) { + write(frame, [self{shared_from_this()}](outcome::result r) {}); } - void Session::setReadingTimeout() { + void Session::setTimer() { if (operations_timeout_ == Time::zero()) { return; } - auto scheduler = scheduler_.lock(); if (not scheduler) { - close(Error::INTERNAL_ERROR); return; } - - reading_timeout_handle_ = scheduler->scheduleWithHandle( - [wp = weak_from_this()] { - if (auto self = wp.lock()) { - self->close(Error::TIMEOUT); + timer_ = scheduler->scheduleWithHandle( + [weak_self = weak_from_this()] { + auto self = weak_self.lock(); + if (not self) { + return; } + self->stream_->reset(); }, operations_timeout_); } - - void Session::cancelReadingTimeout() { - reading_timeout_handle_.reset(); - } - - void Session::setResponseTimeout( - const std::shared_ptr &response_handler) { - if (not response_handler) { - return; - } - - if (response_handler->responseTimeout() == Time::zero()) { - return; - } - - auto scheduler = scheduler_.lock(); - if (not scheduler) { - close(Error::INTERNAL_ERROR); - return; - } - - response_handlers_.emplace( - response_handler, - scheduler->scheduleWithHandle( - [wp = weak_from_this(), response_handler] { - if (auto self = wp.lock()) { - if (response_handler) { - self->cancelResponseTimeout(response_handler); - response_handler->onResult(self, Error::TIMEOUT); - self->close(); - } - } - }, - response_handler->responseTimeout())); - } - - void Session::cancelResponseTimeout( - const std::shared_ptr &response_handler) { - if (not response_handler) { - return; - } - - if (auto it = response_handlers_.find(response_handler); - it != response_handlers_.end()) { - it->second.reset(); - response_handlers_.erase(it); - } - } - } // namespace libp2p::protocol::kademlia diff --git a/src/protocol/kademlia/message.cpp b/src/protocol/kademlia/message.cpp index 6dd8180a..28d2637b 100644 --- a/src/protocol/kademlia/message.cpp +++ b/src/protocol/kademlia/message.cpp @@ -105,10 +105,10 @@ namespace libp2p::protocol::kademlia { error_message_.clear(); } - bool Message::deserialize(const void *data, size_t sz) { + bool Message::deserialize(BytesIn pb) { clear(); pb::Message pb_msg; - if (!pb_msg.ParseFromArray(data, static_cast(sz))) { + if (!pb_msg.ParseFromArray(pb.data(), static_cast(pb.size()))) { error_message_ = "Invalid protobuf data"; return false; }