Skip to content

Commit a3492cf

Browse files
authored
Merge branch 'main' into feat/619-store-pubkey-peerid-peerstore
2 parents 2248108 + 733ef86 commit a3492cf

File tree

4 files changed

+87
-75
lines changed

4 files changed

+87
-75
lines changed

libp2p/pubsub/gossipsub.py

Lines changed: 83 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -528,82 +528,99 @@ def mesh_heartbeat(
528528
peers_to_prune[peer].append(topic)
529529
return peers_to_graft, peers_to_prune
530530

531+
def _handle_topic_heartbeat(
532+
self,
533+
topic: str,
534+
current_peers: set[ID],
535+
is_fanout: bool = False,
536+
peers_to_gossip: DefaultDict[ID, dict[str, list[str]]] | None = None,
537+
) -> tuple[set[ID], bool]:
538+
"""
539+
Helper method to handle heartbeat for a single topic,
540+
supporting both fanout and gossip.
541+
542+
:param topic: The topic to handle
543+
:param current_peers: Current set of peers in the topic
544+
:param is_fanout: Whether this is a fanout topic (affects expiration check)
545+
:param peers_to_gossip: Optional dictionary to store peers to gossip to
546+
:return: Tuple of (updated_peers, should_remove_topic)
547+
"""
548+
if self.pubsub is None:
549+
raise NoPubsubAttached
550+
551+
# Skip if no peers have subscribed to the topic
552+
if topic not in self.pubsub.peer_topics:
553+
return current_peers, False
554+
555+
# For fanout topics, check if we should remove the topic
556+
if is_fanout:
557+
if self.time_since_last_publish.get(topic, 0) + self.time_to_live < int(
558+
time.time()
559+
):
560+
return set(), True
561+
562+
# Check if peers are still in the topic and remove the ones that are not
563+
in_topic_peers: set[ID] = {
564+
peer for peer in current_peers if peer in self.pubsub.peer_topics[topic]
565+
}
566+
567+
# If we need more peers to reach target degree
568+
if len(in_topic_peers) < self.degree:
569+
# Select additional peers from peers.gossipsub[topic]
570+
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
571+
topic,
572+
self.degree - len(in_topic_peers),
573+
in_topic_peers,
574+
)
575+
# Add the selected peers
576+
in_topic_peers.update(selected_peers)
577+
578+
# Handle gossip if requested
579+
if peers_to_gossip is not None:
580+
msg_ids = self.mcache.window(topic)
581+
if msg_ids:
582+
# Select D peers from peers.gossipsub[topic] excluding current peers
583+
peers_to_emit_ihave_to = self._get_in_topic_gossipsub_peers_from_minus(
584+
topic, self.degree, current_peers
585+
)
586+
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
587+
for peer in peers_to_emit_ihave_to:
588+
peers_to_gossip[peer][topic] = msg_id_strs
589+
590+
return in_topic_peers, False
591+
531592
def fanout_heartbeat(self) -> None:
532-
# Note: the comments here are the exact pseudocode from the spec
593+
"""
594+
Maintain fanout topics by:
595+
1. Removing expired topics
596+
2. Removing peers that are no longer in the topic
597+
3. Adding new peers if needed to maintain the target degree
598+
"""
533599
for topic in list(self.fanout):
534-
if (
535-
self.pubsub is not None
536-
and topic not in self.pubsub.peer_topics
537-
and self.time_since_last_publish.get(topic, 0) + self.time_to_live
538-
< int(time.time())
539-
):
540-
# Remove topic from fanout
600+
updated_peers, should_remove = self._handle_topic_heartbeat(
601+
topic, self.fanout[topic], is_fanout=True
602+
)
603+
if should_remove:
541604
del self.fanout[topic]
542605
else:
543-
# Check if fanout peers are still in the topic and remove the ones that are not # noqa: E501
544-
# ref: https://github.com/libp2p/go-libp2p-pubsub/blob/01b9825fbee1848751d90a8469e3f5f43bac8466/gossipsub.go#L498-L504 # noqa: E501
545-
546-
in_topic_fanout_peers: list[ID] = []
547-
if self.pubsub is not None:
548-
in_topic_fanout_peers = [
549-
peer
550-
for peer in self.fanout[topic]
551-
if peer in self.pubsub.peer_topics[topic]
552-
]
553-
self.fanout[topic] = set(in_topic_fanout_peers)
554-
num_fanout_peers_in_topic = len(self.fanout[topic])
555-
556-
# If |fanout[topic]| < D
557-
if num_fanout_peers_in_topic < self.degree:
558-
# Select D - |fanout[topic]| peers from peers.gossipsub[topic] - fanout[topic] # noqa: E501
559-
selected_peers = self._get_in_topic_gossipsub_peers_from_minus(
560-
topic,
561-
self.degree - num_fanout_peers_in_topic,
562-
self.fanout[topic],
563-
)
564-
# Add the peers to fanout[topic]
565-
self.fanout[topic].update(selected_peers)
606+
self.fanout[topic] = updated_peers
566607

567608
def gossip_heartbeat(self) -> DefaultDict[ID, dict[str, list[str]]]:
568609
peers_to_gossip: DefaultDict[ID, dict[str, list[str]]] = defaultdict(dict)
569-
for topic in self.mesh:
570-
msg_ids = self.mcache.window(topic)
571-
if msg_ids:
572-
if self.pubsub is None:
573-
raise NoPubsubAttached
574-
# Get all pubsub peers in a topic and only add them if they are
575-
# gossipsub peers too
576-
if topic in self.pubsub.peer_topics:
577-
# Select D peers from peers.gossipsub[topic]
578-
peers_to_emit_ihave_to = (
579-
self._get_in_topic_gossipsub_peers_from_minus(
580-
topic, self.degree, self.mesh[topic]
581-
)
582-
)
583610

584-
msg_id_strs = [str(msg_id) for msg_id in msg_ids]
585-
for peer in peers_to_emit_ihave_to:
586-
peers_to_gossip[peer][topic] = msg_id_strs
611+
# Handle mesh topics
612+
for topic in self.mesh:
613+
self._handle_topic_heartbeat(
614+
topic, self.mesh[topic], peers_to_gossip=peers_to_gossip
615+
)
587616

588-
# TODO: Refactor and Dedup. This section is the roughly the same as the above.
589-
# Do the same for fanout, for all topics not already hit in mesh
617+
# Handle fanout topics that aren't in mesh
590618
for topic in self.fanout:
591-
msg_ids = self.mcache.window(topic)
592-
if msg_ids:
593-
if self.pubsub is None:
594-
raise NoPubsubAttached
595-
# Get all pubsub peers in topic and only add if they are
596-
# gossipsub peers also
597-
if topic in self.pubsub.peer_topics:
598-
# Select D peers from peers.gossipsub[topic]
599-
peers_to_emit_ihave_to = (
600-
self._get_in_topic_gossipsub_peers_from_minus(
601-
topic, self.degree, self.fanout[topic]
602-
)
603-
)
604-
msg_id_strs = [str(msg) for msg in msg_ids]
605-
for peer in peers_to_emit_ihave_to:
606-
peers_to_gossip[peer][topic] = msg_id_strs
619+
if topic not in self.mesh:
620+
self._handle_topic_heartbeat(
621+
topic, self.fanout[topic], peers_to_gossip=peers_to_gossip
622+
)
623+
607624
return peers_to_gossip
608625

609626
@staticmethod

newsfragments/678.misc.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactored gossipsub heartbeat logic to use a single helper method `_handle_topic_heartbeat` that handles both fanout and gossip heartbeats.

newsfragments/684.misc.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Uses the `decapsulate` method of the `Multiaddr` class to clean up the observed address.

tests/core/identity/identify/test_identify.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,9 @@ async def test_identify_protocol(security_protocol):
5656
)
5757

5858
# Check observed address
59-
# TODO: use decapsulateCode(protocols('p2p').code)
60-
# when the Multiaddr class will implement it
6159
host_b_addr = host_b.get_addrs()[0]
62-
cleaned_addr = Multiaddr.join(
63-
*(
64-
host_b_addr.split()[:-1]
65-
if str(host_b_addr.split()[-1]).startswith("/p2p/")
66-
else host_b_addr.split()
67-
)
68-
)
60+
host_b_peer_id = host_b.get_id()
61+
cleaned_addr = host_b_addr.decapsulate(Multiaddr(f"/p2p/{host_b_peer_id}"))
6962

7063
logger.debug("observed_addr: %s", Multiaddr(identify_response.observed_addr))
7164
logger.debug("host_b.get_addrs()[0]: %s", host_b.get_addrs()[0])

0 commit comments

Comments
 (0)