8 changes: 5 additions & 3 deletions examples/ipfs-kad/src/main.rs
@@ -96,9 +96,11 @@ async fn main() -> Result<()> {
pk_record_key.put_slice("/pk/".as_bytes());
pk_record_key.put_slice(swarm.local_peer_id().to_bytes().as_slice());

let mut pk_record =
kad::Record::new(pk_record_key, local_key.public().encode_protobuf());
pk_record.publisher = Some(*swarm.local_peer_id());
let mut pk_record = kad::Record::new(
pk_record_key,
local_key.public().encode_protobuf(),
Some(*swarm.local_peer_id()),
);
pk_record.expires = Some(Instant::now().add(Duration::from_secs(60)));

swarm
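
After this change, `kad::Record::new` takes the publisher as a third argument instead of requiring a separate field assignment. A minimal, self-contained sketch of the updated constructor, assuming the three-argument signature shown in this diff (key and values are illustrative only):

```rust
use libp2p::{identity, kad, PeerId};

fn main() {
    let local_key = identity::Keypair::generate_ed25519();
    let local_peer_id = PeerId::from(local_key.public());

    // Publisher set: the node owning the record keeps republishing it.
    let published = kad::Record::new(
        b"/pk/example".to_vec(),              // illustrative key
        local_key.public().encode_protobuf(), // value: serialized public key
        Some(local_peer_id),                  // publisher
    );

    // Publisher omitted: the record is stored but never republished.
    let anonymous = kad::Record::new(b"other-key".to_vec(), vec![1, 2, 3], None);

    assert!(published.publisher.is_some());
    assert!(anonymous.publisher.is_none());
}
```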
2 changes: 2 additions & 0 deletions misc/metrics/CHANGELOG.md
@@ -1,3 +1,5 @@
- Adapt the Kademlia metrics recorder to `libp2p_kad::Event::InboundRequest` becoming a tuple variant. See #6176.

## 0.17.1

- Fix panic in swarm metrics when `ConnectionClosed` events are received for connections that were established before metrics collection started.
2 changes: 1 addition & 1 deletion misc/metrics/src/kad.rs
@@ -254,7 +254,7 @@ impl super::Recorder<libp2p_kad::Event> for Metrics {
}
}

libp2p_kad::Event::InboundRequest { request } => {
libp2p_kad::Event::InboundRequest(request) => {
self.inbound_requests.get_or_create(&request.into()).inc();
}
_ => {}
2 changes: 2 additions & 0 deletions protocols/kad/CHANGELOG.md
@@ -1,3 +1,5 @@
- Allow putting a `Record` with `publisher: None`, at the cost of the record not being republished. `Record::new` now takes the publisher as a third argument.

## 0.49.0

- Remove no longer constructed GetRecordError::QuorumFailed.
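
The entry above names the trade-off; a hedged sketch of what it looks like at the call site, assuming the three-argument `Record::new` from this diff together with the existing `Behaviour::put_record` API (the helper name is hypothetical):

```rust
use libp2p::kad;

// Hypothetical helper: store a record under `key` without a publisher. The record
// is replicated like any other, but the local node will not republish it, so it
// disappears once its TTL elapses.
fn put_unrepublished(
    behaviour: &mut kad::Behaviour<kad::store::MemoryStore>,
    key: Vec<u8>,
    value: Vec<u8>,
) -> Result<kad::QueryId, kad::store::Error> {
    let record = kad::Record::new(key, value, None); // `publisher: None`
    behaviour.put_record(record, kad::Quorum::One)
}
```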
349 changes: 164 additions & 185 deletions protocols/kad/src/behaviour.rs

Large diffs are not rendered by default.

70 changes: 53 additions & 17 deletions protocols/kad/src/behaviour/test.rs
@@ -562,7 +562,11 @@ fn get_record_not_found() {
/// is equal to the configured replication factor.
#[test]
fn put_record() {
fn prop(records: Vec<Record>, seed: Seed, filter_records: bool, drop_records: bool) {
fn prop(mut records: Vec<Record>, seed: Seed, filter_records: bool, drop_records: bool) {
tracing::trace!("remove records without a publisher");
// This test counts republished `Record`s against `records.len()`, and records
// without a publisher are never republished, so drop them up front.
records.retain(|r| r.publisher.is_some());

let mut rng = StdRng::from_seed(seed.0);
let replication_factor =
NonZeroUsize::new(rng.gen_range(1..(K_VALUE.get() / 2) + 1)).unwrap();
@@ -611,15 +615,15 @@ fn put_record() {
.into_iter()
.take(num_total)
.map(|mut r| {
// We don't want records to expire prematurely, as they would
// be removed from storage and no longer replicated, but we still
// want to check that an explicitly set expiration is preserved.
// We don't want records to expire prematurely, as they would be removed from
// storage and no longer replicated, but we still want to check that
// an explicitly set expiration is preserved.
r.expires = r.expires.map(|t| t + Duration::from_secs(60));
(r.key.clone(), r)
})
.collect::<HashMap<_, _>>();

// Initiate put_record queries.
// Initiate `put_record` queries.
let mut qids = HashSet::new();
for r in records.values() {
let qid = swarms[0]
@@ -635,7 +639,7 @@ fn put_record() {
assert!(record.expires.is_some());
qids.insert(qid);
}
i => panic!("Unexpected query info: {i:?}"),
unexpected => panic!("Unexpected query info: {unexpected:?}"),
},
None => panic!("Query not found: {qid:?}"),
}
@@ -684,9 +688,9 @@ fn put_record() {
}
}
}
Poll::Ready(Some(SwarmEvent::Behaviour(Event::InboundRequest {
request: InboundRequest::PutRecord { record, .. },
}))) => {
Poll::Ready(Some(SwarmEvent::Behaviour(Event::InboundRequest(
InboundRequest::PutRecord { record, .. },
)))) => {
if !drop_records {
if let Some(record) = record {
assert_eq!(
Expand Down Expand Up @@ -715,14 +719,14 @@ fn put_record() {
}
}

// All swarms are Pending and not enough results have been collected
// so far, thus wait to be polled again for further progress.
// All swarms are Pending and not enough results have been collected so far, thus wait
// to be polled again for further progress.
if results.len() != records.len() {
return Poll::Pending;
}

// Consume the results, checking that each record was replicated
// correctly to the closest peers to the key.
// Consume the results, checking that each record was replicated correctly to the
// closest peers to the key.
while let Some(r) = results.pop() {
let expected = records.get(&r.key).unwrap();

@@ -833,7 +837,15 @@ fn get_record() {
.map(|(_addr, swarm)| swarm)
.collect::<Vec<_>>();

let record = Record::new(random_multihash(), vec![4, 5, 6]);
let record = Record::new(
random_multihash(),
vec![4, 5, 6],
if random::<bool>() {
None
} else {
Some(PeerId::random())
},
);

swarms[2].behaviour_mut().store.put(record.clone()).unwrap();
let qid = swarms[0].behaviour_mut().get_record(record.key.clone());
@@ -887,7 +899,15 @@ fn get_record_many() {
.collect::<Vec<_>>();
let num_results = 10;

let record = Record::new(random_multihash(), vec![4, 5, 6]);
let record = Record::new(
random_multihash(),
vec![4, 5, 6],
if random::<bool>() {
None
} else {
Some(PeerId::random())
},
);

for swarm in swarms.iter_mut().take(num_nodes) {
swarm.behaviour_mut().store.put(record.clone()).unwrap();
@@ -1191,8 +1211,24 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
Multihash::<64>::wrap(SHA_256_MH, &thread_rng().gen::<[u8; 32]>())
.expect("32 array to fit into 64 byte multihash"),
);
let record_bob = Record::new(key.clone(), b"bob".to_vec());
let record_trudy = Record::new(key.clone(), b"trudy".to_vec());
let record_bob = Record::new(
key.clone(),
b"bob".to_vec(),
if random::<bool>() {
None
} else {
Some(PeerId::random())
},
);
let record_trudy = Record::new(
key.clone(),
b"trudy".to_vec(),
if random::<bool>() {
None
} else {
Some(PeerId::random())
},
);

// Make `bob` and `trudy` aware of their version of the record searched by
// `alice`.
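
The records constructed in these tests all choose an optional publisher with the same inline `if random::<bool>()` expression. A purely illustrative helper, not part of this diff, that captures the pattern in one place:

```rust
use libp2p::PeerId;
use rand::random;

// Roughly half of the generated records carry a random publisher and half carry
// none, so both the republished and the non-republished code paths are exercised.
fn random_publisher() -> Option<PeerId> {
    if random::<bool>() {
        None
    } else {
        Some(PeerId::random())
    }
}
```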
65 changes: 28 additions & 37 deletions protocols/kad/src/handler.rs
@@ -261,7 +261,7 @@ pub enum HandlerEvent {
query_id: QueryId,
},

/// Request to put a value in the dht records
/// Request to put a value in the DHT records
PutRecord {
record: Record,
/// Identifier of the request. Needs to be passed back when answering.
@@ -316,8 +316,8 @@ impl error::Error for HandlerQueryErr {
/// Event to send to the handler.
#[derive(Debug)]
pub enum HandlerIn {
/// Resets the (sub)stream associated with the given request ID,
/// thus signaling an error to the remote.
/// Resets the (sub)stream associated with the given request ID, thus signaling an error to the
/// remote.
///
/// Explicitly resetting the (sub)stream associated with a request
/// can be used as an alternative to letting requests simply time
@@ -399,7 +399,7 @@ pub enum HandlerIn {
request_id: RequestId,
},

/// Put a value into the dht records.
/// Put a value into the DHT records.
PutRecord {
record: Record,
/// ID of the query that generated this request.
@@ -624,21 +624,19 @@ impl ConnectionHandler for Handler {
_ => false,
})
{
state.close();
state.close()
}
}
HandlerIn::FindNodeReq { key, query_id } => {
let msg = KadRequestMsg::FindNode { key };
self.pending_messages.push_back((msg, query_id));
}
HandlerIn::FindNodeReq { key, query_id } => self
.pending_messages
.push_back((KadRequestMsg::FindNode { key }, query_id)),
HandlerIn::FindNodeRes {
closer_peers,
request_id,
} => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }),
HandlerIn::GetProvidersReq { key, query_id } => {
let msg = KadRequestMsg::GetProviders { key };
self.pending_messages.push_back((msg, query_id));
}
HandlerIn::GetProvidersReq { key, query_id } => self
.pending_messages
.push_back((KadRequestMsg::GetProviders { key }, query_id)),
HandlerIn::GetProvidersRes {
closer_peers,
provider_peers,
@@ -654,38 +652,31 @@
key,
provider,
query_id,
} => {
let msg = KadRequestMsg::AddProvider { key, provider };
self.pending_messages.push_back((msg, query_id));
}
HandlerIn::GetRecord { key, query_id } => {
let msg = KadRequestMsg::GetValue { key };
self.pending_messages.push_back((msg, query_id));
}
HandlerIn::PutRecord { record, query_id } => {
let msg = KadRequestMsg::PutValue { record };
self.pending_messages.push_back((msg, query_id));
}
} => self
.pending_messages
.push_back((KadRequestMsg::AddProvider { key, provider }, query_id)),
HandlerIn::GetRecord { key, query_id } => self
.pending_messages
.push_back((KadRequestMsg::GetValue { key }, query_id)),
HandlerIn::PutRecord { record, query_id } => self
.pending_messages
.push_back((KadRequestMsg::PutValue { record }, query_id)),
HandlerIn::GetRecordRes {
record,
closer_peers,
request_id,
} => {
self.answer_pending_request(
request_id,
KadResponseMsg::GetValue {
record,
closer_peers,
},
);
}
} => self.answer_pending_request(
request_id,
KadResponseMsg::GetValue {
record,
closer_peers,
},
),
HandlerIn::PutRecordRes {
key,
request_id,
value,
} => {
self.answer_pending_request(request_id, KadResponseMsg::PutValue { key, value });
}
} => self.answer_pending_request(request_id, KadResponseMsg::PutValue { key, value }),
HandlerIn::ReconfigureMode { new_mode } => {
let peer = self.remote_peer_id;

31 changes: 14 additions & 17 deletions protocols/kad/src/jobs.rs
@@ -22,23 +22,22 @@
//!
//! ## Record Persistence & Expiry
//!
//! To ensure persistence of records in the DHT, a Kademlia node
//! must periodically (re-)publish and (re-)replicate its records:
//! To ensure persistence of records in the DHT, a Kademlia node must periodically (re-)publish and
//! (re-)replicate its records:
//!
//! 1. (Re-)publishing: The original publisher or provider of a record must regularly re-publish
//! in order to prolong the expiration.
//!
//! 2. (Re-)replication: Every node storing a replica of a record must regularly re-replicate it
//! to the closest nodes to the key in order to ensure the record is present at these nodes.
//!
//! Re-publishing primarily ensures persistence of the record beyond its
//! initial TTL, for as long as the publisher stores (or provides) the record,
//! whilst (re-)replication primarily ensures persistence for the duration
//! of the TTL in the light of topology changes. Consequently, replication
//! intervals should be shorter than publication intervals and
//! Re-publishing primarily ensures persistence of the record beyond its initial TTL, for as long as
//! the publisher stores (or provides) the record, whilst (re-)replication primarily ensures
//! persistence for the duration of the TTL in the light of topology changes. Consequently,
//! replication intervals should be shorter than publication intervals and
//! publication intervals should be shorter than the TTL.
//!
//! This module implements two periodic jobs:
//! This module implements two periodic jobs.
//!
//! * [`PutRecordJob`]: For (re-)publication and (re-)replication of regular (value-)records.
//!
@@ -50,14 +49,13 @@
//! nodes to the key, where `k` is the replication factor.
//!
//! Furthermore, these jobs perform double-duty by removing expired records
//! from the `RecordStore` on every run. Expired records are never emitted
//! from the [`RecordStore`] on every run. Expired records are never emitted
//! by the jobs.
//!
//! > **Note**: The current implementation takes a snapshot of the records
//! > to replicate from the `RecordStore` when it starts and thus, to account
//! > for the worst case, it temporarily requires additional memory proportional
//! > to the size of all stored records. As a job runs, the records are moved
//! > out of the job to the consumer, where they can be dropped after being sent.
//! > **Note**: The current implementation takes a snapshot of the records to replicate from the
//! > `RecordStore` when it starts and thus, to account for the worst case, it temporarily requires
//! > additional memory proportional to the size of all stored records. As a job runs, the records
//! > are moved out of the job to the consumer, where they can be dropped after being sent.

use std::{
collections::HashSet,
@@ -192,9 +190,8 @@ impl PutRecordJob {

/// Polls the job for records to replicate.
///
/// Must be called in the context of a task. When `NotReady` is returned,
/// the current task is registered to be notified when the job is ready
/// to be run.
/// Must be called in the context of a task. When `NotReady` is returned, the current task is
/// registered to be notified when the job is ready to be run.
pub(crate) fn poll<T>(
&mut self,
cx: &mut Context<'_>,
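
The module documentation above reduces to an ordering constraint: replication interval < publication interval < record TTL. A hedged sketch of expressing that ordering with the `libp2p-kad` `Config` setters (`set_replication_interval`, `set_publication_interval`, `set_record_ttl`); the setters are assumed from the published API rather than this diff, and the durations are illustrative:

```rust
use std::time::Duration;
use libp2p::kad;

// Illustrative values only; they merely satisfy the documented ordering.
fn job_intervals() -> kad::Config {
    let mut cfg = kad::Config::new(kad::PROTOCOL_NAME);
    cfg.set_replication_interval(Some(Duration::from_secs(60 * 60)));      // re-replicate hourly
    cfg.set_publication_interval(Some(Duration::from_secs(12 * 60 * 60))); // re-publish twice a day
    cfg.set_record_ttl(Some(Duration::from_secs(24 * 60 * 60)));           // records expire after a day
    cfg
}
```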
17 changes: 8 additions & 9 deletions protocols/kad/src/protocol.rs
@@ -556,14 +556,6 @@ fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
let key = record::Key::from(record.key);
let value = record.value;

let publisher = if !record.publisher.is_empty() {
PeerId::from_bytes(&record.publisher)
.map(Some)
.map_err(|_| invalid_data("Invalid publisher peer ID."))?
} else {
None
};

let expires = if record.ttl > 0 {
Some(Instant::now() + Duration::from_secs(record.ttl as u64))
} else {
@@ -573,7 +565,14 @@
Ok(Record {
key,
value,
publisher,
publisher: if record.publisher.is_empty() {
None
} else {
Some(
PeerId::from_bytes(&record.publisher)
.map_err(|_| invalid_data("Invalid publisher peer ID."))?,
)
},
expires,
})
}