Skip to content
48 changes: 48 additions & 0 deletions include/libp2p/protocol/kademlia/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,44 @@ namespace libp2p::protocol::kademlia {
*/
std::chrono::seconds delay = 10s;
};

struct PeriodicReplication {
/**
* True if periodic replication is enabled
*/
bool enabled = true;

/**
* Interval for periodic replication
* @note Default: 1h
*/
std::chrono::seconds interval = 1h;

/**
* Number of peers to replicate to per cycle
* @note Default: 3 (subset of K_VALUE)
*/
size_t peers_per_cycle = 3;
};

struct PeriodicRepublishing {
/**
* True if periodic republishing is enabled
*/
bool enabled = true;

/**
* Interval for periodic republishing
* @note Default: 24h
*/
std::chrono::seconds interval = 24h;

/**
* Number of peers to republish to per cycle
* @note Default: 6 (subset of K_VALUE)
*/
size_t peers_per_cycle = 6;
};
} // namespace

class Config {
Expand Down Expand Up @@ -146,6 +184,16 @@ namespace libp2p::protocol::kademlia {
*/
RandomWalk randomWalk{};

/**
* Periodic replication config
*/
PeriodicReplication periodicReplication{};

/**
* Periodic republishing config
*/
PeriodicRepublishing periodicRepublishing{};

// https://github.com/libp2p/rust-libp2p/blob/c6cf7fec6913aa590622aeea16709fce6e9c99a5/protocols/kad/src/query/peers/closest.rs#L110-L120
size_t query_initial_peers = K_VALUE;

Expand Down
18 changes: 18 additions & 0 deletions include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<Session> openSession(
std::shared_ptr<connection::Stream> stream) override;

// Periodic behavior is driven by configuration only

private:
void onPutValue(const std::shared_ptr<Session> &session, Message &&msg);
void onGetValue(const std::shared_ptr<Session> &session, Message &&msg);
Expand Down Expand Up @@ -145,6 +147,22 @@ namespace libp2p::protocol::kademlia {
basic::Scheduler::Handle handle{};
} random_walking_;

// Periodic replication and republishing
basic::Scheduler::Handle replication_timer_;
basic::Scheduler::Handle republishing_timer_;

// Periodic operation callbacks
void setReplicationTimer();
void setRepublishingTimer();
void onReplicationTimer();
void onRepublishingTimer();
void performReplication();
void performRepublishing();

// Helper methods for periodic operations
std::vector<PeerId> getClosestPeers(const Key& key, size_t count);
void replicateRecord(const Key& key, const Value& value, bool extend_expiration);

log::SubLogger log_;
};

Expand Down
4 changes: 4 additions & 0 deletions include/libp2p/protocol/kademlia/impl/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ namespace libp2p::protocol::kademlia {

/// @returns true if it has value corresponding to given @param key.
virtual bool hasValue(const Key &key) const = 0;

/// Get all stored records for periodic operations
/// @return vector of stored records
virtual std::vector<std::pair<Key, Value>> getAllRecords() const = 0;
};

} // namespace libp2p::protocol::kademlia
2 changes: 2 additions & 0 deletions include/libp2p/protocol/kademlia/impl/storage_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ namespace libp2p::protocol::kademlia {

bool hasValue(const Key &key) const override;

std::vector<std::pair<Key, Value>> getAllRecords() const override;

private:
void onRefreshTimer();
void setTimerRefresh();
Expand Down
13 changes: 11 additions & 2 deletions src/network/impl/connection_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,18 @@ namespace libp2p::network {
std::vector<ConnectionManager::ConnectionSPtr>
ConnectionManagerImpl::getConnections() const {
std::vector<ConnectionSPtr> out;
out.reserve(connections_.size());
// Pre-allocate space for better performance (only open connections)
size_t total_connections = 0;
for (const auto &entry : connections_) {
for (const auto &conn : entry.second) {
if (not conn->isClosed()) {
++total_connections;
}
}
}
out.reserve(total_connections);

for (auto &&entry : connections_) {
for (const auto &entry : connections_) {
for (const auto &conn : entry.second) {
if (not conn->isClosed()) {
out.emplace_back(conn);
Expand Down
111 changes: 111 additions & 0 deletions src/protocol/kademlia/impl/kademlia_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ namespace libp2p::protocol::kademlia {
if (config_.randomWalk.enabled) {
randomWalk();
}

// start periodic replication and republishing
setReplicationTimer();
setRepublishingTimer();
}

outcome::result<void> KademliaImpl::bootstrap() {
Expand Down Expand Up @@ -648,4 +652,111 @@ namespace libp2p::protocol::kademlia {
std::move(handler));
}

// Periodic behavior is driven by configuration only; no runtime setters

void KademliaImpl::setReplicationTimer() {
if (not config_.periodicReplication.enabled) {
return;
}
replication_timer_ = scheduler_->scheduleWithHandle(
[weak_self{weak_from_this()}] {
auto self = weak_self.lock();
if (not self) {
return;
}
self->setReplicationTimer();
self->onReplicationTimer();
},
config_.periodicReplication.interval);
}

void KademliaImpl::setRepublishingTimer() {
if (not config_.periodicRepublishing.enabled) {
return;
}
republishing_timer_ = scheduler_->scheduleWithHandle(
[weak_self{weak_from_this()}] {
auto self = weak_self.lock();
if (not self) {
return;
}
self->setRepublishingTimer();
self->onRepublishingTimer();
},
config_.periodicRepublishing.interval);
}

void KademliaImpl::onReplicationTimer() {
performReplication();
}

void KademliaImpl::onRepublishingTimer() {
performRepublishing();
}

void KademliaImpl::performReplication() {
log_.debug("Performing periodic replication");

auto records = storage_->getAllRecords();
for (const auto& [key, value]: records) {
replicateRecord(key, value, false); // false = don't extend expiration
}
}

void KademliaImpl::performRepublishing() {
log_.debug("Performing periodic republishing");

auto records = storage_->getAllRecords();
for (const auto& [key, value] : records) {
replicateRecord(key, value, true); // true = extend expiration
}
}

std::vector<PeerId> KademliaImpl::getClosestPeers(const Key& key, size_t count) {
std::vector<PeerId> closest_peers;

// Get peers from peer routing table
HashedKey hashed_key(key);
auto peers = peer_routing_table_->getNearestPeers(hashed_key.hash, count);

for (const auto& peer : peers) {
if (peer != self_id_) { // Don't include self
closest_peers.push_back(peer);
}
}

return closest_peers;
}

void KademliaImpl::replicateRecord(const Key& key, const Value& value, bool extend_expiration) {
// If republishing, extend local expiration by putting the value back to storage
if (extend_expiration) {
auto put_res = storage_->putValue(key, value);
if (!put_res) {
log_.warn("Republish: failed to extend expiration for key: {}: {}",
multi::detail::encodeBase58(key), put_res.error());
}
}

auto closest_peers = getClosestPeers(key,
extend_expiration ? config_.periodicRepublishing.peers_per_cycle
: config_.periodicReplication.peers_per_cycle);

if (closest_peers.empty()) {
log_.debug("No peers available for replication/republishing of key: {}",
multi::detail::encodeBase58(key));
return;
}

// Create and start put value executor
auto executor = createPutValueExecutor(key, value, closest_peers);
if (executor) {
std::ignore = executor->start();
log_.debug("Started {} for key: {} to {} peers",
extend_expiration ? "republishing" : "replication",
multi::detail::encodeBase58(key),
closest_peers.size());
}
}

} // namespace libp2p::protocol::kademlia
18 changes: 18 additions & 0 deletions src/protocol/kademlia/impl/storage_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,22 @@ namespace libp2p::protocol::kademlia {
},
config_.storageRefreshInterval);
}

std::vector<std::pair<Key, Value>> StorageImpl::getAllRecords() const {
std::vector<std::pair<Key, Value>> records;
auto now = scheduler_->now();

// Iterate through all records and get their values from backend
for (const auto& record : *table_) {
// Only include non-expired records
if (record.expire_time > now) {
auto value_result = backend_->getValue(record.key);
if (value_result) {
records.emplace_back(record.key, value_result.value());
}
}
}

return records;
}
} // namespace libp2p::protocol::kademlia
Loading