diff --git a/include/libp2p/protocol/kademlia/config.hpp b/include/libp2p/protocol/kademlia/config.hpp index 34b22ade7..70104aa3e 100644 --- a/include/libp2p/protocol/kademlia/config.hpp +++ b/include/libp2p/protocol/kademlia/config.hpp @@ -47,6 +47,44 @@ namespace libp2p::protocol::kademlia { */ std::chrono::seconds delay = 10s; }; + + struct PeriodicReplication { + /** + * True if periodic replication is enabled + */ + bool enabled = true; + + /** + * Interval for periodic replication + * @note Default: 1h + */ + std::chrono::seconds interval = 1h; + + /** + * Number of peers to replicate to per cycle + * @note Default: 3 (subset of K_VALUE) + */ + size_t peers_per_cycle = 3; + }; + + struct PeriodicRepublishing { + /** + * True if periodic republishing is enabled + */ + bool enabled = true; + + /** + * Interval for periodic republishing + * @note Default: 24h + */ + std::chrono::seconds interval = 24h; + + /** + * Number of peers to republish to per cycle + * @note Default: 6 (subset of K_VALUE) + */ + size_t peers_per_cycle = 6; + }; } // namespace class Config { @@ -146,6 +184,16 @@ namespace libp2p::protocol::kademlia { */ RandomWalk randomWalk{}; + /** + * Periodic replication config + */ + PeriodicReplication periodicReplication{}; + + /** + * Periodic republishing config + */ + PeriodicRepublishing periodicRepublishing{}; + // https://github.com/libp2p/rust-libp2p/blob/c6cf7fec6913aa590622aeea16709fce6e9c99a5/protocols/kad/src/query/peers/closest.rs#L110-L120 size_t query_initial_peers = K_VALUE; diff --git a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp index d1c0d24d7..6bfd0f18a 100644 --- a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp +++ b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp @@ -84,6 +84,8 @@ namespace libp2p::protocol::kademlia { std::shared_ptr openSession( std::shared_ptr stream) override; + // Periodic behavior is driven by configuration only + private: void onPutValue(const std::shared_ptr &session, Message &&msg); void onGetValue(const std::shared_ptr &session, Message &&msg); @@ -145,6 +147,22 @@ namespace libp2p::protocol::kademlia { basic::Scheduler::Handle handle{}; } random_walking_; + // Periodic replication and republishing + basic::Scheduler::Handle replication_timer_; + basic::Scheduler::Handle republishing_timer_; + + // Periodic operation callbacks + void setReplicationTimer(); + void setRepublishingTimer(); + void onReplicationTimer(); + void onRepublishingTimer(); + void performReplication(); + void performRepublishing(); + + // Helper methods for periodic operations + std::vector getClosestPeers(const Key& key, size_t count); + void replicateRecord(const Key& key, const Value& value, bool extend_expiration); + log::SubLogger log_; }; diff --git a/include/libp2p/protocol/kademlia/impl/storage.hpp b/include/libp2p/protocol/kademlia/impl/storage.hpp index 1416ac400..a007f128d 100644 --- a/include/libp2p/protocol/kademlia/impl/storage.hpp +++ b/include/libp2p/protocol/kademlia/impl/storage.hpp @@ -25,6 +25,10 @@ namespace libp2p::protocol::kademlia { /// @returns true if it has value corresponding to given @param key. virtual bool hasValue(const Key &key) const = 0; + + /// Get all stored records for periodic operations + /// @return vector of stored records + virtual std::vector> getAllRecords() const = 0; }; } // namespace libp2p::protocol::kademlia diff --git a/include/libp2p/protocol/kademlia/impl/storage_impl.hpp b/include/libp2p/protocol/kademlia/impl/storage_impl.hpp index 2332b549d..7dec2666b 100644 --- a/include/libp2p/protocol/kademlia/impl/storage_impl.hpp +++ b/include/libp2p/protocol/kademlia/impl/storage_impl.hpp @@ -65,6 +65,8 @@ namespace libp2p::protocol::kademlia { bool hasValue(const Key &key) const override; + std::vector> getAllRecords() const override; + private: void onRefreshTimer(); void setTimerRefresh(); diff --git a/src/network/impl/connection_manager_impl.cpp b/src/network/impl/connection_manager_impl.cpp index 1b9d7d540..d70fb2f17 100644 --- a/src/network/impl/connection_manager_impl.cpp +++ b/src/network/impl/connection_manager_impl.cpp @@ -68,9 +68,18 @@ namespace libp2p::network { std::vector ConnectionManagerImpl::getConnections() const { std::vector out; - out.reserve(connections_.size()); + // Pre-allocate space for better performance (only open connections) + size_t total_connections = 0; + for (const auto &entry : connections_) { + for (const auto &conn : entry.second) { + if (not conn->isClosed()) { + ++total_connections; + } + } + } + out.reserve(total_connections); - for (auto &&entry : connections_) { + for (const auto &entry : connections_) { for (const auto &conn : entry.second) { if (not conn->isClosed()) { out.emplace_back(conn); diff --git a/src/protocol/kademlia/impl/kademlia_impl.cpp b/src/protocol/kademlia/impl/kademlia_impl.cpp index cfabbeb69..0e42de3e7 100644 --- a/src/protocol/kademlia/impl/kademlia_impl.cpp +++ b/src/protocol/kademlia/impl/kademlia_impl.cpp @@ -120,6 +120,10 @@ namespace libp2p::protocol::kademlia { if (config_.randomWalk.enabled) { randomWalk(); } + + // start periodic replication and republishing + setReplicationTimer(); + setRepublishingTimer(); } outcome::result KademliaImpl::bootstrap() { @@ -648,4 +652,111 @@ namespace libp2p::protocol::kademlia { std::move(handler)); } + // Periodic behavior is driven by configuration only; no runtime setters + + void KademliaImpl::setReplicationTimer() { + if (not config_.periodicReplication.enabled) { + return; + } + replication_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (not self) { + return; + } + self->setReplicationTimer(); + self->onReplicationTimer(); + }, + config_.periodicReplication.interval); + } + + void KademliaImpl::setRepublishingTimer() { + if (not config_.periodicRepublishing.enabled) { + return; + } + republishing_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (not self) { + return; + } + self->setRepublishingTimer(); + self->onRepublishingTimer(); + }, + config_.periodicRepublishing.interval); + } + + void KademliaImpl::onReplicationTimer() { + performReplication(); + } + + void KademliaImpl::onRepublishingTimer() { + performRepublishing(); + } + + void KademliaImpl::performReplication() { + log_.debug("Performing periodic replication"); + + auto records = storage_->getAllRecords(); + for (const auto& [key, value]: records) { + replicateRecord(key, value, false); // false = don't extend expiration + } + } + + void KademliaImpl::performRepublishing() { + log_.debug("Performing periodic republishing"); + + auto records = storage_->getAllRecords(); + for (const auto& [key, value] : records) { + replicateRecord(key, value, true); // true = extend expiration + } + } + + std::vector KademliaImpl::getClosestPeers(const Key& key, size_t count) { + std::vector closest_peers; + + // Get peers from peer routing table + HashedKey hashed_key(key); + auto peers = peer_routing_table_->getNearestPeers(hashed_key.hash, count); + + for (const auto& peer : peers) { + if (peer != self_id_) { // Don't include self + closest_peers.push_back(peer); + } + } + + return closest_peers; + } + + void KademliaImpl::replicateRecord(const Key& key, const Value& value, bool extend_expiration) { + // If republishing, extend local expiration by putting the value back to storage + if (extend_expiration) { + auto put_res = storage_->putValue(key, value); + if (!put_res) { + log_.warn("Republish: failed to extend expiration for key: {}: {}", + multi::detail::encodeBase58(key), put_res.error()); + } + } + + auto closest_peers = getClosestPeers(key, + extend_expiration ? config_.periodicRepublishing.peers_per_cycle + : config_.periodicReplication.peers_per_cycle); + + if (closest_peers.empty()) { + log_.debug("No peers available for replication/republishing of key: {}", + multi::detail::encodeBase58(key)); + return; + } + + // Create and start put value executor + auto executor = createPutValueExecutor(key, value, closest_peers); + if (executor) { + std::ignore = executor->start(); + log_.debug("Started {} for key: {} to {} peers", + extend_expiration ? "republishing" : "replication", + multi::detail::encodeBase58(key), + closest_peers.size()); + } + } + } // namespace libp2p::protocol::kademlia diff --git a/src/protocol/kademlia/impl/storage_impl.cpp b/src/protocol/kademlia/impl/storage_impl.cpp index b8a5fd0be..40183eae3 100644 --- a/src/protocol/kademlia/impl/storage_impl.cpp +++ b/src/protocol/kademlia/impl/storage_impl.cpp @@ -117,4 +117,22 @@ namespace libp2p::protocol::kademlia { }, config_.storageRefreshInterval); } + + std::vector> StorageImpl::getAllRecords() const { + std::vector> records; + auto now = scheduler_->now(); + + // Iterate through all records and get their values from backend + for (const auto& record : *table_) { + // Only include non-expired records + if (record.expire_time > now) { + auto value_result = backend_->getValue(record.key); + if (value_result) { + records.emplace_back(record.key, value_result.value()); + } + } + } + + return records; + } } // namespace libp2p::protocol::kademlia