From 3b7cc34f844934a2fc3c97764b47df8ef2137814 Mon Sep 17 00:00:00 2001 From: skaunov Date: Tue, 7 Oct 2025 00:25:48 +0300 Subject: [PATCH 1/4] kad[feat]: enable putting a `Record` without `publisher` --- examples/ipfs-kad/src/main.rs | 8 +- misc/metrics/src/kad.rs | 2 +- protocols/kad/src/behaviour.rs | 349 +++++++++++++--------------- protocols/kad/src/behaviour/test.rs | 70 ++++-- protocols/kad/src/handler.rs | 65 +++--- protocols/kad/src/jobs.rs | 31 ++- protocols/kad/src/protocol.rs | 17 +- protocols/kad/src/query.rs | 15 +- protocols/kad/src/record.rs | 4 +- 9 files changed, 282 insertions(+), 279 deletions(-) diff --git a/examples/ipfs-kad/src/main.rs b/examples/ipfs-kad/src/main.rs index 5d2ea65808f..f2dd466ff25 100644 --- a/examples/ipfs-kad/src/main.rs +++ b/examples/ipfs-kad/src/main.rs @@ -96,9 +96,11 @@ async fn main() -> Result<()> { pk_record_key.put_slice("/pk/".as_bytes()); pk_record_key.put_slice(swarm.local_peer_id().to_bytes().as_slice()); - let mut pk_record = - kad::Record::new(pk_record_key, local_key.public().encode_protobuf()); - pk_record.publisher = Some(*swarm.local_peer_id()); + let mut pk_record = kad::Record::new( + pk_record_key, + local_key.public().encode_protobuf(), + Some(*swarm.local_peer_id()) + ); pk_record.expires = Some(Instant::now().add(Duration::from_secs(60))); swarm diff --git a/misc/metrics/src/kad.rs b/misc/metrics/src/kad.rs index 1b4d68f31ca..863ab37daef 100644 --- a/misc/metrics/src/kad.rs +++ b/misc/metrics/src/kad.rs @@ -254,7 +254,7 @@ impl super::Recorder for Metrics { } } - libp2p_kad::Event::InboundRequest { request } => { + libp2p_kad::Event::InboundRequest(request) => { self.inbound_requests.get_or_create(&request.into()).inc(); } _ => {} diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index f5f44baec74..33b5e1481e0 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -847,45 +847,50 @@ where id } - /// Stores a record in the DHT, locally as well as at the nodes - /// closest to the key as per the xor distance metric. - /// - /// Returns `Ok` if a record has been stored locally, providing the - /// `QueryId` of the initial query that replicates the record in the DHT. - /// The result of the query is eventually reported as a - /// [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`]. - /// - /// The record is always stored locally with the given expiration. If the record's - /// expiration is `None`, the common case, it does not expire in local storage - /// but is still replicated with the configured record TTL. To remove the record - /// locally and stop it from being re-published in the DHT, see [`Behaviour::remove_record`]. - /// - /// After the initial publication of the record, it is subject to (re-)replication - /// and (re-)publication as per the configured intervals. Periodic (re-)publication - /// does not update the record's expiration in local storage, thus a given record - /// with an explicit expiration will always expire at that instant and until then - /// is subject to regular (re-)replication and (re-)publication. + /// Stores a record in the DHT, locally as well as at the nodes closest to the key as per the + /// xor distance metric. + /// + /// Returns `Ok` if a record has been stored locally, providing the [`QueryId`] of the initial + /// query that replicates the record in the DHT. The result of the query is eventually + /// reported as a [`Event::OutboundQueryProgressed{QueryResult::PutRecord}`]. 
+ /// + /// The record is always stored locally with the given expiration. If the record's expiration is + /// `None`, the common case, it does not expire in local storage but is still replicated + /// with the configured record TTL. To remove the record locally and stop it from being + /// re-published in the DHT, see [`Behaviour::remove_record`]. + /// + /// After the initial publication of the record, it is subject to (re-)replication and + /// (re-)publication as per the configured intervals. Periodic (re-)publication does not + /// update the record's expiration in local storage, thus a given record with an explicit + /// expiration will always expire at that instant and until then is subject to regular + /// (re-)replication and (re-)publication. + /// + /// `None` for `publisher` field of `Record` makes the record anonymous; this entails it won't + /// be republished (like a foreign `Record`). + /// `Some` will be ensured to be equal `.local_key()` `kad` has. pub fn put_record( &mut self, mut record: Record, quorum: Quorum, ) -> Result { - record.publisher = Some(*self.kbuckets.local_key().preimage()); + if record.publisher.is_some() { + record.publisher = Some(*self.kbuckets.local_key().preimage()) + } self.store.put(record.clone())?; record.expires = record .expires .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl)); - let quorum = quorum.eval(self.queries.config().replication_factor); let target = kbucket::Key::new(record.key.clone()); - let peers = self.kbuckets.closest_keys(&target); - let context = PutRecordContext::Publish; - let info = QueryInfo::PutRecord { - context, - record, - quorum, - phase: PutRecordPhase::GetClosestPeers, - }; - Ok(self.queries.add_iter_closest(target.clone(), peers, info)) + Ok(self.queries.add_iter_closest( + target.clone(), + self.kbuckets.closest_keys(&target), + QueryInfo::PutRecord { + context: PutRecordContext::Publish, + record, + quorum: quorum.eval(self.queries.config().replication_factor), + phase: PutRecordPhase::GetClosestPeers, + }, + )) } /// Stores a record at specific peers, without storing it locally. @@ -921,16 +926,18 @@ where .expires .or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl)); let context = PutRecordContext::Custom; - let info = QueryInfo::PutRecord { - context, - record, - quorum, - phase: PutRecordPhase::PutRecord { - success: Vec::new(), - get_closest_peers_stats: QueryStats::empty(), + self.queries.add_fixed( + peers, + QueryInfo::PutRecord { + context, + record, + quorum, + phase: PutRecordPhase::PutRecord { + success: Vec::new(), + get_closest_peers_stats: QueryStats::empty(), + }, }, - }; - self.queries.add_fixed(peers, info) + ) } /// Removes the record with the given key from _local_ storage, @@ -1306,16 +1313,17 @@ where /// Starts an iterative `PUT_VALUE` query for the given record. 
fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) { - let quorum = quorum.eval(self.queries.config().replication_factor); let target = kbucket::Key::new(record.key.clone()); - let peers = self.kbuckets.closest_keys(&target); - let info = QueryInfo::PutRecord { - record, - quorum, - context, - phase: PutRecordPhase::GetClosestPeers, - }; - self.queries.add_iter_closest(target.clone(), peers, info); + self.queries.add_iter_closest( + target.clone(), + self.kbuckets.closest_keys(&target), + QueryInfo::PutRecord { + record, + quorum: quorum.eval(self.queries.config().replication_factor), + context, + phase: PutRecordPhase::GetClosestPeers, + }, + ); } /// Updates the routing table with a new connection status and address of a peer. @@ -1625,17 +1633,19 @@ where quorum, phase: PutRecordPhase::GetClosestPeers, } => { - let info = QueryInfo::PutRecord { - context, - record, - quorum, - phase: PutRecordPhase::PutRecord { - success: vec![], - get_closest_peers_stats: q.stats, + self.queries.continue_fixed( + query_id, + q.peers.into_peerids_iter(), + QueryInfo::PutRecord { + context, + record, + quorum, + phase: PutRecordPhase::PutRecord { + success: vec![], + get_closest_peers_stats: q.stats, + }, }, - }; - self.queries - .continue_fixed(query_id, q.peers.into_peerids_iter(), info); + ); None } @@ -1832,118 +1842,90 @@ where request_id: RequestId, mut record: Record, ) { - if record.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) { - // If the (alleged) publisher is the local node, do nothing. The record of - // the original publisher should never change as a result of replication - // and the publisher is always assumed to have the "right" value. - self.queued_events.push_back(ToSwarm::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: HandlerIn::PutRecordRes { - key: record.key, - value: record.value, - request_id, - }, - }); - return; - } - - let now = Instant::now(); - - // Calculate the expiration exponentially inversely proportional to the - // number of nodes between the local node and the closest node to the key - // (beyond the replication factor). This ensures avoiding over-caching - // outside of the k closest nodes to a key. - let target = kbucket::Key::new(record.key.clone()); - let num_between = self.kbuckets.count_nodes_between(&target); - let k = self.queries.config().replication_factor.get(); - let num_beyond_k = (usize::max(k, num_between) - k) as u32; - let expiration = self - .record_ttl - .map(|ttl| now + exp_decrease(ttl, num_beyond_k)); - // The smaller TTL prevails. Only if neither TTL is set is the record - // stored "forever". - record.expires = record.expires.or(expiration).min(expiration); - - if let Some(job) = self.put_record_job.as_mut() { - // Ignore the record in the next run of the replication - // job, since we can assume the sender replicated the - // record to the k closest peers. Effectively, only - // one of the k closest peers performs a replication - // in the configured interval, assuming a shared interval. - job.skip(record.key.clone()) - } - - // While records received from a publisher, as well as records that do - // not exist locally should always (attempted to) be stored, there is a - // choice here w.r.t. the handling of replicated records whose keys refer - // to records that exist locally: The value and / or the publisher may - // either be overridden or left unchanged. 
At the moment and in the - // absence of a decisive argument for another option, both are always - // overridden as it avoids having to load the existing record in the - // first place. - - if !record.is_expired(now) { - // The record is cloned because of the weird libp2p protocol - // requirement to send back the value in the response, although this - // is a waste of resources. - match self.record_filtering { - StoreInserts::Unfiltered => match self.store.put(record.clone()) { - Ok(()) => { - tracing::debug!( + let mut event_reset = None; + + // If the (alleged) publisher is the local node, do nothing. The record of the + // original publisher should never change as a result of replication + // and the publisher is always assumed to have the "right" value. + if record.publisher.as_ref() != Some(self.kbuckets.local_key().preimage()) { + let now = Instant::now(); + // Calculate the expiration exponentially inversely proportional to the number of nodes + // between the local node and the closest node to the key (beyond the + // replication factor). This ensures avoiding over-caching outside of the k closest + // nodes to a key. + let target = kbucket::Key::new(record.key.clone()); + let num_between = self.kbuckets.count_nodes_between(&target); + let k = self.queries.config().replication_factor.get(); + let num_beyond_k = (usize::max(k, num_between) - k) as u32; + let expiration = self + .record_ttl + .map(|ttl| now + exp_decrease(ttl, num_beyond_k)); + // The smaller TTL prevails. Only if neither TTL is set is the record stored "forever". + record.expires = record.expires.or(expiration).min(expiration); + + // Ignore the record in the next run of the replication job, since we can assume the + // sender replicated the record to the k closest peers. Effectively, only + // one of the k closest peers performs a replication in the configured + // interval, assuming a shared interval. + if let Some(job) = self.put_record_job.as_mut() { + job.skip(record.key.clone()) + } + + // While records received from a publisher, as well as records that do not exist locally + // should always (attempted to) be stored, there is a choice here w.r.t. the + // handling of replicated records whose keys refer to records that exist locally: The + // value and/or the publisher may either be overridden or left unchanged. At + // the moment and in the absence of a decisive argument for another option, both are + // always overridden as it avoids having to load the existing record in the + // first place. + + if !record.is_expired(now) { + // The record is cloned because of the weird libp2p protocol requirement to send + // back the value in the response, although this is a waste of resources. 
+ + if self.record_filtering == StoreInserts::Unfiltered { + match self.store.put(record.clone()) { + Ok(()) => tracing::debug!( record=?record.key, - "Record stored: {} bytes", + "`Record` stored: {} bytes", record.value.len() - ); - self.queued_events.push_back(ToSwarm::GenerateEvent( - Event::InboundRequest { - request: InboundRequest::PutRecord { - source, - connection, - record: None, - }, - }, - )); - } - Err(e) => { - tracing::info!("Record not stored: {:?}", e); - self.queued_events.push_back(ToSwarm::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: HandlerIn::Reset(request_id), - }); - - return; + ), + Err(e) => { + tracing::info!("`Record` not stored: {:?}", e); + event_reset = Some(HandlerIn::Reset(request_id)) + } } - }, - StoreInserts::FilterBoth => { + } + if event_reset.is_none() { self.queued_events - .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { - request: InboundRequest::PutRecord { + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest( + InboundRequest::PutRecord { source, connection, - record: Some(record.clone()), + record: match self.record_filtering { + StoreInserts::Unfiltered => None, + StoreInserts::FilterBoth => Some(record.clone()), + }, }, - })); + ))) } } } - // The remote receives a [`HandlerIn::PutRecordRes`] even in the - // case where the record is discarded due to being expired. Given that - // the remote sent the local node a [`HandlerEvent::PutRecord`] - // request, the remote perceives the local node as one node among the k - // closest nodes to the target. In addition returning - // [`HandlerIn::PutRecordRes`] does not reveal any internal - // information to a possibly malicious remote node. + // The remote receives a [`HandlerIn::PutRecordRes`] even in the case where the record is + // discarded due to being expired. Given that the remote sent the local node a + // [`HandlerEvent::PutRecord`] request, the remote perceives the local node as one node + // among the k closest nodes to the target. In addition returning + // [`HandlerIn::PutRecordRes`] does not reveal any internal information to a possibly + // malicious remote node. 
self.queued_events.push_back(ToSwarm::NotifyHandler { peer_id: source, handler: NotifyHandler::One(connection), - event: HandlerIn::PutRecordRes { + event: event_reset.unwrap_or(HandlerIn::PutRecordRes { key: record.key, value: record.value, request_id, - }, + }), }) } @@ -1964,17 +1946,15 @@ where } self.queued_events - .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { - request: InboundRequest::AddProvider { record: None }, - })); + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest( + InboundRequest::AddProvider(None), + ))) } StoreInserts::FilterBoth => { self.queued_events - .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { - request: InboundRequest::AddProvider { - record: Some(record), - }, - })); + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest( + InboundRequest::AddProvider(Some(record)), + ))) } } } @@ -2311,11 +2291,11 @@ where .collect::>(); self.queued_events - .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { - request: InboundRequest::FindNode { + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest( + InboundRequest::FindNode { num_closer_peers: closer_peers.len(), }, - })); + ))); self.queued_events.push_back(ToSwarm::NotifyHandler { peer_id: source, @@ -2324,7 +2304,7 @@ where closer_peers, request_id, }, - }); + }) } HandlerEvent::FindNodeRes { @@ -2341,12 +2321,12 @@ where .collect::>(); self.queued_events - .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { - request: InboundRequest::GetProvider { + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest( + InboundRequest::GetProvider { num_closer_peers: closer_peers.len(), num_provider_peers: provider_peers.len(), }, - })); + ))); self.queued_events.push_back(ToSwarm::NotifyHandler { peer_id: source, @@ -2356,7 +2336,7 @@ where provider_peers, request_id, }, - }); + }) } HandlerEvent::GetProvidersRes { @@ -2437,12 +2417,12 @@ where .collect::>(); self.queued_events - .push_back(ToSwarm::GenerateEvent(Event::InboundRequest { - request: InboundRequest::GetRecord { + .push_back(ToSwarm::GenerateEvent(Event::InboundRequest( + InboundRequest::GetRecord { num_closer_peers: closer_peers.len(), present_locally: record.is_some(), }, - })); + ))); self.queued_events.push_back(ToSwarm::NotifyHandler { peer_id: source, @@ -2452,7 +2432,7 @@ where closer_peers, request_id, }, - }); + }) } HandlerEvent::GetRecordRes { @@ -2507,7 +2487,7 @@ where } HandlerEvent::PutRecord { record, request_id } => { - self.record_received(source, connection, request_id, record); + self.record_received(source, connection, request_id, record) } HandlerEvent::PutRecordRes { query_id, .. } => { @@ -2566,10 +2546,9 @@ where self.add_provider_job = Some(job); } - // Run the periodic record replication / publication job. + // Run the periodic record replication/publication job. if let Some(mut job) = self.put_record_job.take() { - let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity); - for _ in 0..num { + for _ in 0..JOBS_MAX_NEW_QUERIES.min(jobs_query_capacity) { if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) { let context = if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) { @@ -2758,7 +2737,7 @@ pub enum Event { // Note on the difference between 'request' and 'query': A request is a // single request-response style exchange with a single remote peer. A query // is made of multiple requests across multiple remote peers. - InboundRequest { request: InboundRequest }, + InboundRequest(InboundRequest), /// An outbound query has made progress. 
OutboundQueryProgressed { @@ -2875,8 +2854,8 @@ pub enum InboundRequest { /// If filtering [`StoreInserts::FilterBoth`] is enabled, the [`ProviderRecord`] is /// included. /// - /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details.. - AddProvider { record: Option }, + /// See [`StoreInserts`] and [`Config::set_record_filtering`] for details. + AddProvider(Option), /// Request to retrieve a record. GetRecord { num_closer_peers: usize, @@ -3178,14 +3157,14 @@ pub enum AddProviderContext { pub enum PutRecordContext { /// The context is a [`Behaviour::put_record`] operation. Publish, - /// The context is periodic republishing of records stored - /// earlier via [`Behaviour::put_record`]. + /// The context is periodic republishing of records stored earlier via + /// [`Behaviour::put_record`]. Republish, - /// The context is periodic replication (i.e. without extending - /// the record TTL) of stored records received earlier from another peer. + /// The context is periodic replication (i.e. without extending the record TTL) of stored + /// records received earlier from another peer. Replicate, - /// The context is a custom store operation targeting specific - /// peers initiated by [`Behaviour::put_record_to`]. + /// The context is a custom store operation targeting specific peers initiated by + /// [`Behaviour::put_record_to`]. Custom, } diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 3819276d350..c254b07d762 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -562,7 +562,11 @@ fn get_record_not_found() { /// is equal to the configured replication factor. #[test] fn put_record() { - fn prop(records: Vec, seed: Seed, filter_records: bool, drop_records: bool) { + fn prop(mut records: Vec, seed: Seed, filter_records: bool, drop_records: bool) { + tracing::trace!("remove records without a publisher"); + // this test relies on counting republished `Record` against `records.len()` + records.retain(|r| r.publisher.is_some()); + let mut rng = StdRng::from_seed(seed.0); let replication_factor = NonZeroUsize::new(rng.gen_range(1..(K_VALUE.get() / 2) + 1)).unwrap(); @@ -611,15 +615,15 @@ fn put_record() { .into_iter() .take(num_total) .map(|mut r| { - // We don't want records to expire prematurely, as they would - // be removed from storage and no longer replicated, but we still - // want to check that an explicitly set expiration is preserved. + // We don't want records to expire prematurely, as they would be removed from + // storage and no longer replicated, but we still want to check that + // an explicitly set expiration is preserved. r.expires = r.expires.map(|t| t + Duration::from_secs(60)); (r.key.clone(), r) }) .collect::>(); - // Initiate put_record queries. + // Initiate `put_record` queries. let mut qids = HashSet::new(); for r in records.values() { let qid = swarms[0] @@ -635,7 +639,7 @@ fn put_record() { assert!(record.expires.is_some()); qids.insert(qid); } - i => panic!("Unexpected query info: {i:?}"), + unexpected => panic!("Unexpected query info: {unexpected:?}"), }, None => panic!("Query not found: {qid:?}"), } @@ -684,9 +688,9 @@ fn put_record() { } } } - Poll::Ready(Some(SwarmEvent::Behaviour(Event::InboundRequest { - request: InboundRequest::PutRecord { record, .. }, - }))) => { + Poll::Ready(Some(SwarmEvent::Behaviour(Event::InboundRequest( + InboundRequest::PutRecord { record, .. 
}, + )))) => { if !drop_records { if let Some(record) = record { assert_eq!( @@ -715,14 +719,14 @@ fn put_record() { } } - // All swarms are Pending and not enough results have been collected - // so far, thus wait to be polled again for further progress. + // All swarms are Pending and not enough results have been collected so far, thus wait + // to be polled again for further progress. if results.len() != records.len() { return Poll::Pending; } - // Consume the results, checking that each record was replicated - // correctly to the closest peers to the key. + // Consume the results, checking that each record was replicated correctly to the + // closest peers to the key. while let Some(r) = results.pop() { let expected = records.get(&r.key).unwrap(); @@ -833,7 +837,15 @@ fn get_record() { .map(|(_addr, swarm)| swarm) .collect::>(); - let record = Record::new(random_multihash(), vec![4, 5, 6]); + let record = Record::new( + random_multihash(), + vec![4, 5, 6], + if random::() { + None + } else { + Some(PeerId::random()) + }, + ); swarms[2].behaviour_mut().store.put(record.clone()).unwrap(); let qid = swarms[0].behaviour_mut().get_record(record.key.clone()); @@ -887,7 +899,15 @@ fn get_record_many() { .collect::>(); let num_results = 10; - let record = Record::new(random_multihash(), vec![4, 5, 6]); + let record = Record::new( + random_multihash(), + vec![4, 5, 6], + if random::() { + None + } else { + Some(PeerId::random()) + }, + ); for swarm in swarms.iter_mut().take(num_nodes) { swarm.behaviour_mut().store.put(record.clone()).unwrap(); @@ -1191,8 +1211,24 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { Multihash::<64>::wrap(SHA_256_MH, &thread_rng().gen::<[u8; 32]>()) .expect("32 array to fit into 64 byte multihash"), ); - let record_bob = Record::new(key.clone(), b"bob".to_vec()); - let record_trudy = Record::new(key.clone(), b"trudy".to_vec()); + let record_bob = Record::new( + key.clone(), + b"bob".to_vec(), + if random::() { + None + } else { + Some(PeerId::random()) + }, + ); + let record_trudy = Record::new( + key.clone(), + b"trudy".to_vec(), + if random::() { + None + } else { + Some(PeerId::random()) + }, + ); // Make `bob` and `trudy` aware of their version of the record searched by // `alice`. diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 2c7b6c52257..71b1b9e7b77 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -261,7 +261,7 @@ pub enum HandlerEvent { query_id: QueryId, }, - /// Request to put a value in the dht records + /// Request to put a value in the DHT records PutRecord { record: Record, /// Identifier of the request. Needs to be passed back when answering. @@ -316,8 +316,8 @@ impl error::Error for HandlerQueryErr { /// Event to send to the handler. #[derive(Debug)] pub enum HandlerIn { - /// Resets the (sub)stream associated with the given request ID, - /// thus signaling an error to the remote. + /// Resets the (sub)stream associated with the given request ID, thus signaling an error to the + /// remote. /// /// Explicitly resetting the (sub)stream associated with a request /// can be used as an alternative to letting requests simply time @@ -399,7 +399,7 @@ pub enum HandlerIn { request_id: RequestId, }, - /// Put a value into the dht records. + /// Put a value into the DHT records. PutRecord { record: Record, /// ID of the query that generated this request. 
@@ -624,21 +624,19 @@ impl ConnectionHandler for Handler { _ => false, }) { - state.close(); + state.close() } } - HandlerIn::FindNodeReq { key, query_id } => { - let msg = KadRequestMsg::FindNode { key }; - self.pending_messages.push_back((msg, query_id)); - } + HandlerIn::FindNodeReq { key, query_id } => self + .pending_messages + .push_back((KadRequestMsg::FindNode { key }, query_id)), HandlerIn::FindNodeRes { closer_peers, request_id, } => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }), - HandlerIn::GetProvidersReq { key, query_id } => { - let msg = KadRequestMsg::GetProviders { key }; - self.pending_messages.push_back((msg, query_id)); - } + HandlerIn::GetProvidersReq { key, query_id } => self + .pending_messages + .push_back((KadRequestMsg::GetProviders { key }, query_id)), HandlerIn::GetProvidersRes { closer_peers, provider_peers, @@ -654,38 +652,31 @@ impl ConnectionHandler for Handler { key, provider, query_id, - } => { - let msg = KadRequestMsg::AddProvider { key, provider }; - self.pending_messages.push_back((msg, query_id)); - } - HandlerIn::GetRecord { key, query_id } => { - let msg = KadRequestMsg::GetValue { key }; - self.pending_messages.push_back((msg, query_id)); - } - HandlerIn::PutRecord { record, query_id } => { - let msg = KadRequestMsg::PutValue { record }; - self.pending_messages.push_back((msg, query_id)); - } + } => self + .pending_messages + .push_back((KadRequestMsg::AddProvider { key, provider }, query_id)), + HandlerIn::GetRecord { key, query_id } => self + .pending_messages + .push_back((KadRequestMsg::GetValue { key }, query_id)), + HandlerIn::PutRecord { record, query_id } => self + .pending_messages + .push_back((KadRequestMsg::PutValue { record }, query_id)), HandlerIn::GetRecordRes { record, closer_peers, request_id, - } => { - self.answer_pending_request( - request_id, - KadResponseMsg::GetValue { - record, - closer_peers, - }, - ); - } + } => self.answer_pending_request( + request_id, + KadResponseMsg::GetValue { + record, + closer_peers, + }, + ), HandlerIn::PutRecordRes { key, request_id, value, - } => { - self.answer_pending_request(request_id, KadResponseMsg::PutValue { key, value }); - } + } => self.answer_pending_request(request_id, KadResponseMsg::PutValue { key, value }), HandlerIn::ReconfigureMode { new_mode } => { let peer = self.remote_peer_id; diff --git a/protocols/kad/src/jobs.rs b/protocols/kad/src/jobs.rs index 56b3e080d96..4d5ac3ff8cc 100644 --- a/protocols/kad/src/jobs.rs +++ b/protocols/kad/src/jobs.rs @@ -22,8 +22,8 @@ //! //! ## Record Persistence & Expiry //! -//! To ensure persistence of records in the DHT, a Kademlia node -//! must periodically (re-)publish and (re-)replicate its records: +//! To ensure persistence of records in the DHT, a Kademlia node must periodically (re-)publish and +//! (re-)replicate its records: //! //! 1. (Re-)publishing: The original publisher or provider of a record must regularly re-publish //! in order to prolong the expiration. @@ -31,14 +31,13 @@ //! 2. (Re-)replication: Every node storing a replica of a record must regularly re-replicate it //! to the closest nodes to the key in order to ensure the record is present at these nodes. //! -//! Re-publishing primarily ensures persistence of the record beyond its -//! initial TTL, for as long as the publisher stores (or provides) the record, -//! whilst (re-)replication primarily ensures persistence for the duration -//! of the TTL in the light of topology changes. Consequently, replication -//! 
intervals should be shorter than publication intervals and +//! Re-publishing primarily ensures persistence of the record beyond its initial TTL, for as long as +//! the publisher stores (or provides) the record, whilst (re-)replication primarily ensures +//! persistence for the duration of the TTL in the light of topology changes. Consequently, +//! replication intervals should be shorter than publication intervals and //! publication intervals should be shorter than the TTL. //! -//! This module implements two periodic jobs: +//! This module implements two periodic jobs. //! //! * [`PutRecordJob`]: For (re-)publication and (re-)replication of regular (value-)records. //! @@ -50,14 +49,13 @@ //! nodes to the key, where `k` is the replication factor. //! //! Furthermore, these jobs perform double-duty by removing expired records -//! from the `RecordStore` on every run. Expired records are never emitted +//! from the [`RecordStore`] on every run. Expired records are never emitted //! by the jobs. //! -//! > **Note**: The current implementation takes a snapshot of the records -//! > to replicate from the `RecordStore` when it starts and thus, to account -//! > for the worst case, it temporarily requires additional memory proportional -//! > to the size of all stored records. As a job runs, the records are moved -//! > out of the job to the consumer, where they can be dropped after being sent. +//! > **Note**: The current implementation takes a snapshot of the records to replicate from the +//! > `RecordStore` when it starts and thus, to account for the worst case, it temporarily requires +//! > additional memory proportional to the size of all stored records. As a job runs, the records +//! > are moved out of the job to the consumer, where they can be dropped after being sent. use std::{ collections::HashSet, @@ -192,9 +190,8 @@ impl PutRecordJob { /// Polls the job for records to replicate. /// - /// Must be called in the context of a task. When `NotReady` is returned, - /// the current task is registered to be notified when the job is ready - /// to be run. + /// Must be called in the context of a task. When `NotReady` is returned, the current task is + /// registered to be notified when the job is ready to be run. pub(crate) fn poll( &mut self, cx: &mut Context<'_>, diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index df181af1a86..287e72f20b6 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -556,14 +556,6 @@ fn record_from_proto(record: proto::Record) -> Result { let key = record::Key::from(record.key); let value = record.value; - let publisher = if !record.publisher.is_empty() { - PeerId::from_bytes(&record.publisher) - .map(Some) - .map_err(|_| invalid_data("Invalid publisher peer ID."))? 
- } else { - None - }; - let expires = if record.ttl > 0 { Some(Instant::now() + Duration::from_secs(record.ttl as u64)) } else { @@ -573,7 +565,14 @@ fn record_from_proto(record: proto::Record) -> Result { Ok(Record { key, value, - publisher, + publisher: if record.publisher.is_empty() { + None + } else { + Some( + PeerId::from_bytes(&record.publisher) + .map_err(|_| invalid_data("Invalid publisher peer ID."))?, + ) + }, expires, }) } diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 5b2035c993e..78a307a8b10 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -41,11 +41,11 @@ use crate::{ QueryInfo, ALPHA_VALUE, K_VALUE, }; -/// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion. +/// A `QueryPool` provides an aggregate state machine for driving [`Query`] to completion. /// -/// Internally, a `Query` is in turn driven by an underlying `QueryPeerIter` -/// that determines the peer selection strategy, i.e. the order in which the -/// peers involved in the query should be contacted. +/// Internally, a `Query` is in turn driven by an underlying [`QueryPeerIter`] that determines the +/// peer selection strategy, i.e. the order in which the peers involved in the query should be +/// contacted. pub(crate) struct QueryPool { next_id: usize, config: QueryConfig, @@ -56,8 +56,8 @@ pub(crate) struct QueryPool { pub(crate) enum QueryPoolState<'a> { /// The pool is idle, i.e. there are no queries to process. Idle, - /// At least one query is waiting for results. `Some(request)` indicates - /// that a new request is now being waited on. + /// At least one query is waiting for results. `Some(request)` indicates that a new request is + /// now being waited on. Waiting(Option<(&'a mut Query, PeerId)>), /// A query has finished. Finished(Query), @@ -163,8 +163,7 @@ impl QueryPool { QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers)) }; - let query = Query::new(id, peer_iter, info); - self.queries.insert(id, query); + self.queries.insert(id, Query::new(id, peer_iter, info)); } fn next_query_id(&mut self) -> QueryId { diff --git a/protocols/kad/src/record.rs b/protocols/kad/src/record.rs index fea17f826a4..1731b9253eb 100644 --- a/protocols/kad/src/record.rs +++ b/protocols/kad/src/record.rs @@ -90,14 +90,14 @@ pub struct Record { impl Record { /// Creates a new record for insertion into the DHT. - pub fn new(key: K, value: Vec) -> Self + pub fn new(key: K, value: Vec, publisher: Option) -> Self where K: Into, { Record { key: key.into(), value, - publisher: None, + publisher, expires: None, } } From 50277ad2df2e9a9284ef512f1abd39c6aec52a83 Mon Sep 17 00:00:00 2001 From: skaunov Date: Tue, 7 Oct 2025 00:41:18 +0300 Subject: [PATCH 2/4] A changelog entry has been made in the appropriate crates --- protocols/kad/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index d50f7319af8..1161142677f 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,5 @@ +- Allow putting `Record` with `publisher: None` sacrificing republishing. + ## 0.49.0 - Remove no longer constructed GetRecordError::QuorumFailed. 
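Note on the API shape introduced by patch 1 and summarised in the changelog entry above: `kad::Record::new` gains a third `publisher` argument, and `Behaviour::put_record` no longer stamps the local peer ID unconditionally. A `None` publisher produces an anonymous record that is stored and replicated but not re-published; any `Some(..)` publisher is overwritten with the local peer ID before the query starts. Below is a minimal sketch of the two call styles; it is not part of the patch, assumes the in-tree `libp2p-kad` and `libp2p-identity` crates with this series applied, and the keys and values are made up.

use libp2p_identity::PeerId;
use libp2p_kad::{store::MemoryStore, Behaviour, Quorum, Record};

fn main() {
    let local_peer_id = PeerId::random();
    let mut kad = Behaviour::new(local_peer_id, MemoryStore::new(local_peer_id));

    // Anonymous record: stored and replicated, but never re-published.
    let anonymous = Record::new(b"/key/anonymous".to_vec(), b"value".to_vec(), None);
    kad.put_record(anonymous, Quorum::One)
        .expect("record fits the local store");

    // Publisher is normalised: `put_record` overwrites any `Some(..)` with the
    // local peer ID, so callers cannot claim a foreign publisher.
    let published = Record::new(
        b"/key/published".to_vec(),
        b"value".to_vec(),
        Some(local_peer_id),
    );
    kad.put_record(published, Quorum::One)
        .expect("record fits the local store");
}

Keeping the normalisation inside `put_record` preserves the old guarantee that locally published records always carry the local key as publisher, while the new `None` case opts out of publication entirely.
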
From 2a66c6539b0165b309d77bad5413b8a5fcc31843 Mon Sep 17 00:00:00 2001 From: skaunov Date: Tue, 7 Oct 2025 00:50:29 +0300 Subject: [PATCH 3/4] fmt --- examples/ipfs-kad/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/ipfs-kad/src/main.rs b/examples/ipfs-kad/src/main.rs index f2dd466ff25..71f1da2b342 100644 --- a/examples/ipfs-kad/src/main.rs +++ b/examples/ipfs-kad/src/main.rs @@ -97,9 +97,9 @@ async fn main() -> Result<()> { pk_record_key.put_slice(swarm.local_peer_id().to_bytes().as_slice()); let mut pk_record = kad::Record::new( - pk_record_key, - local_key.public().encode_protobuf(), - Some(*swarm.local_peer_id()) + pk_record_key, + local_key.public().encode_protobuf(), + Some(*swarm.local_peer_id()), ); pk_record.expires = Some(Instant::now().add(Duration::from_secs(60))); From e125e3c55ac025348ed6f5c77b4ee162006a4f1f Mon Sep 17 00:00:00 2001 From: skaunov Date: Tue, 7 Oct 2025 00:57:13 +0300 Subject: [PATCH 4/4] GA requires logging the change in metrics too --- misc/metrics/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index bab690c4d98..b8d0f9e8c77 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -1,3 +1,5 @@ +- tiny adaptation for #6176 + ## 0.17.1 - Fix panic in swarm metrics when `ConnectionClosed` events are received for connections that were established before metrics collection started.
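
The metrics changelog entry above adapts to a breaking change from patch 1: `libp2p_kad::Event::InboundRequest` and `InboundRequest::AddProvider` turn from struct variants into tuple variants. The sketch below shows how a downstream event handler might match them after this series; it is illustrative only, and the handler function and `tracing` fields are made up.

use libp2p_kad::{Event, InboundRequest};

fn on_kad_event(event: Event) {
    match event {
        // Previously: `Event::InboundRequest { request }`.
        Event::InboundRequest(request) => match request {
            InboundRequest::PutRecord { source, record, .. } => {
                tracing::debug!(?source, has_record = record.is_some(), "inbound PUT_VALUE");
            }
            // Previously: `InboundRequest::AddProvider { record }`.
            InboundRequest::AddProvider(record) => {
                tracing::debug!(has_record = record.is_some(), "inbound ADD_PROVIDER");
            }
            other => tracing::trace!(?other, "other inbound request"),
        },
        _ => {}
    }
}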
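
On the re-publication trade-off noted in the kad changelog ("sacrificing republishing"): per the `jobs.rs` module docs, re-publication is what keeps a record alive in the DHT beyond its initial TTL, and the put-record job only treats records whose publisher is the local key as re-publications; anonymous records are merely re-replicated until their TTL lapses. The configuration sketch below shows the expected ordering of the intervals, replication < publication < TTL. The `Config::new`/`PROTOCOL_NAME` constructor and the setter names are assumed from the current `libp2p-kad` API, and the durations are example values, not recommendations.

use std::time::Duration;

use libp2p_kad::{Config, PROTOCOL_NAME};

fn record_job_config() -> Config {
    let mut cfg = Config::new(PROTOCOL_NAME);
    // Replication refreshes replicas at the k closest peers; it runs most
    // often and does not extend a record's lifetime.
    cfg.set_replication_interval(Some(Duration::from_secs(60 * 60)));
    // Re-publication is less frequent and only covers records published by
    // the local peer; it is what carries a record past its initial TTL.
    cfg.set_publication_interval(Some(Duration::from_secs(24 * 60 * 60)));
    // The record TTL bounds how long other nodes keep a record between
    // re-publications.
    cfg.set_record_ttl(Some(Duration::from_secs(36 * 60 * 60)));
    cfg
}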