Skip to content

Commit dde2df5

Browse files
authored
Merge pull request libp2p#872 from Winter-Soren/enh/871-add-peerscore-scorefunc-signdpeer-gossipsub-1.1
enh/871-add-peerscore-scorefunc-signdpeer-gossipsub-1.1
2 parents 70df135 + 6dfad97 commit dde2df5

18 files changed

+4561
-27
lines changed

libp2p/peer/id.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ def update(self, input: bytes) -> None:
3030
def digest(self) -> bytes:
3131
return self._digest
3232

33-
multihash.FuncReg.register(
34-
IDENTITY_MULTIHASH_CODE, "identity", hash_new=lambda: IdentityHash()
35-
)
33+
# Register identity hash function if FuncReg is available
34+
if hasattr(multihash, "FuncReg"):
35+
multihash.FuncReg.register(
36+
IDENTITY_MULTIHASH_CODE, "identity", hash_new=lambda: IdentityHash()
37+
)
3638

3739

3840
class ID:

libp2p/pubsub/gossipsub.py

Lines changed: 153 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
)
88
import logging
99
import random
10+
import statistics
1011
import time
1112
from typing import (
1213
Any,
@@ -22,14 +23,11 @@
2223
MessageID,
2324
TProtocol,
2425
)
26+
from libp2p.peer.envelope import consume_envelope
2527
from libp2p.peer.id import (
2628
ID,
2729
)
28-
from libp2p.peer.peerinfo import (
29-
PeerInfo,
30-
peer_info_from_bytes,
31-
peer_info_to_bytes,
32-
)
30+
from libp2p.peer.peerinfo import PeerInfo
3331
from libp2p.peer.peerstore import (
3432
PERMANENT_ADDR_TTL,
3533
env_to_send_in_RPC,
@@ -54,6 +52,10 @@
5452
from .pubsub import (
5553
Pubsub,
5654
)
55+
from .score import (
56+
PeerScorer,
57+
ScoreParams,
58+
)
5759
from .utils import (
5860
parse_message_id_safe,
5961
safe_parse_message_id,
@@ -123,6 +125,7 @@ def __init__(
123125
px_peers_count: int = 16,
124126
prune_back_off: int = 60,
125127
unsubscribe_back_off: int = 10,
128+
score_params: ScoreParams | None = None,
126129
max_idontwant_messages: int = 10,
127130
) -> None:
128131
self.protocols = list(protocols)
@@ -164,10 +167,21 @@ def __init__(
164167
self.prune_back_off = prune_back_off
165168
self.unsubscribe_back_off = unsubscribe_back_off
166169

170+
# Scoring
171+
self.scorer: PeerScorer | None = PeerScorer(score_params or ScoreParams())
167172
# Gossipsub v1.2 features
168173
self.dont_send_message_ids = dict()
169174
self.max_idontwant_messages = max_idontwant_messages
170175

176+
def supports_scoring(self, peer_id: ID) -> bool:
177+
"""
178+
Check if peer supports Gossipsub v1.1 scoring features.
179+
180+
:param peer_id: The peer to check
181+
:return: True if peer supports v1.1 features, False otherwise
182+
"""
183+
return self.peer_protocol.get(peer_id) == PROTOCOL_ID_V11
184+
171185
async def run(self) -> None:
172186
self.manager.run_daemon_task(self.heartbeat)
173187
if len(self.direct_peers) > 0:
@@ -313,6 +327,11 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
313327
raise NoPubsubAttached
314328
if peer_id not in self.pubsub.peers:
315329
continue
330+
# Publish gate
331+
if self.scorer is not None and not self.scorer.allow_publish(
332+
peer_id, list(pubsub_msg.topicIDs)
333+
):
334+
continue
316335
stream = self.pubsub.peers[peer_id]
317336

318337
# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
@@ -378,7 +397,17 @@ def _get_peers_to_send(
378397
)
379398
self.fanout[topic] = fanout_peers
380399
gossipsub_peers = fanout_peers
381-
send_to.update(gossipsub_peers)
400+
# Apply gossip score gate
401+
if self.scorer is not None and gossipsub_peers:
402+
allowed = {
403+
p
404+
for p in gossipsub_peers
405+
if self.scorer.allow_gossip(p, [topic])
406+
and not self.scorer.is_graylisted(p, [topic])
407+
}
408+
send_to.update(allowed)
409+
else:
410+
send_to.update(gossipsub_peers)
382411
# Excludes `msg_forwarder` and `origin`
383412
filtered_peers = send_to.difference([msg_forwarder, origin])
384413

@@ -453,8 +482,10 @@ async def leave(self, topic: str) -> None:
453482
return
454483
# Notify the peers in mesh[topic] with a PRUNE(topic) message
455484
for peer in self.mesh[topic]:
456-
await self.emit_prune(topic, peer, self.do_px, True)
485+
# Add backoff BEFORE emitting PRUNE to avoid races where a GRAFT
486+
# could be processed before the backoff is recorded locally.
457487
self._add_back_off(peer, topic, True)
488+
await self.emit_prune(topic, peer, self.do_px, True)
458489

459490
# Forget mesh[topic]
460491
self.mesh.pop(topic, None)
@@ -552,6 +583,10 @@ async def heartbeat(self) -> None:
552583

553584
self.mcache.shift()
554585

586+
# scorer decay step
587+
if self.scorer is not None:
588+
self.scorer.on_heartbeat()
589+
555590
# Prune old IDONTWANT entries to prevent memory leaks
556591
self._prune_idontwant_entries()
557592

@@ -603,6 +638,42 @@ def mesh_heartbeat(
603638
# Emit GRAFT(topic) control message to peer
604639
peers_to_graft[peer].append(topic)
605640

641+
# Opportunistic grafting based on median scores
642+
if self.scorer is not None and num_mesh_peers_in_topic >= self.degree_low:
643+
try:
644+
scorer = self.scorer
645+
# Only consider v1.1 peers for scoring-based opportunistic grafting
646+
v11_mesh_peers = [
647+
p for p in self.mesh[topic] if self.supports_scoring(p)
648+
]
649+
if v11_mesh_peers:
650+
scores = [scorer.score(p, [topic]) for p in v11_mesh_peers]
651+
if scores:
652+
median_score = statistics.median(scores)
653+
# Find higher-than-median peers outside mesh
654+
candidates = self._get_in_topic_gossipsub_peers_from_minus(
655+
topic, self.degree, self.mesh[topic], True
656+
)
657+
# Only consider v1.1 candidates for scoring-based selection
658+
v11_candidates = [
659+
c for c in candidates if self.supports_scoring(c)
660+
]
661+
for cand in v11_candidates:
662+
if scorer.score(cand, [topic]) > median_score:
663+
self.mesh[topic].add(cand)
664+
peers_to_graft[cand].append(topic)
665+
break
666+
except (ValueError, KeyError) as e:
667+
logger.warning(
668+
"Opportunistic grafting failed for topic %s: %s", topic, e
669+
)
670+
except Exception as e:
671+
logger.error(
672+
"Unexpected error in opportunistic grafting for topic %s: %s",
673+
topic,
674+
e,
675+
)
676+
606677
if num_mesh_peers_in_topic > self.degree_high:
607678
# Select |mesh[topic]| - D peers from mesh[topic]
608679
selected_peers = self.select_from_minus(
@@ -611,6 +682,8 @@ def mesh_heartbeat(
611682
for peer in selected_peers:
612683
# Remove peer from mesh[topic]
613684
self.mesh[topic].discard(peer)
685+
if self.scorer is not None:
686+
self.scorer.on_leave_mesh(peer, topic)
614687

615688
# Emit PRUNE(topic) control message to peer
616689
peers_to_prune[peer].append(topic)
@@ -829,18 +902,54 @@ async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
829902
continue
830903

831904
try:
832-
peer_info = peer_info_from_bytes(peer.signedPeerRecord)
833-
try:
905+
# Validate signed peer record if provided;
906+
# otherwise try to connect directly
907+
if peer.HasField("signedPeerRecord") and len(peer.signedPeerRecord) > 0:
908+
# Validate envelope signature and freshness via peerstore consume
834909
if self.pubsub is None:
835910
raise NoPubsubAttached
836-
await self.pubsub.host.connect(peer_info)
837-
except Exception as e:
838-
logger.warning(
839-
"failed to connect to px peer %s: %s",
840-
peer_id,
841-
e,
911+
912+
envelope, record = consume_envelope(
913+
peer.signedPeerRecord, "libp2p-peer-record"
842914
)
843-
continue
915+
916+
# Ensure the record matches the advertised peer id
917+
if record.peer_id != peer_id:
918+
raise ValueError("peer id mismatch in PX signed record")
919+
920+
# Store into peerstore and update addrs
921+
self.pubsub.host.get_peerstore().consume_peer_record(
922+
envelope, ttl=7200
923+
)
924+
925+
peer_info = PeerInfo(record.peer_id, record.addrs)
926+
try:
927+
await self.pubsub.host.connect(peer_info)
928+
except Exception as e:
929+
logger.warning(
930+
"failed to connect to px peer %s: %s",
931+
peer_id,
932+
e,
933+
)
934+
continue
935+
else:
936+
# No signed record available; try to use existing connection info
937+
if self.pubsub is None:
938+
raise NoPubsubAttached
939+
940+
try:
941+
# Try to get existing peer info from peerstore
942+
existing_peer_info = self.pubsub.host.get_peerstore().peer_info(
943+
peer_id
944+
)
945+
await self.pubsub.host.connect(existing_peer_info)
946+
except Exception as e:
947+
logger.debug(
948+
"peer %s not found in peerstore or connection failed: %s",
949+
peer_id,
950+
e,
951+
)
952+
continue
844953
except Exception as e:
845954
logger.warning(
846955
"failed to parse peer info from px peer %s: %s",
@@ -932,6 +1041,12 @@ async def handle_graft(
9321041
) -> None:
9331042
topic: str = graft_msg.topicID
9341043

1044+
# Score gate for GRAFT acceptance
1045+
if self.scorer is not None:
1046+
if self.scorer.is_graylisted(sender_peer_id, [topic]):
1047+
await self.emit_prune(topic, sender_peer_id, False, False)
1048+
return
1049+
9351050
# Add peer to mesh for topic
9361051
if topic in self.mesh:
9371052
for direct_peer in self.direct_peers:
@@ -954,6 +1069,8 @@ async def handle_graft(
9541069

9551070
if sender_peer_id not in self.mesh[topic]:
9561071
self.mesh[topic].add(sender_peer_id)
1072+
if self.scorer is not None:
1073+
self.scorer.on_join_mesh(sender_peer_id, topic)
9571074
else:
9581075
# Respond with PRUNE if not subscribed to the topic
9591076
await self.emit_prune(topic, sender_peer_id, self.do_px, False)
@@ -975,9 +1092,16 @@ async def handle_prune(
9751092
self._add_back_off(sender_peer_id, topic, False)
9761093

9771094
self.mesh[topic].discard(sender_peer_id)
1095+
if self.scorer is not None:
1096+
self.scorer.on_leave_mesh(sender_peer_id, topic)
9781097

9791098
if px_peers:
980-
await self._do_px(px_peers)
1099+
# Score-gate PX acceptance
1100+
allow_px = True
1101+
if self.scorer is not None:
1102+
allow_px = self.scorer.allow_px_from(sender_peer_id, [topic])
1103+
if allow_px:
1104+
await self._do_px(px_peers)
9811105

9821106
# RPC emitters
9831107

@@ -1050,11 +1174,18 @@ async def emit_prune(
10501174
for peer in exchange_peers:
10511175
if self.pubsub is None:
10521176
raise NoPubsubAttached
1053-
peer_info = self.pubsub.host.get_peerstore().peer_info(peer)
1054-
signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
1055-
signed_peer_record.peerID = peer.to_bytes()
1056-
signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info)
1057-
prune_msg.peers.append(signed_peer_record)
1177+
1178+
# Try to get the signed peer record envelope from peerstore
1179+
envelope = self.pubsub.host.get_peerstore().get_peer_record(peer)
1180+
peer_info_msg: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
1181+
peer_info_msg.peerID = peer.to_bytes()
1182+
1183+
if envelope is not None:
1184+
# Use the signed envelope
1185+
peer_info_msg.signedPeerRecord = envelope.marshal_envelope()
1186+
# If no signed record available, include peer without signed record
1187+
1188+
prune_msg.peers.append(peer_info_msg)
10581189

10591190
control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
10601191
control_msg.prune.extend([prune_msg])

libp2p/pubsub/pubsub.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
from libp2p.network.stream.exceptions import (
4949
StreamClosed,
5050
StreamEOF,
51-
StreamReset,
51+
StreamError,
5252
)
5353
from libp2p.peer.id import (
5454
ID,
@@ -439,7 +439,7 @@ async def stream_handler(self, stream: INetStream) -> None:
439439

440440
try:
441441
await self.continuously_read_stream(stream)
442-
except (StreamEOF, StreamReset, ParseError, IncompleteReadError) as error:
442+
except (StreamError, ParseError, IncompleteReadError) as error:
443443
logger.debug(
444444
"fail to read from peer %s, error=%s,"
445445
"closing the stream and remove the peer from record",
@@ -768,6 +768,18 @@ async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
768768
if self._is_msg_seen(msg):
769769
return
770770

771+
try:
772+
scorer = getattr(self.router, "scorer", None)
773+
if scorer is not None:
774+
if not scorer.allow_publish(msg_forwarder, list(msg.topicIDs)):
775+
logger.debug(
776+
"Rejecting message from %s by publish score gate", msg_forwarder
777+
)
778+
return
779+
except Exception:
780+
# Router may not support scoring; ignore gracefully
781+
pass
782+
771783
# Check if signing is required and if so validate the signature
772784
if self.strict_signing:
773785
# Validate the signature of the message
@@ -780,6 +792,14 @@ async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
780792
try:
781793
await self.validate_msg(msg_forwarder, msg)
782794
except ValidationError:
795+
# Scoring: count invalid messages
796+
try:
797+
scorer = getattr(self.router, "scorer", None)
798+
if scorer is not None:
799+
for topic in msg.topicIDs:
800+
scorer.on_invalid_message(msg_forwarder, topic)
801+
except Exception:
802+
pass
783803
logger.debug(
784804
"Topic validation failed: sender %s sent data %s under topic IDs: %s %s:%s", # noqa: E501
785805
msg_forwarder,
@@ -792,6 +812,15 @@ async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
792812

793813
self._mark_msg_seen(msg)
794814

815+
# Scoring: first delivery for this sender per topic
816+
try:
817+
scorer = getattr(self.router, "scorer", None)
818+
if scorer is not None:
819+
for topic in msg.topicIDs:
820+
scorer.on_first_delivery(msg_forwarder, topic)
821+
except Exception:
822+
pass
823+
795824
# reject messages claiming to be from ourselves but not locally published
796825
self_id = self.host.get_id()
797826
if (

0 commit comments

Comments
 (0)