Skip to content

Commit 92d48a5

Browse files
authored
refactor(rz): simplify discovered_peers map
Unfold the `discovered_peers` `HashMap<(PeerId, Namespace), Vec<Multiaddr>>` into `HashMap<PeerId, HashMap<Namespace>, Vec<Multiaddr>>`. This simplifies the code when accessing all addresses for a peer. Pull-Request: #5937.
1 parent 4a58542 commit 92d48a5

File tree

1 file changed

+85
-106
lines changed

1 file changed

+85
-106
lines changed

protocols/rendezvous/src/client.rs

Lines changed: 85 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub struct Behaviour {
5555
///
5656
/// Storing these internally allows us to assist the [`libp2p_swarm::Swarm`] in dialing by
5757
/// returning addresses from [`NetworkBehaviour::handle_pending_outbound_connection`].
58-
discovered_peers: HashMap<(PeerId, Namespace), Vec<Multiaddr>>,
58+
discovered_peers: HashMap<PeerId, HashMap<Namespace, Vec<Multiaddr>>>,
5959

6060
registered_namespaces: HashMap<(PeerId, Namespace), Ttl>,
6161

@@ -310,13 +310,17 @@ impl NetworkBehaviour for Behaviour {
310310
Poll::Pending => {}
311311
}
312312

313-
if let Poll::Ready(Some(expired_registration)) =
313+
if let Poll::Ready(Some((peer, expired_registration))) =
314314
self.expiring_registrations.poll_next_unpin(cx)
315315
{
316-
self.discovered_peers.remove(&expired_registration);
317-
return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
318-
peer: expired_registration.0,
319-
}));
316+
let Some(registrations) = self.discovered_peers.get_mut(&peer) else {
317+
continue;
318+
};
319+
registrations.remove(&expired_registration);
320+
if registrations.is_empty() {
321+
self.discovered_peers.remove(&peer);
322+
}
323+
return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired { peer }));
320324
}
321325

322326
return Poll::Pending;
@@ -330,20 +334,10 @@ impl NetworkBehaviour for Behaviour {
330334
_addresses: &[Multiaddr],
331335
_effective_role: Endpoint,
332336
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
333-
let peer = match maybe_peer {
334-
None => return Ok(vec![]),
335-
Some(peer) => peer,
336-
};
337-
338-
let addresses = self
339-
.discovered_peers
340-
.iter()
341-
.filter_map(|((candidate, _), addresses)| (candidate == &peer).then_some(addresses))
342-
.flatten()
343-
.cloned()
344-
.collect();
345-
346-
Ok(addresses)
337+
let addrs = maybe_peer
338+
.map(|peer| self.discovered_peer_addrs(&peer).cloned().collect())
339+
.unwrap_or_default();
340+
Ok(addrs)
347341
}
348342
}
349343

@@ -375,101 +369,86 @@ impl Behaviour {
375369
) -> Option<Event> {
376370
match response {
377371
RegisterResponse(Ok(ttl)) => {
378-
if let Some((rendezvous_node, namespace)) =
379-
self.waiting_for_register.remove(request_id)
380-
{
381-
self.registered_namespaces
382-
.insert((rendezvous_node, namespace.clone()), ttl);
383-
384-
return Some(Event::Registered {
385-
rendezvous_node,
386-
ttl,
387-
namespace,
388-
});
389-
}
390-
391-
None
372+
let (rendezvous_node, namespace) = self.waiting_for_register.remove(request_id)?;
373+
self.registered_namespaces
374+
.insert((rendezvous_node, namespace.clone()), ttl);
375+
376+
Some(Event::Registered {
377+
rendezvous_node,
378+
ttl,
379+
namespace,
380+
})
392381
}
393382
RegisterResponse(Err(error_code)) => {
394-
if let Some((rendezvous_node, namespace)) =
395-
self.waiting_for_register.remove(request_id)
396-
{
397-
return Some(Event::RegisterFailed {
398-
rendezvous_node,
399-
namespace,
400-
error: error_code,
401-
});
402-
}
403-
404-
None
383+
let (rendezvous_node, namespace) = self.waiting_for_register.remove(request_id)?;
384+
Some(Event::RegisterFailed {
385+
rendezvous_node,
386+
namespace,
387+
error: error_code,
388+
})
405389
}
406390
DiscoverResponse(Ok((registrations, cookie))) => {
407-
if let Some((rendezvous_node, _ns)) = self.waiting_for_discovery.remove(request_id)
408-
{
409-
self.events
410-
.extend(registrations.iter().flat_map(|registration| {
411-
let peer_id = registration.record.peer_id();
412-
registration
413-
.record
414-
.addresses()
415-
.iter()
416-
.filter(|addr| {
417-
!self.discovered_peers.iter().any(
418-
|((discovered_peer_id, _), addrs)| {
419-
*discovered_peer_id == peer_id && addrs.contains(addr)
420-
},
421-
)
422-
})
423-
.map(|address| ToSwarm::NewExternalAddrOfPeer {
424-
peer_id,
425-
address: address.clone(),
426-
})
427-
.collect::<Vec<_>>()
428-
}));
429-
430-
self.discovered_peers
431-
.extend(registrations.iter().map(|registration| {
432-
let peer_id = registration.record.peer_id();
433-
let namespace = registration.namespace.clone();
434-
435-
let addresses = registration.record.addresses().to_vec();
436-
437-
((peer_id, namespace), addresses)
438-
}));
439-
440-
self.expiring_registrations
441-
.extend(registrations.iter().cloned().map(|registration| {
442-
async move {
443-
// if the timer errors we consider it expired
444-
futures_timer::Delay::new(Duration::from_secs(registration.ttl))
445-
.await;
446-
447-
(registration.record.peer_id(), registration.namespace)
391+
let (rendezvous_node, _ns) = self.waiting_for_discovery.remove(request_id)?;
392+
registrations.iter().for_each(|registration| {
393+
let peer_id = registration.record.peer_id();
394+
let addresses = registration.record.addresses();
395+
let namespace = registration.namespace.clone();
396+
let ttl = registration.ttl;
397+
398+
// Emit events for all newly discovered addresses.
399+
let new_addr_events = addresses
400+
.iter()
401+
.filter_map(|address| {
402+
if self.discovered_peer_addrs(&peer_id).any(|a| a == address) {
403+
return None;
448404
}
449-
.boxed()
450-
}));
451-
452-
return Some(Event::Discovered {
453-
rendezvous_node,
454-
registrations,
455-
cookie,
456-
});
457-
}
458-
459-
None
405+
Some(ToSwarm::NewExternalAddrOfPeer {
406+
peer_id,
407+
address: address.clone(),
408+
})
409+
})
410+
.collect::<Vec<_>>();
411+
self.events.extend(new_addr_events);
412+
413+
// Update list of discovered peers.
414+
self.discovered_peers
415+
.entry(peer_id)
416+
.or_default()
417+
.insert(namespace.clone(), addresses.to_owned());
418+
419+
// Push registration expiry future.
420+
self.expiring_registrations.push(
421+
async move {
422+
// if the timer errors we consider it expired
423+
futures_timer::Delay::new(Duration::from_secs(ttl)).await;
424+
(peer_id, namespace)
425+
}
426+
.boxed(),
427+
);
428+
});
429+
430+
Some(Event::Discovered {
431+
rendezvous_node,
432+
registrations,
433+
cookie,
434+
})
460435
}
461436
DiscoverResponse(Err(error_code)) => {
462-
if let Some((rendezvous_node, ns)) = self.waiting_for_discovery.remove(request_id) {
463-
return Some(Event::DiscoverFailed {
464-
rendezvous_node,
465-
namespace: ns,
466-
error: error_code,
467-
});
468-
}
469-
470-
None
437+
let (rendezvous_node, ns) = self.waiting_for_discovery.remove(request_id)?;
438+
Some(Event::DiscoverFailed {
439+
rendezvous_node,
440+
namespace: ns,
441+
error: error_code,
442+
})
471443
}
472444
_ => unreachable!("rendezvous clients never receive requests"),
473445
}
474446
}
447+
448+
fn discovered_peer_addrs(&self, peer: &PeerId) -> impl Iterator<Item = &Multiaddr> {
449+
self.discovered_peers
450+
.get(peer)
451+
.map(|addrs| addrs.values().flatten())
452+
.unwrap_or_default()
453+
}
475454
}

0 commit comments

Comments
 (0)