Skip to content

Commit 347eb8d

Browse files
alienx5499turuslan
andauthored
Kademlia Periodic Replication and Republishing Implementation (#334)
* feat: implement Kademlia periodic replication and republishing * refactor(kademlia): drop duplicated mutable flags; read from config_ directly per review * refactor(kademlia): remove runtime setters from interface; rely on Config as requested * refactor(kademlia): drop try/catch in periodic replication/republishing per review * refactor(kademlia): deduplicate timer logic into setReplicationTimer/setRepublishingTimer per review * feat(kademlia): on republish extend local expiration by putValue; log on failure * perf(connection_manager): reserve only for open connections as suggested in review * Update kademlia_impl.cpp --------- Co-authored-by: Ruslan Tushov <[email protected]>
1 parent 8e7284c commit 347eb8d

File tree

7 files changed

+212
-2
lines changed

7 files changed

+212
-2
lines changed

include/libp2p/protocol/kademlia/config.hpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,44 @@ namespace libp2p::protocol::kademlia {
4747
*/
4848
std::chrono::seconds delay = 10s;
4949
};
50+
51+
struct PeriodicReplication {
52+
/**
53+
* True if periodic replication is enabled
54+
*/
55+
bool enabled = true;
56+
57+
/**
58+
* Interval for periodic replication
59+
* @note Default: 1h
60+
*/
61+
std::chrono::seconds interval = 1h;
62+
63+
/**
64+
* Number of peers to replicate to per cycle
65+
* @note Default: 3 (subset of K_VALUE)
66+
*/
67+
size_t peers_per_cycle = 3;
68+
};
69+
70+
struct PeriodicRepublishing {
71+
/**
72+
* True if periodic republishing is enabled
73+
*/
74+
bool enabled = true;
75+
76+
/**
77+
* Interval for periodic republishing
78+
* @note Default: 24h
79+
*/
80+
std::chrono::seconds interval = 24h;
81+
82+
/**
83+
* Number of peers to republish to per cycle
84+
* @note Default: 6 (subset of K_VALUE)
85+
*/
86+
size_t peers_per_cycle = 6;
87+
};
5088
} // namespace
5189

5290
class Config {
@@ -146,6 +184,16 @@ namespace libp2p::protocol::kademlia {
146184
*/
147185
RandomWalk randomWalk{};
148186

187+
/**
188+
* Periodic replication config
189+
*/
190+
PeriodicReplication periodicReplication{};
191+
192+
/**
193+
* Periodic republishing config
194+
*/
195+
PeriodicRepublishing periodicRepublishing{};
196+
149197
// https://github.com/libp2p/rust-libp2p/blob/c6cf7fec6913aa590622aeea16709fce6e9c99a5/protocols/kad/src/query/peers/closest.rs#L110-L120
150198
size_t query_initial_peers = K_VALUE;
151199

include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ namespace libp2p::protocol::kademlia {
8484
std::shared_ptr<Session> openSession(
8585
std::shared_ptr<connection::Stream> stream) override;
8686

87+
// Periodic behavior is driven by configuration only
88+
8789
private:
8890
void onPutValue(const std::shared_ptr<Session> &session, Message &&msg);
8991
void onGetValue(const std::shared_ptr<Session> &session, Message &&msg);
@@ -145,6 +147,22 @@ namespace libp2p::protocol::kademlia {
145147
basic::Scheduler::Handle handle{};
146148
} random_walking_;
147149

150+
// Periodic replication and republishing
151+
basic::Scheduler::Handle replication_timer_;
152+
basic::Scheduler::Handle republishing_timer_;
153+
154+
// Periodic operation callbacks
155+
void setReplicationTimer();
156+
void setRepublishingTimer();
157+
void onReplicationTimer();
158+
void onRepublishingTimer();
159+
void performReplication();
160+
void performRepublishing();
161+
162+
// Helper methods for periodic operations
163+
std::vector<PeerId> getClosestPeers(const Key& key, size_t count);
164+
void replicateRecord(const Key& key, const Value& value, bool extend_expiration);
165+
148166
log::SubLogger log_;
149167
};
150168

include/libp2p/protocol/kademlia/impl/storage.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ namespace libp2p::protocol::kademlia {
2525

2626
/// @returns true if it has value corresponding to given @param key.
2727
virtual bool hasValue(const Key &key) const = 0;
28+
29+
/// Get all stored records for periodic operations
30+
/// @return vector of stored records
31+
virtual std::vector<std::pair<Key, Value>> getAllRecords() const = 0;
2832
};
2933

3034
} // namespace libp2p::protocol::kademlia

include/libp2p/protocol/kademlia/impl/storage_impl.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ namespace libp2p::protocol::kademlia {
6565

6666
bool hasValue(const Key &key) const override;
6767

68+
std::vector<std::pair<Key, Value>> getAllRecords() const override;
69+
6870
private:
6971
void onRefreshTimer();
7072
void setTimerRefresh();

src/network/impl/connection_manager_impl.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,18 @@ namespace libp2p::network {
6868
std::vector<ConnectionManager::ConnectionSPtr>
6969
ConnectionManagerImpl::getConnections() const {
7070
std::vector<ConnectionSPtr> out;
71-
out.reserve(connections_.size());
71+
// Pre-allocate space for better performance (only open connections)
72+
size_t total_connections = 0;
73+
for (const auto &entry : connections_) {
74+
for (const auto &conn : entry.second) {
75+
if (not conn->isClosed()) {
76+
++total_connections;
77+
}
78+
}
79+
}
80+
out.reserve(total_connections);
7281

73-
for (auto &&entry : connections_) {
82+
for (const auto &entry : connections_) {
7483
for (const auto &conn : entry.second) {
7584
if (not conn->isClosed()) {
7685
out.emplace_back(conn);

src/protocol/kademlia/impl/kademlia_impl.cpp

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ namespace libp2p::protocol::kademlia {
120120
if (config_.randomWalk.enabled) {
121121
randomWalk();
122122
}
123+
124+
// start periodic replication and republishing
125+
setReplicationTimer();
126+
setRepublishingTimer();
123127
}
124128

125129
outcome::result<void> KademliaImpl::bootstrap() {
@@ -648,4 +652,111 @@ namespace libp2p::protocol::kademlia {
648652
std::move(handler));
649653
}
650654

655+
// Periodic behavior is driven by configuration only; no runtime setters
656+
657+
void KademliaImpl::setReplicationTimer() {
658+
if (not config_.periodicReplication.enabled) {
659+
return;
660+
}
661+
replication_timer_ = scheduler_->scheduleWithHandle(
662+
[weak_self{weak_from_this()}] {
663+
auto self = weak_self.lock();
664+
if (not self) {
665+
return;
666+
}
667+
self->setReplicationTimer();
668+
self->onReplicationTimer();
669+
},
670+
config_.periodicReplication.interval);
671+
}
672+
673+
void KademliaImpl::setRepublishingTimer() {
674+
if (not config_.periodicRepublishing.enabled) {
675+
return;
676+
}
677+
republishing_timer_ = scheduler_->scheduleWithHandle(
678+
[weak_self{weak_from_this()}] {
679+
auto self = weak_self.lock();
680+
if (not self) {
681+
return;
682+
}
683+
self->setRepublishingTimer();
684+
self->onRepublishingTimer();
685+
},
686+
config_.periodicRepublishing.interval);
687+
}
688+
689+
void KademliaImpl::onReplicationTimer() {
690+
performReplication();
691+
}
692+
693+
void KademliaImpl::onRepublishingTimer() {
694+
performRepublishing();
695+
}
696+
697+
void KademliaImpl::performReplication() {
698+
log_.debug("Performing periodic replication");
699+
700+
auto records = storage_->getAllRecords();
701+
for (const auto& [key, value]: records) {
702+
replicateRecord(key, value, false); // false = don't extend expiration
703+
}
704+
}
705+
706+
void KademliaImpl::performRepublishing() {
707+
log_.debug("Performing periodic republishing");
708+
709+
auto records = storage_->getAllRecords();
710+
for (const auto& [key, value] : records) {
711+
replicateRecord(key, value, true); // true = extend expiration
712+
}
713+
}
714+
715+
std::vector<PeerId> KademliaImpl::getClosestPeers(const Key& key, size_t count) {
716+
std::vector<PeerId> closest_peers;
717+
718+
// Get peers from peer routing table
719+
HashedKey hashed_key(key);
720+
auto peers = peer_routing_table_->getNearestPeers(hashed_key.hash, count);
721+
722+
for (const auto& peer : peers) {
723+
if (peer != self_id_) { // Don't include self
724+
closest_peers.push_back(peer);
725+
}
726+
}
727+
728+
return closest_peers;
729+
}
730+
731+
void KademliaImpl::replicateRecord(const Key& key, const Value& value, bool extend_expiration) {
732+
// If republishing, extend local expiration by putting the value back to storage
733+
if (extend_expiration) {
734+
auto put_res = storage_->putValue(key, value);
735+
if (!put_res) {
736+
log_.warn("Republish: failed to extend expiration for key: {}: {}",
737+
multi::detail::encodeBase58(key), put_res.error());
738+
}
739+
}
740+
741+
auto closest_peers = getClosestPeers(key,
742+
extend_expiration ? config_.periodicRepublishing.peers_per_cycle
743+
: config_.periodicReplication.peers_per_cycle);
744+
745+
if (closest_peers.empty()) {
746+
log_.debug("No peers available for replication/republishing of key: {}",
747+
multi::detail::encodeBase58(key));
748+
return;
749+
}
750+
751+
// Create and start put value executor
752+
auto executor = createPutValueExecutor(key, value, closest_peers);
753+
if (executor) {
754+
std::ignore = executor->start();
755+
log_.debug("Started {} for key: {} to {} peers",
756+
extend_expiration ? "republishing" : "replication",
757+
multi::detail::encodeBase58(key),
758+
closest_peers.size());
759+
}
760+
}
761+
651762
} // namespace libp2p::protocol::kademlia

src/protocol/kademlia/impl/storage_impl.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,22 @@ namespace libp2p::protocol::kademlia {
117117
},
118118
config_.storageRefreshInterval);
119119
}
120+
121+
std::vector<std::pair<Key, Value>> StorageImpl::getAllRecords() const {
122+
std::vector<std::pair<Key, Value>> records;
123+
auto now = scheduler_->now();
124+
125+
// Iterate through all records and get their values from backend
126+
for (const auto& record : *table_) {
127+
// Only include non-expired records
128+
if (record.expire_time > now) {
129+
auto value_result = backend_->getValue(record.key);
130+
if (value_result) {
131+
records.emplace_back(record.key, value_result.value());
132+
}
133+
}
134+
}
135+
136+
return records;
137+
}
120138
} // namespace libp2p::protocol::kademlia

0 commit comments

Comments
 (0)