Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<Session> openSession(
std::shared_ptr<connection::Stream> stream) override;

/// @see SessionHost::closeSession
void closeSession(std::shared_ptr<connection::Stream> stream) override;

private:
void onPutValue(const std::shared_ptr<Session> &session, Message &&msg);
void onGetValue(const std::shared_ptr<Session> &session, Message &&msg);
Expand Down Expand Up @@ -142,26 +139,6 @@ namespace libp2p::protocol::kademlia {
event::Handle new_connection_subscription_;
event::Handle on_disconnected_;

struct StreamPtrComparator {
bool operator()(const std::shared_ptr<connection::Stream> &lhs,
const std::shared_ptr<connection::Stream> &rhs) const {
return lhs.get() == rhs.get();
}
};

struct StreamPtrHaher {
size_t operator()(const std::shared_ptr<connection::Stream> &s) const {
auto r = std::hash<decltype(s.get())>()(s.get());
return r;
}
};

std::unordered_map<const std::shared_ptr<connection::Stream>,
std::shared_ptr<Session>,
StreamPtrHaher,
StreamPtrComparator>
sessions_;

// Random walk's auxiliary data
struct {
size_t iteration = 0;
Expand Down
89 changes: 29 additions & 60 deletions include/libp2p/protocol/kademlia/impl/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,85 +6,54 @@

#pragma once

#include <atomic>
#include <functional>

#include <libp2p/basic/scheduler.hpp>
#include <libp2p/connection/stream.hpp>
#include <libp2p/log/sublogger.hpp>
#include <libp2p/multi/uvarint.hpp>
#include <libp2p/protocol/kademlia/error.hpp>
#include <libp2p/protocol/kademlia/impl/response_handler.hpp>
#include <libp2p/protocol/kademlia/impl/session_host.hpp>
#include <libp2p/protocol/kademlia/message.hpp>

namespace libp2p::protocol::kademlia {
namespace libp2p::basic {
class MessageReadWriterUvarint;
class Scheduler;
} // namespace libp2p::basic

class FindPeerExecutor;
namespace libp2p::protocol::kademlia {
class ResponseHandler;
class SessionHost;
} // namespace libp2p::protocol::kademlia

namespace libp2p::protocol::kademlia {
class Session : public std::enable_shared_from_this<Session> {
public:
Session(std::weak_ptr<SessionHost> session_host,
std::weak_ptr<basic::Scheduler> scheduler,
Session(std::weak_ptr<basic::Scheduler> scheduler,
std::shared_ptr<connection::Stream> stream,
Time operations_timeout = Time::zero());

Time operations_timeout);
~Session();

std::shared_ptr<connection::Stream> stream() const {
return stream_;
}

boost::optional<Message::Type> expectedResponseType() const {
return expected_response_type_;
}

bool read();
using OnRead = std::function<void(outcome::result<Message>)>;
void read(OnRead on_read);
using OnWrite = std::function<void(outcome::result<void>)>;
void write(BytesIn frame, OnWrite on_write);

bool write(const std::shared_ptr<std::vector<uint8_t>> &buffer,
const std::shared_ptr<ResponseHandler> &response_handler);
void read(std::weak_ptr<SessionHost> weak_session_host);
void read(std::shared_ptr<ResponseHandler> response_handler);
void write(const Message &msg,
std::weak_ptr<SessionHost> weak_session_host);
void write(BytesIn frame,
std::shared_ptr<ResponseHandler> response_handler);
void write(BytesIn frame);

bool canBeClosed() const {
return not closed_ && writing_ == 0 && reading_ == 0;
std::shared_ptr<connection::Stream> stream() const {
return stream_;
}

void close(outcome::result<void> = outcome::success());

private:
void onLengthRead(outcome::result<multi::UVarint> varint);

void onMessageRead(outcome::result<size_t> res);

void onMessageWritten(
outcome::result<size_t> res,
const std::shared_ptr<ResponseHandler> &response_handler);

void setReadingTimeout();
void cancelReadingTimeout();

void setResponseTimeout(
const std::shared_ptr<ResponseHandler> &response_handler);
void cancelResponseTimeout(
const std::shared_ptr<ResponseHandler> &response_handler);
void setTimer();

std::unordered_map<std::shared_ptr<ResponseHandler>,
basic::Scheduler::Handle>
response_handlers_;

std::weak_ptr<SessionHost> session_host_;
std::weak_ptr<basic::Scheduler> scheduler_;
std::shared_ptr<connection::Stream> stream_;

std::vector<uint8_t> inner_buffer_;

std::atomic_size_t reading_ = 0;
std::atomic_size_t writing_ = 0;
bool closed_ = false;

const Time operations_timeout_;
basic::Scheduler::Handle reading_timeout_handle_;

boost::optional<Message::Type> expected_response_type_;

static std::atomic_size_t instance_number;
log::SubLogger log_;
std::shared_ptr<basic::MessageReadWriterUvarint> framing_;
Cancel timer_;
};
} // namespace libp2p::protocol::kademlia
3 changes: 0 additions & 3 deletions include/libp2p/protocol/kademlia/impl/session_host.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ namespace libp2p::protocol::kademlia {
/// Opens new session for stream
virtual std::shared_ptr<Session> openSession(
std::shared_ptr<connection::Stream> stream) = 0;

/// Closes session by stream
virtual void closeSession(std::shared_ptr<connection::Stream> stream) = 0;
};

} // namespace libp2p::protocol::kademlia
2 changes: 1 addition & 1 deletion include/libp2p/protocol/kademlia/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace libp2p::protocol::kademlia {
void clear();

// tries to deserialize message from byte array
bool deserialize(const void *data, size_t sz);
bool deserialize(BytesIn pb);

// serializes varint(message length) + message into buffer
bool serialize(std::vector<uint8_t> &buffer) const;
Expand Down
27 changes: 8 additions & 19 deletions src/protocol/kademlia/impl/add_provider_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,24 +188,13 @@ namespace libp2p::protocol::kademlia {
stream->remotePeerId().value().toBase58());

auto session = session_host_->openSession(stream);

--requests_in_progress_;

if (session->write(serialized_request_, {})) {
++requests_succeed_;
log_.debug("write to {} successfuly; done {}, active {}, in queue {}",
addr,
requests_succeed_,
requests_in_progress_,
queue_.size());
} else {
log_.debug("write to {} failed; done {}, active {}, in queue {}",
addr,
requests_succeed_,
requests_in_progress_,
queue_.size());
}

spawn();
session->write(*serialized_request_,
[self{shared_from_this()}](outcome::result<void> r) {
--self->requests_in_progress_;
if (r) {
++self->requests_succeed_;
}
self->spawn();
});
}
} // namespace libp2p::protocol::kademlia
12 changes: 1 addition & 11 deletions src/protocol/kademlia/impl/find_peer_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,7 @@ namespace libp2p::protocol::kademlia {
stream->remotePeerId().value().toBase58());

auto session = session_host_->openSession(stream);
if (!session->write(serialized_request_, shared_from_this())) {
--requests_in_progress_;

log_.debug("write to {} failed; active {}, in queue {}",
addr,
requests_in_progress_,
queue_.size());

spawn();
return;
}
session->write(*serialized_request_, shared_from_this());
}

Time FindPeerExecutor::responseTimeout() const {
Expand Down
13 changes: 1 addition & 12 deletions src/protocol/kademlia/impl/find_providers_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,18 +212,7 @@ namespace libp2p::protocol::kademlia {
stream->remotePeerId().value().toBase58());

auto session = session_host_->openSession(stream);

if (!session->write(serialized_request_, shared_from_this())) {
--requests_in_progress_;

log_.debug("write to {} failed; active {}, in queue {}",
addr,
requests_in_progress_,
queue_.size());

spawn();
return;
}
session->write(*serialized_request_, shared_from_this());
}

Time FindProvidersExecutor::responseTimeout() const {
Expand Down
13 changes: 1 addition & 12 deletions src/protocol/kademlia/impl/get_value_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,7 @@ namespace libp2p::protocol::kademlia {
stream->remotePeerId().value().toBase58());

auto session = session_host_->openSession(stream);

if (!session->write(serialized_request_, shared_from_this())) {
--requests_in_progress_;

log_.debug("write to {} failed; active {}, in queue {}",
addr,
requests_in_progress_,
queue_.size());

spawn();
return;
}
session->write(*serialized_request_, shared_from_this());
}

Time GetValueExecutor::responseTimeout() const {
Expand Down
74 changes: 9 additions & 65 deletions src/protocol/kademlia/impl/kademlia_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,7 @@ namespace libp2p::protocol::kademlia {
onPing(session, std::move(msg));
break;
default:
session->close(Error::UNEXPECTED_MESSAGE_TYPE);
return;
break;
}
}

Expand All @@ -348,13 +347,7 @@ namespace libp2p::protocol::kademlia {
}

// echo request
auto buffer = std::make_shared<std::vector<uint8_t>>();
if (not msg.serialize(*buffer)) {
session->close(Error::MESSAGE_SERIALIZE_ERROR);
BOOST_UNREACHABLE_RETURN();
}

session->write(buffer, {});
session->write(msg, weak_from_this());
}

void KademliaImpl::onGetValue(const std::shared_ptr<Session> &session,
Expand Down Expand Up @@ -393,13 +386,7 @@ namespace libp2p::protocol::kademlia {
std::move(msg.key), std::move(value), std::to_string(expire.count())};
}

auto buffer = std::make_shared<std::vector<uint8_t>>();
if (not msg.serialize(*buffer)) {
session->close(Error::MESSAGE_SERIALIZE_ERROR);
BOOST_UNREACHABLE_RETURN();
}

session->write(buffer, {});
session->write(msg, weak_from_this());
}

void KademliaImpl::onAddProvider(const std::shared_ptr<Session> &session,
Expand Down Expand Up @@ -480,13 +467,7 @@ namespace libp2p::protocol::kademlia {
}
}

auto buffer = std::make_shared<std::vector<uint8_t>>();
if (not msg.serialize(*buffer)) {
session->close(Error::MESSAGE_SERIALIZE_ERROR);
BOOST_UNREACHABLE_RETURN();
}

session->write(buffer, {});
session->write(msg, weak_from_this());
}

void KademliaImpl::onFindNode(const std::shared_ptr<Session> &session,
Expand Down Expand Up @@ -535,26 +516,14 @@ namespace libp2p::protocol::kademlia {
msg.closer_peers = std::move(peers);
}

auto buffer = std::make_shared<std::vector<uint8_t>>();
if (not msg.serialize(*buffer)) {
session->close(Error::MESSAGE_SERIALIZE_ERROR);
BOOST_UNREACHABLE_RETURN();
}

session->write(buffer, {});
session->write(msg, weak_from_this());
}

void KademliaImpl::onPing(const std::shared_ptr<Session> &session,
Message &&msg) {
msg.clear();

auto buffer = std::make_shared<std::vector<uint8_t>>();
if (not msg.serialize(*buffer)) {
session->close(Error::MESSAGE_SERIALIZE_ERROR);
BOOST_UNREACHABLE_RETURN();
}

session->write(buffer, {});
session->write(msg, weak_from_this());
}

outcome::result<void> KademliaImpl::findRandomPeer() {
Expand Down Expand Up @@ -601,28 +570,8 @@ namespace libp2p::protocol::kademlia {

std::shared_ptr<Session> KademliaImpl::openSession(
std::shared_ptr<connection::Stream> stream) {
auto [it, is_new_session] = sessions_.emplace(
stream,
std::make_shared<Session>(weak_from_this(), scheduler_, stream));
assert(is_new_session);

log_.debug("session opened, total sessions: {}", sessions_.size());

return it->second;
}

void KademliaImpl::closeSession(std::shared_ptr<connection::Stream> stream) {
auto it = sessions_.find(stream);
if (it == sessions_.end()) {
return;
}

auto &session = it->second;

session->close();
sessions_.erase(it);

log_.debug("session completed, total sessions: {}", sessions_.size());
return std::make_shared<Session>(
scheduler_, stream, config_.responseTimeout);
}

void KademliaImpl::handleProtocol(StreamAndProtocol stream_and_protocol) {
Expand All @@ -638,12 +587,7 @@ namespace libp2p::protocol::kademlia {
stream->remotePeerId().value().toBase58());

auto session = openSession(stream);

if (!session->read()) {
sessions_.erase(stream);
stream->reset();
return;
}
session->read(weak_from_this());
}

std::shared_ptr<PutValueExecutor> KademliaImpl::createPutValueExecutor(
Expand Down
Loading